在组件中使用线程和进程
在某些复杂情况先,我们希望在组件中使用多线程或多进程。由于算盘 sdk 基于 gevent 开发,所有不能直接使用 python 自带的 threading 和 subprocess 库,需要用 gevent 的 threading 和 subprocess 库代替。
gevent 的 threading 和 subprocess 库与 python 标准的 threading 和 subprocess 接口基本兼容,只需要修改 import 就能支持。
!!! warning "警告"
直接使用 python 标准的 threading 和 subprocess 会造成组件的阻塞或崩溃。
使用多线程
直接看代码
import suanpan
import gevent
from suanpan.app import app
from suanpan.app.arguments import String
from suanpan.log import logger
# 使用 gevent 的 threadpool 库
from gevent.threadpool import ThreadPool
# 初始化一个 ThreadPool
pool = ThreadPool(4)
start = 0
def long_time_task(t, desc=None):
# 一个耗时比较长的任务
logger.debug(f'task: {desc}')
gevent.sleep(t)
@app.input(String(key="inputData1"))
@app.output(String(key="outputData1"))
def HelloWorld(context):
global start
jobs = []
for _ in range(4):
# 使用线程池执行 long_time_task,4 个任务将在 10 秒后执行完
job = pool.spawn(long_time_task, 10, f"job {start}")
start += 1
# 保存派生的任务对象
jobs.append(job)
# 等待任务完成,这里请明确等待上面线程池派生的任务
gevent.wait(jobs)
return "Hello world"
if __name__ == "__main__":
suanpan.run(app)
使用进程
代码如下
import suanpan
from suanpan.app import app
from suanpan.app.arguments import String
# 使用 gevent 的 subprocess 库
from gevent import subprocess
@app.input(String(key="inputData1"))
@app.output(String(key="outputData1"))
def HelloWorld(context):
args = context.args
# 派生 2 个进程
sub = subprocess.Popen(['sleep 10 && echo finish'], stdout=subprocess.PIPE, shell=True)
sub2 = subprocess.Popen(['sleep 12 && echo finish2'], stdout=subprocess.PIPE, shell=True)
# 打印输出
out, err = sub.communicate()
print('output of process1', out.rstrip())
out2, err = sub2.communicate()
print('output of process2', out2.rstrip())
return "Hello world"
if __name__ == "__main__":
suanpan.run(app)