跳到主要内容

用户指南

基本概念

算盘基于雪浪团队首创自研的全混合流计算技术,采用数据流编程方式,组件间的单向连线即代表数据流向,决定了组件的执行顺序。组件 SDK 的主要作用就是包装流处理过程,为组件开发者屏蔽算盘平台底层的细节,让组件开发者能够聚焦在组件功能的实现,简化组件开发。

基于组件 SDK,组件开发者的主要工作就是根据收到的上游消息,编写自己的逻辑代码,将产生的结果输出。

安装

如果你使用都是线上算盘的 VS Code - Python 组件来编辑组件代码,此环境中已经包含了组件开发需要的包,不需要另外安装 sdk;其他环境的话,首先需要安装 suanpan sdk。

pip install suanpan-core

!!! info "提示" suanpan-core 支持 python 3.7 ~ 3.9,同时支持 Linux 和 Windows 环境。我们开发的组件一般基于 python 3.7,所以推荐也使用 python 3.7 开发。

组件代码

下面是一个简单组件的代码,虽然简单,但包含了组件要处理的基本内容。

welcome.py
import suanpan
from suanpan.app import app
from suanpan.app.arguments import String, Json


@app.input(Json(key="inputData1"))
@app.param(String(key="param_prefix", alias="prefix"))
@app.output(String(key="outputData1", alias="result"))
def hello_world(context):
args = context.args
return f'Welcome {args.prefix} {args.inputData1}!'


if __name__ == "__main__":
suanpan.run(app)

代码说明

  1. import suanpan import 算盘 SDK,由于 suanpan 是基于 gevent 实现的,import suanpan 的过程中会自动调用 gevent.monkey.patch_all,{==所以请确保在代码入口最开始的地方 import suanpan,以免遇到问题。==}
  2. @app.input 定义了一个 Json 类型的输入桩,组件中定义的输入桩会自动接收上游消息,将消息转化为 python 内部数据格式。sdk 使用装饰器的方式,将输入桩信息加到 app 里;组件的输入桩有 String、Json 等多种类型,一个组件也可以有多个输入桩。
  3. @app.output 定义了一个 String 类型的输出桩,组件中定义的输出桩会自动转换 hello_world 返回的结果,传输到下游节点 。sdk 使用装饰器的方式,将输出桩信息加到 app 里;组件的输出桩有 String、Json 等多种类型,一个组件也可以有多个输出桩。
  4. @app.param 定义了组件需要的配置参数,这些参数在组件执行之前配置,可以定义组件不同的执行逻辑。
  5. hello_world 是用户定义的消息处理函数,用户只需要专注于实现这个函数。在这里可以实现简单如上的字符串的拼接,也可以做出基于深度神经网络的降阶算法。
  6. 参数 context 是一个字典类型,包含了每次传入输入消息和配置的参数。args.inputData1 是输入的消息数据,args.prefix 是在算盘右面板配置参数值。

组件的生命周期

除了上面定义的 hello_world 消息处理逻辑,python sdk 还有其他的接口,可以让用户在组件不同的生命周期根据业务定义不同的处理函数。

Hook 接口Hook 说明
app.beforeInit定义组件初始化之前的执行方法
app.afterInit定义组件初始化之后的执行方法
app.beforeCall定义消息处理之前的执行方法
app.afterCall定义消息处理之后的执行方法

示例代码:

import suanpan
from suanpan.app import app
from suanpan.arguments import String


@app.beforeInit
def app_before_init():
# 定义组件初始化之前的执行方法
print("app_before_init")


@app.afterInit
def app_after_init(context):
# 定义组件初始化之后的执行方法
print("app_after_init")


@app.beforeCall
def app_before_call(context):
# 定义消息处理之前的执行方法
print("app_before_call")


@app.afterCall
def app_after_call(context):
# 定义消息处理之后的执行方法
print("app_after_call")


if __name__ == "__main__":
suanpan.run(app)