跳到主要内容

在组件中使用线程和进程

在某些复杂情况先,我们希望在组件中使用多线程或多进程。由于算盘 sdk 基于 gevent 开发,所有不能直接使用 python 自带的 threading 和 subprocess 库,需要用 gevent 的 threading 和 subprocess 库代替。

gevent 的 threading 和 subprocess 库与 python 标准的 threading 和 subprocess 接口基本兼容,只需要修改 import 就能支持。

!!! warning "警告"

直接使用 python 标准的 threading 和 subprocess 会造成组件的阻塞或崩溃。

使用多线程

直接看代码

import suanpan
import gevent
from suanpan.app import app
from suanpan.app.arguments import String
from suanpan.log import logger
# 使用 gevent 的 threadpool 库
from gevent.threadpool import ThreadPool

# 初始化一个 ThreadPool
pool = ThreadPool(4)
start = 0


def long_time_task(t, desc=None):
# 一个耗时比较长的任务
logger.debug(f'task: {desc}')
gevent.sleep(t)


@app.input(String(key="inputData1"))
@app.output(String(key="outputData1"))
def HelloWorld(context):
global start

jobs = []
for _ in range(4):
# 使用线程池执行 long_time_task,4 个任务将在 10 秒后执行完
job = pool.spawn(long_time_task, 10, f"job {start}")
start += 1
# 保存派生的任务对象
jobs.append(job)

# 等待任务完成,这里请明确等待上面线程池派生的任务
gevent.wait(jobs)

return "Hello world"


if __name__ == "__main__":
suanpan.run(app)

使用进程

代码如下

import suanpan
from suanpan.app import app
from suanpan.app.arguments import String
# 使用 gevent 的 subprocess 库
from gevent import subprocess


@app.input(String(key="inputData1"))
@app.output(String(key="outputData1"))
def HelloWorld(context):
args = context.args

# 派生 2 个进程
sub = subprocess.Popen(['sleep 10 && echo finish'], stdout=subprocess.PIPE, shell=True)
sub2 = subprocess.Popen(['sleep 12 && echo finish2'], stdout=subprocess.PIPE, shell=True)

# 打印输出
out, err = sub.communicate()
print('output of process1', out.rstrip())
out2, err = sub2.communicate()
print('output of process2', out2.rstrip())

return "Hello world"


if __name__ == "__main__":
suanpan.run(app)