“太乙”Parsl使用介绍:基于Python编写面向数据的大规模并行工作流

目录

“太乙”Parsl使用介绍:基于Python编写面向数据的大规模并行工作流

引言

Parsl是一个基于Python的开源并行编程库,赋能用户在各类不同的集群上便捷地并行运行面向数据的大规模工作流。Parsl可以并行运行纯Python的工作流,亦可并行调用其它命令行工具或软件。Parsl完全基于Python,无需使用其它语言来构建工作流(例如YAML),易于安装,并能自动与超算的作业管理器进行交互,无需手动编写作业脚本。在Parsl中,工作流的构建和资源的管理完全分开,赋能用户便捷地在不同的计算集群上移植和运行相同的工作流,实现"Write once, run anywhere",如图1所示。Parsl已应用于多个科学领域的工作流中,在多个大型超算集群部署和验证,如美国国家能源研究科学计算中心(NESRC)等。


图1:Parsl可支持多种类型的超算集群,包括太乙、启明及基于Slurm和PBS的集群等。


本文将通过一个例子介绍如何在南方科技大学计算中心集群上(以太乙为例)使用Parsl进行并行编程,太乙是基于LSF作业管理器的集群。除了LSF,Parsl支持在Slurm和PBS等作业管理器上运行。更多复杂工作流创建、作业管理器支持的介绍可在Parsl完整文档Parsl主页、或者HPDC 2019论文(获HPDC 2019最佳论文提名)上查看。如有Parsl在太乙使用的相关问题或者有兴趣基于Parsl展开进一步研究,欢迎通过以下方式联系:

在太乙上使用Parsl创建和运行一个简单的并行工作流

通常情况下,在太乙上使用Parsl可分为以下四个步骤:

  1. 安装Parsl:在登录节点配置Parsl所需的运行环境,注意配置的环境需在计算节点也能使用。
  2. 计算资源配置:配置所需CPU核的数目,Parsl将自动与LSF进行交互,请求计算资源。
  3. 创建Parsl apps:通过Python的装饰器标注哪些函数可以在计算节点并行运行。
  4. 创建Parsl工作流:基于上一步的Parsl apps构建DAG工作流。

Parsl自动将工作流的各个函数任务在计算节点上并行运行,每一个步骤具体如下:

步骤一:安装Parsl

Parsl 可以像其他 Python 包一样使用 pip 或 conda 安装。例如,在太乙上,我们可以通过Anaconda来管理对应的Python环境:

module load python/anaconda3/2020.11  # 读取太乙上的Anaconda
conda create --name parsl python=3.7  # 使用conda创建Python3.7环境
source activate parsl  # 激活conda环境
pip install git+https://github.com/Parsl/parsl.git  # Parsl最新版
注1:请务必安装parsl最新版。当前发行版(Release版本)运行在LSF控制的集群下可能会存在bug,因此务必按照上述指令进行安装。

注2:计算节点也需要安装有Parsl,所以请确保计算节点也可以访问上述Python环境。请参考步骤二配置计算节点的Python环境。

步骤二:计算资源配置

用户通过Parsl的 Config 配置应用需要的计算资源,例如需求的CPU核数量、每个节点的CPU核数量、单个任务需求的CPU核、walltime等,并通过parsl.load(config)读入该配置,Parsl会自动根据配置提交LSF作业(无需自己编写),请求计算节点。当获得计算节点时,Parsl会自动在各个节点上部署worker,每个worker将会负责接收并执行任务,一个worker只能同时执行一个任务,任务完成后返回结果并请求下一个任务,如图2所示。

图2:Parsl自动地在各个计算节点部署和执行任务。

下面是一个可在太乙上部署的配置例子:

import parsl
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LSFProvider
from parsl.launchers import MpiRunLauncher

taiyi_config = Config(
    executors=[
        HighThroughputExecutor(
            label="taiyi",
            cores_per_worker = 1,
            provider=LSFProvider(
                queue='debug',  # Use debug queue
                init_blocks=1,
                max_blocks=1,
                min_blocks=1,
                cores_per_block=80,  # Number of cores to request
                cores_per_node=40,  # Number of cores per node
                #  Start the Anaconda Environment on Compute Nodes
                worker_init="module load python/anaconda3/2020.11 mpi;source activate parsl",
                # Other LSF options, for exmaple, GPU
                scheduler_options='',
                walltime="00:20:00",
                request_by_nodes=False,
                bsub_redirection=True,
                # Use MPI to launch workers on each node
                # cpus-per-proc determines how many cpu cores on each node will be used
                # in general, cpus-per-proc is equal to cores_per_node above
                launcher=MpiRunLauncher(overrides="--map-by node --cpus-per-proc 40"),  #default to 40, since there are 40 cores on each node for Taiyi
                cmd_timeout=180,
            ),
        )
    ],
)

parsl.load(taiyi_config)

注:

  1. init_blocks,min_blocks,max_blocks 是三个可选类型参数。Parsl提供一个弹性请求资源的功能,一个block对应一个太乙作业。在应用在执行过程中,不同计算阶段可能需求不同的计算资源,这种情况下Parsl可自动根据计算负载的多少,弹性地请求或者取消作业。
  2. 因为太乙使用的LSF作业管理器,所以在配置文件中,我们使用 LSFProvider 来进行配置。如果用户期望在本地机器或者基于 Slurm 等作业管理器的计算集群,可以使用 LocalProvider 和 SlurmProvider 进行配置。更多类型集群配置参考Parsl官方配置文档
  3. cores_per_block 和 cores_per_node 分别对应一个 block(即一个作业)内所请求的CPU总核数和单个节点请求的CPU核数。这两个参数均为必填,其中cores_per_node不能为0。
  4. 在MpiRunLauncher中cpus-per-proc默认为40,即程序将利用每一个节点上至多40个CPU核,这也是太乙单个计算节点的CPU核数量。如有调整每个节点所请求的CPU核数上限的需求,请更改MpiRunLauncher参数cpus-per-proc的数量,通常与Config中cores_per_node的值一致。
  5. 计算节点也需要使用Parsl,因此我们需要在Config配置中声明计算节点使用的计算环境,例如这里我们需要激活之前创建的conda环境。
  6.  worker_init="module load python/anaconda3/2020.11 mpi;source activate parsl"
  7. 在 parsl.load(taiyi_config) 后,Parsl会在太乙上提交作业,并在所请求的计算节点上部署对应的workers并行运行任务。

