用户指南
基本概念
算盘基于雪浪团队首创自研的全混合流计算技术,采用数据流编程方式,组件间的单向连线即代表数据流向,决定了组件的执行顺序。组件 SDK 的主要作用就是包装流处理过程,为组件开发者屏蔽算盘平台底层的细节,让组件开发者能够聚焦在组件功能的实现,简化组件开发。
基于组件 SDK,组件开发者的主要工作就是根据收到的上游消息,编写自己的逻辑代码,将产生的结果输出。
安装
如果你使用都是线上算盘的 VS Code - Python
组件来编辑组件代码,此环境中已经包含了组件开发需要的包,不需要另外安装 sdk;其他环境的话,首先需要安装 suanpan sdk。
pip install suanpan-core
!!! info "提示" suanpan-core 支持 python 3.7 ~ 3.9,同时支持 Linux 和 Windows 环境。我们开发的组件一般基于 python 3.7,所以推荐也使用 python 3.7 开发。
组件代码
下面是一个简单组件的代码,虽然简单,但包含了组件要处理的基本内容。
welcome.py
import suanpan
from suanpan.app import app
from suanpan.app.arguments import String, Json
@app.input(Json(key="inputData1"))
@app.param(String(key="param_prefix", alias="prefix"))
@app.output(String(key="outputData1", alias="result"))
def hello_world(context):
args = context.args
return f'Welcome {args.prefix} {args.inputData1}!'
if __name__ == "__main__":
suanpan.run(app)
代码说明
import suanpan
import 算盘 SDK,由于suanpan
是基于 gevent 实现的,import suanpan
的过程中会自动调用gevent.monkey.patch_all
,{==所以请确保在代码入口最开始的地方import suanpan
,以免遇到问题。==}@app.input
定义了一个 Json 类型的输入桩,组件中定义的输入桩会自动接收上游消息,将消息转化为 python 内部数据格式。sdk 使用装饰器的方式,将输入桩信息加到 app 里;组件的输入桩有 String、Json 等多种类型,一个组件也可以有多个输入桩。@app.output
定义了一个 String 类型的输出桩,组件中定义的输出桩会自动转换hello_world
返回的结果,传输到下游节点 。sdk 使用装饰器的方式,将输出桩信息加到 app 里;组件的输出桩有 String、Json 等多种类型,一个组件也可以有多个输出桩。@app.param
定义了组件需要的配置参数,这些参数在组件执行之前配置,可以定义组件不同的执行逻辑。hello_world
是用户定义的消息处理函数,用户只需要专注于实现这个函数。在这里可以实现简单如上的字符串的拼接,也可以做出基于深度神经网络的降阶算法。- 参数
context
是一个字典类型,包含了每次传入输入消息和配置的参数。args.inputData1
是输入的消息数据,args.prefix
是在算盘右面板配置参数值。
组件的生命周期
除了上面定义的 hello_world
消息处理逻辑,python sdk 还有其他的接口,可以让用户在组件不同的生命周期根据业务定义不同的处理函数。
Hook 接口 | Hook 说明 |
---|---|
app.beforeInit | 定义组件初始化之前的执行方法 |
app.afterInit | 定义组件初始化之后的执行方法 |
app.beforeCall | 定义消息处理之前的执行方法 |
app.afterCall | 定义消息处理之后的执行方法 |
示例代码:
import suanpan
from suanpan.app import app
from suanpan.arguments import String
@app.beforeInit
def app_before_init():
# 定义组件初始化之前的执行方法
print("app_before_init")
@app.afterInit
def app_after_init(context):
# 定义组件初始化之后的执行方法
print("app_after_init")
@app.beforeCall
def app_before_call(context):
# 定义消息处理之前的执行方法
print("app_before_call")
@app.afterCall
def app_after_call(context):
# 定义消息处理之后的执行方法
print("app_after_call")
if __name__ == "__main__":
suanpan.run(app)