很高兴和你相遇
这里正在记录我的所思所学
订阅免费邮件通讯接收最新内容
首页 归档 想法 工具 通讯 播客 简历 主页

小议 linux 并行方法

─=≡Σ(((つ •̀ω•́) つ 车速飚起来,坐稳扶好

前几天朋友圈各种神仙「打架」秀了一波在 R 里分组统计的骚操作,思路总结起来大致是:split-apply-combine。果子还给我直接来了一次需求提速,几个月前需要十几分钟完成的操作如今只要十几秒就拿下了,特别生猛。

在 linux 环境下,不少软件和命令其实也都面临着如何提速的问题。我现在还记得大概一年前 jimmy 问过一个问题:要从一个很大的文本中提取出一批想要的行,如果用 grep -f hewantfile.txt rawdata.txt 来 grep 的话实在是太慢了。当然,这个需要用 R 或者 Python 来做的话,极短时间就可以完成。但如果一定要用 grep 来实现,一种可行的加速方法就是把可拆的元素最大程度拆分,极端的说,就是把 1 个 1 万行的文本拆成 1 万次分析,只运行 1 次。

当然,这些操作都是在你自己一台机器上利用自身多 CPU 特性完成的,如果你能同时操控 1 万台(即使性能很 low 的)机器,那就可以通过 remote 的方式来批量运行,这时「split-apply-combine」的分组思想就近似变成了「MapReduce」的并行思想,而在 linux 也有不少方法可以实现类似这样的效果。

本文所提到的在 linux 中并行主要针对两种需求:一种是只能单线程工作的命令比如 grep 和 sed 以及 bzip2 这类;另一种是一些虽然支持多线程但是并不能充分利用分配线程数的软件,比如 trimmomatic 在实际使用的时候给它 5 个或者 10 个甚至 20 个线程,但每次用到的就是两三个。

并行的使用场景也有两种:多文件和大文件。通常又可以把大文件的场景转换为多文件的场景去解决。

首先介绍下文会用到的测试文件:

  • 37923 行的测试文件 file.txt(其实是 gff 格式)
  • 5000 行的待查找文件 need.id (其实是转录本 id)

为了模拟多文件处理场景,我们把 file.txt 分割成 100 个小文件,每个文件都是以 split_file 开头。

split -n l/100 file.txt -d -a 3 split_file
# -n l/100 这个参数的写法略讲究,目的是为了防止 split 把某一行给拆开在两个文件。

最耗时做法:大文件直接 grep 的常规操作,在我当时的测试环境下需要 5 分多钟才能完成。

time grep -w -f need.id file.txt > filter.out

real	5m15.913s
user	5m15.772s
sys	0m0.140s

shell 脚本

多文件处理

说到 linux 批量运行命令,可能最简单的就是在命令结尾使用 & 从而让命令在后台执行紧接着再运行下一条命令。比如:

for i in `ls split_file*`
do
grep -w -f need.id $i > ${i}.out &
done

这种命令处理方法要比直接对 100 个文件因此操作的总时长节省很多。但是也有一个问题,循环一旦结束 grep 的进程仍在后台执行,如果想让所有命令都结束之后这个脚本在结束,可以上一点技巧。

for i in `ls split_file*`
do
    grep -w -f need.id $i > ${i}.out &
    pid+=("$!")
done
wait ${pid[@]}

在这里,使用 $! 来获得进程的 PID,$! 保存着最近一个后台进程的 PID,然后放入数组,用 wait 命令等待这些进程结束。但是这种方法一旦面临文件远远超过自己的线程数时容易失控,造成服务器卡顿甚至卡死。比较理想的改进是能够识别这个循环执行次数,当达到某个数量时就停止添加新任务等待前述命令结束。

如下所示,一旦循环执行次数超过了 19,就让循环等一等。

t=0
for i in `ls split_file*`
do
    echo $i
    grep -w -f need.id $i > ${i}.out &
    sleep 1s
    t=$(($t+1))
    if [[ $t -gt 19 ]]
    then
        echo $t wait a moment
        wait
        t=0
    fi
done

大文件处理

大文件处理的思路其实和多文件很类似,只是需要我们提前把多文件拆分为大文件,例如我们把一个大文件利用 split 拆分为 100 个小文件,每次利用 20 个 cpu 同时运行,最后再把结果进行合并。这样会比直接操作一个大文件节省非常多的时间。

t=0
for i in `ls split_file*`
do
    echo $i
    grep -w -f need.id $i > ${i}.out &
    t=$(($t+1))
    if [[ $t -gt 19 ]]
    then
        echo $t
        wait
        t=0
    fi
done
wait
cat split_file*out > final.all.out && rm -f split_file*out

# 脚本运行时间
#real	0m29.666s
#user	7m41.872s
#sys	0m3.312s

PPSS

当然,上面的脚本是我们的入门操作,如果仔细推敲存在不少问题而且不够灵活,例如不能时时刻刻充分利用好设定的最大线程数(上述为 20)。很早之前就有人写过一个更加复杂的 shell 脚本 PPSS 来实现并行操作,这个脚本一共有 3000 多行,具体用法可以查看其 GitHub 的说明。

以下是一个明令行的帮助说明:

ppss
|P|P|S|S| Distributed Parallel Processing Shell Script 2.60

usage: ./ppss [ -d <sourcedir> | -f <sourcefile> ]  [ -c '<command>"$ITEM"' ]
                 [ -C <configfile> ] [ -j ] [ -l <logfile> ] [ -p <# jobs> ]
                 [ -D <delay> ] [ -h ] [ --help ] [ -r ]

Examples:
                 ./ppss -d /dir/with/some/files -c 'gzip'
                 ./ppss -d /dir/with/some/files -c 'cp"$ITEM"/tmp' -p 2
                 ./ppss -f <file> -c 'wget -q -P /destination/directory"$ITEM"' -p 10

除了利用 bash 脚本,已经有大量写好的工具来完成这个需求,在这篇文章里简要介绍几个用的相对多的工具。

parallel

GNU parallel 这个命令在一部分服务器中可能没有被预装,通过 官网 进行下载安装。它应该是目前使用量最广的 linux 端并行工具,后续大多数工具都是在其思路上利用不同的语言进行开发,并且都是以它作为标准进行比较。

GNU parallel 的主要目的就是用来代替 xargs (xargs -P 可以实现并行处理) 和 for 循环这些操作,所以大多数用 for 来写的循环都可以使用 GNU parallel 来进行改写提速(解决多文件问题),同时它也可以把输入的大文件进行 block 切分再并行的进行处理(解决大文件问题)。

parallel 可以支持各种格式的输入,比如 stdin、单一文件、多个文件,命令行等等。可以输出 stdout,整合结果或者未整合结果。

多文件处理

# 多文件压缩
ls split_file* |parallel gzip

# 多文件解压缩
ls split_file* |parallel gunzip

再比如一次创建 20 个目录

seq 20 | parallel mkdir temp_{}

大文件处理

time cat file.txt |parallel --pipe grep -w -f need.id > temp.txt

real	2m0.893s
user	5m31.536s
sys	0m0.344s

时间从原始的 5 分多降低到 2 分钟,如果感觉上面的数据处理还不够快那么不要用默认参数可以有另一种写法,20s 搞定。

time parallel --pipepart -a file.txt --block -10 grep -w -f need.id > temp2.txt

real	0m21.260s
user	9m32.360s
sys	0m12.236s

xjobs

xjobs 也是一个平时会偶尔使用的命令,它可以直接执行命令行中的命令,也可以执行一个包含多个命令行的文件。然后自己根据分配的线程数进行分配,每一个命令结束就会启动新的命令,保证 CPU 的利用效率。

多文件处理


# 多文件压缩
ls split_file* |xjobs -j 20 gzip
# -j 指定处理线程数

# 多文件解压缩
ls split_file* |xjobs -j 20 gunzip

大文件处理

xjobs 如果需要进行大文件处理,需要首先对文件自行拆分,这里直接使用 split 好的文件。

time ls split_file0* |xjobs -j20 -v0 grep -w -f need.id |grep -v bin > temp4.txt

real	0m20.344s
user	6m5.264s
sys	0m2.748s

# xjobs 直接使用 sdtout 的时候有一个问题就是会写出执行的命令,需要在输出的结果中把命令行除去。

rush

rush 是 seqkit 开发者(江湖人称爪哥)的作品。因为爪哥本身做生物信息,在介绍这个工具时他还提到了在生物信息中的应用示例。而这个工具也体现出了爪哥一贯的风格,说明文档的用法示例清晰移动,给出各了种参数对应 GNU parallel 的功能,安排的明明白白。

多文件处理

多文件处理最基础的版本和其它工具类似,不过其借鉴了 awk 的赋值方法,可以利用 -v 给变量进行赋值。另外,如果命令被中断还可以通过 -c 继续进行。

ls split_file* |rush -j 20 'gunzip' -c

大文件处理

rush 不支持直接传 stdin。你对一个文件 grep 操作,没法通过 rush 来并行。如果文件多的话,可以 ls *.txt | rush 'cat {} | grep' 这样多文件并行。
--- 爪哥本爪

我的理解:如果需要进行大文件的处理,可以自行对文件进行拆分,和 xjobs 类似,问题就再一次转换为多文件处理。

time ls split_file0* |rush -j 20 'cat {} |grep -w -f need.id || true' > temp5.txt
# 在 rush 中,直接使用 grep 会有问题,需要按照 grep foo bar || true 这样的格式来使用
real	0m18.891s
user	5m48.664s
sys	0m2.684s

生物信息应用

因为爪哥特意给了一个 bwa 比对的例子,在这里直接引用一下,供参考。


A bioinformatics example: mapping with bwa, and processing result with samtools:

 $ tree raw.cluster.clean.mapping
 raw.cluster.clean.mapping
 ├── M1
 │   ├── M1_1.fq.gz -> ../../raw.cluster.clean/M1/M1_1.fq.gz
 │   ├── M1_2.fq.gz -> ../../raw.cluster.clean/M1/M1_2.fq.gz
 ...

 $ ref=ref/xxx.fa
 $ threads=25
 $ ls -d raw.cluster.clean.mapping/* \
     | rush -v ref=$ref -v j=$threads \
         'bwa mem -t {j} -M -a {ref} {}/{%}_1.fq.gz {}/{%}_2.fq.gz > {}/{%}.sam; \
         samtools view -bS {}/{%}.sam > {}/{%}.bam; \
         samtools sort -T {}/{%}.tmp -@ {j} {}/{%}.bam -o {}/{%}.sorted.bam; \
         samtools index {}/{%}.sorted.bam; \
         samtools flagstat {}/{%}.sorted.bam > {}/{%}.sorted.bam.flagstat; \
         /bin/rm {}/{%}.bam {}/{%}.sam;' \
         -j 2 --verbose -c -C mapping.rush
Since {}/{%} appears many times, we can use preset variable (macro) to simplify it:

 $ ls -d raw.cluster.clean.mapping/* \
     | rush -v ref=$ref -v j=$threads -v p='{}/{%}' \
         'bwa mem -t {j} -M -a {ref} {p}_1.fq.gz {p}_2.fq.gz > {p}.sam; \
         samtools view -bS {p}.sam > {p}.bam; \
         samtools sort -T {p}.tmp -@ {j} {p}.bam -o {p}.sorted.bam; \
         samtools index {p}.sorted.bam; \
         samtools flagstat {p}.sorted.bam > {p}.sorted.bam.flagstat; \
         /bin/rm {p}.bam {p}.sam;' \
         -j 2 --verbose -c -C mapping.rush

其它

在 GNU parallel 的主页有一个详细的其它并行工具和 GNU parallel 的 比较说明,如果闲来没事可以仔细研究,如果想进一步学习,可以参考 GNU parallel 的 详细官方说明

如果要推荐一种方法,十几万行的数据挑出几万行的内容,在 R 里 merge 一下或者 left_join 一下,基本没有感觉。


one more thing

以为文章到这里就结束了么,其实还没有……

如果你也感觉 grep 单线程处理这样大规模的数据集太慢了,那可能是还不太会用 grep。比如同样的需求,多加一个参数就可以让其无感完成,比上述所有方法都快出几个数量级。

time grep -F -w -f need.id file.txt > filter2.out

real	0m0.026s
user	0m0.016s
sys	0m0.008s

本文作者:思考问题的熊

版权声明:本博客所有文章除特别声明外,均采用 知识共享署名 - 非商业性使用 - 禁止演绎 4.0 国际许可协议 (CC BY-NC-ND 4.0) 进行许可。

如果你对这篇文章感兴趣,欢迎通过邮箱或者微信订阅我的 「熊言熊语」会员通讯,我将第一时间与你分享肿瘤生物医药领域最新行业研究进展和我的所思所学所想点此链接即可进行免费订阅。


· 分享链接 https://kaopubear.top/blog/2019-05-17-linuxparallelbasic/