步骤三:创建Parsl apps

用户可以通过Parsl提供的装饰器 python_app 和 bash_app 标注函数以创建Parsl apps,即表示这个函数是需要在计算节点上并行运行。其中python_app装饰python函数,bash_app可以通过命令行执行其他软件或者工具,并获取stdout。当用户使用装饰器标注并调用函数,Parsl将会自动地在各个计算节点上运行这些函数。例如,下面是一个基于Monte Carlo方法的Pi估计应用。本例子中只使用了python_app,如需执行命令行应用以及其他语言编写的应用,请参考Parsl官方文档关于bash_app的介绍。

from parsl.app.app import python_app, bash_app

@python_app
def pi(num_points):
    from random import random
    inside = 0
    for i in range(num_points):
        x, y = random(), random()
        if x**2 + y**2 < 1:
            inside += 1
    return (inside * 4 / num_points)

@python_app
def mean(inputs=[]):
    return sum(inputs) / len(inputs)

注意:由于python_app可能执行在不同的计算节点上,因此需要在python_app内部import依赖,并保证在计算节点上已安装有该依赖,即在计算节点配置好Python环境,详见步骤二。例如在该例子中,在pi函数内部import random。

步骤四:创建Parsl工作流

在装饰了需要并行的函数后,用户可以调用这些函数,调用被装饰的函数不会直接运行得到结果,而是立即返回一个Python的future object,表示的是尚在运行或者未完成的延迟计算。想要得到最后的结果,需要调用 result() 方法才会得到最终结果。

我们可以通过 future 的传递来构建可表示为有向无环图(Directed Acyclic Graph,DAG)的工作流,Parsl会自动地根据所构建的DAG工作流,将函数发送到有空闲资源的计算节点上并行运行。例如,下面这个例子中我们把 pi 函数返回的futures传递到mean函数中,也就是mean函数的运行将需要等待pi函数的完成。

# Execute 100 pi functions in parallel on the compute nodes
# Return a list of futures that can be passed into mean function
pi_futures = [pi(10**6) for i in range(100)]
mean_pi = mean(inputs=pi_futures)

# Fetch results by calling result()
print(mean_pi.result())

Parsl会根据futures的传递自动构建如下DAG(示意图,实际为100个pi函数调用):

总结:在安装好Parsl后,将上述所有代码(如下框)放在hello_parsl.py文件中,在太乙的登录节点上,通过 python hello_parsl.py 运行该程序。Parsl将在debug队列请求2个节点共80个CPU核,然后在这2个节点并行运行100个pi函数和最后的mean函数,并获取最后结果。

import parsl
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LSFProvider
from parsl.launchers import MpiRunLauncher

taiyi_config = Config(
    executors=[
        HighThroughputExecutor(
            label="taiyi",
            cores_per_worker = 1,
            provider=LSFProvider(
                queue='debug',  # Use debug queue
                init_blocks=1,
                max_blocks=1,
                min_blocks=1,
                cores_per_block=80,  # Number of cores to request
                cores_per_node=40,  # Number of cores per node
                #  Start the Anaconda Environment on Worker Nodes
                worker_init="module load python/anaconda3/2020.11 mpi;source activate parsl",
                # Other LSF options, for exmaple, GPU
                scheduler_options='',
                walltime="00:20:00",
                request_by_nodes=False,
                bsub_redirection=True,
                # Use MPI to launch workers on each node
                # cpus-per-proc determines how many cpu cores on each node will be used
                # in general, cpus-per-proc is equal to cores_per_node above
                launcher=MpiRunLauncher(overrides="--map-by node --cpus-per-proc 40"),  #default to 40, since there are 40 cores on each node for Taiyi
                cmd_timeout=180,
            ),
        )
    ],
)

parsl.load(taiyi_config)

from parsl.app.app import python_app, bash_app

@python_app
def pi(num_points):
    from random import random
    inside = 0
    for i in range(num_points):
        x, y = random(), random()
        if x**2 + y**2 < 1:
            inside += 1
    return (inside * 4 / num_points)

@python_app
def mean(inputs=[]):
    return sum(inputs) / len(inputs)

# Execute 100 pi functions in parallel on the compute nodes
# Return a list of futures that can be passed into mean function
pi_futures = [pi(10**6) for i in range(100)]
mean_pi = mean(inputs=pi_futures)

# Fetch results by calling result()
print(mean_pi.result())

Parsl的优势

Parsl具有以下优势:

比方说,上面同一个Pi估算的应用,我们既可以在太乙集群上运行,也可以在本地机器运行。如果我们想在本地机器运行,只需在步骤二:计算资源配置的时候,将Config内的LSFProvider更换为本地机器的LocalProvider即可,例如:

import parsl
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider

local_config = Config(
    executors=[
        HighThroughputExecutor(
            label="local",
            max_workers=10,
            provider=LocalProvider(
                init_blocks=1,
                max_blocks=1,
                min_blocks=1,
                nodes_per_block=1,
                cmd_timeout=60,
            ),
        )
    ],
)

parsl.load(local_config)