在组件中使用线程和进程
在某些复杂情况先,我们希望在组件中使用多线程或多进程。由于算盘 sdk 基于 gevent 开发,所有不能直接使用 python 自带的 threading 和 subprocess 库,需要用 gevent 的 threading 和 subprocess 库代替。
gevent 的 threading 和 subprocess 库与 python 标准的 threading 和 subprocess 接口基本兼容,只需要修改 import 就能支持。
!!! warning "警告"
直接使用 python 标准的 threading 和 subprocess 会造成组件的阻塞或崩溃。
使用多线程
直接看代码
import suanpan
import gevent
from suanpan.app import app
from suanpan.app.arguments import String
from suanpan.log import logger
# 使用 gevent 的 threadpool 库
from gevent.threadpool import ThreadPool
# 初始化一个 ThreadPool
pool = ThreadPool(4)
start = 0
def long_time_task(t, desc=None):
    # 一个耗时比较长的任务
    logger.debug(f'task: {desc}')
    gevent.sleep(t)
@app.input(String(key="inputData1"))
@app.output(String(key="outputData1"))
def HelloWorld(context):
    global start
    jobs = []
    for _ in range(4):
        # 使用线程池执行 long_time_task,4 个任务将在 10 秒后执行完
        job = pool.spawn(long_time_task, 10, f"job {start}")
        start += 1
        # 保存派生的任务对象
        jobs.append(job)
    # 等待任务完成,这里请明确等待上面线程池派生的任务
    gevent.wait(jobs)
    return "Hello world"
if __name__ == "__main__":
    suanpan.run(app)
使用进程
代码如下
import suanpan
from suanpan.app import app
from suanpan.app.arguments import String
# 使用 gevent 的 subprocess 库
from gevent import subprocess
@app.input(String(key="inputData1"))
@app.output(String(key="outputData1"))
def HelloWorld(context):
    args = context.args
    # 派生 2 个进程
    sub = subprocess.Popen(['sleep 10 && echo finish'], stdout=subprocess.PIPE, shell=True)
    sub2 = subprocess.Popen(['sleep 12 && echo finish2'], stdout=subprocess.PIPE, shell=True)
    
    # 打印输出
    out, err = sub.communicate()
    print('output of process1', out.rstrip())
    out2, err = sub2.communicate()
    print('output of process2', out2.rstrip())
    return "Hello world"
if __name__ == "__main__":
    suanpan.run(app)