跳到主要内容

事件通知

很多复杂模板中,需要跨节点进行配置变更的同步与响应,这部分数据无法很好的通过流计算进行串联编排,因为需要协同的各个节点在数据流逻辑上没有关系,如果通过传统数据流,需要在多个节点增加更多的输入输出端口,并增加多条连线,用于配置数据的传递,导致模板复杂度大大增加。事件通知功能使组件通过通用的发布订阅的模式完成该功能。

!!! info "提示"

事件通知功能在 sdk 0.18.3 之后支持

事件发送

import suanpan
from suanpan.app import app
from suanpan.app.arguments import String, Json

i = 0


@app.input(String(key="inputData1"))
@app.output(Json(key="outputData1"))
def HelloWorld(context):
args = context.args
global i

# 向其他组件发送事件
app.publish('hello', {'a': 'world', 'count': i})
i += 1

return "Hello World!"


if __name__ == "__main__":
suanpan.run(app)

app.publish 发送事件

事件订阅

import suanpan
from suanpan.app import app
from suanpan.app.arguments import String


@app.subscribe('hello')
def on_hello(event):
# 事件 hello 的处理函数
print('on event hello', event)


@app.input(String(key="inputData1"))
@app.output(String(key="outputData1"))
def HelloWorld(context):
args = context.args

return "Hello World!"


if __name__ == "__main__":
suanpan.run(app)

注册的时间在某些情况下想取消注册,可以调用 app.unsubscribe

import suanpan
from suanpan.app import app
from suanpan.app.arguments import String


@app.subscribe('hello')
def on_hello(event):
# 事件 hello 的处理函数
print('on event hello', event)
app.unsubscribe('hello')


@app.input(String(key="inputData1"))
@app.output(String(key="outputData1"))
def HelloWorld(context):
args = context.args

return "Hello World!"


if __name__ == "__main__":
suanpan.run(app)