MARO 三层框架
接下来展示的是第三层 Distibuted toolkit
MARO Distibuted Toolkit 遵循 message-passing 模式,即不同组件之间的协作基于消息发送和接收
典型的 master/worker 分布式程序通常包含以下步骤
- master 会将任务(w/ or w/o data)发送到 worker
- worker 将在其本地计算环境或本地设备中完成任务
- worker 将计算结果返回到 master
根据实际需要,主控组件和工作组件之间的通信方式可以是同步的,也可以是异步的
关键部件
Comunication
大致功能预览
提供通用的消息传递接口
- send, receive
- broadcast
- scatter
通信组件使用 可替换 的通信协议驱动程序来适应不同的通信协议栈
- TCP/IP
- InfiniBand
Peer Discovering
部分故障恢复
条件事件自动调度
Proxy
Proxy 提供通信原语的实现,是通信操作接口,是通信组件的主要实体
Proxy 默认使用 ZeroMQ 框架
Proxy 为基于 Redis 的 peer discovering 提供支持
分布式通信原语常见操作如下
Broadcast
Scatter
Reduce (强调聚合之后处理)
Gather (单纯聚合没有额外处理)
All Reduce
All Gather
Reduce Scatter
All to All
Message
用于打包组件之间的通信内容,消息实例的主要属性包括
- tag:自定义属性,可用于通过 conditional event register table 实现自动调度逻辑
- source:message 发送者的别名
- destination:message 接收者的别名
- payload:用于远程函数调用的 Python 对象
- session_id(自动生成):特定会话的 UUID ,一个会话可能包含多条消息
- message_id(自动生成):特定消息的 UUID
Example
1 2 3 4 5 6
from maro.communication import Message message = Message(tag="check_in", source="worker_001", destination="master", body="")
Session Message
MARO 为常见的分布式场景提供了两种预定义的会话类型
Task Session
存在 master 和 worker 关系
用于描述从 master 发送到 worker 的 computing task
- master 将 task 发送给 worker
- 一旦 worker 收到 task ,worker 就开始执行 task
- worker 将 computing result 返回给 master
Notification Session
sender 和 receiver 关系
用于信息同步
- sender 发送 notification message
- receiver 接收 notification message
session 的每个阶段由 proxy 在内部维护
Example
1 2 3 4 5 6 7 8 9 10 11 12 13
from maro.communication import SessionMessage, SessionType task_message = SessionMessage(tag="sum", source="master", destination="worker_001", body=[0, 1, 2, ...], session_type=SessionType.TASK) notification_message = SessionMessage(tag="check_out", source="worker_001", destination="master", body="", session_type=SessionType.NOTIFICATION)
MARO 通信原语实际接口
- receive:用于持续接收消息 receive_by_id:仅接收具有给定 session ID 的消息
- send:单播,这是一种阻塞、一对一的发送模式,监视并收集来自远程对等方的回复消息
- isend:非阻塞版的 send ,将立即返回 message session ID,该 ID 可由 receive_by_id 使用
- scatter:send 的高级版本,用于向 peer 发送消息,并监视和收集来自 peer 的回复消息,不是真正的多播,每条消息都会经过完整的 TCP/IP 堆栈(ZeroMQ driver),如果要发送的消息完全相同,并且想要更好的性能,请改用 broadcast 接口
- iscatter:非阻塞版本的 scatter ,message session ID 将立即返回,可由 receive_by_id 使用
- broadcast:阻塞,用于向所有订阅者广播消息,将监视并收集所有订阅者的回复消息
- ibroadcast:非阻塞版本的 broadcast ,相关 message session ID 将立即返回,可供 receive_by_id 使用
Conditional Event Register Table
- 提供消息自动发送机制
- 通过将 conditional event 和相关的 handler function 注册到注册表中,当 conditional event 满足时,handler function 将与接收到的消息一起自动执行
conditional event 用于声明自动触发相关 handler function 所需的消息组
unit event 是条件事件中的最小组件,声明格式分三段
- source:用于声明所需的消息源
- tag:消息实例的属性
- amount:所需的消息实例量
1 2 3
unit_event_abs = "worker:update:10" unit_event_rel = "worker:update:60%"
AND OR 操作支持更复杂的业务逻辑
1 2 3 4 5 6 7 8 9 10 11
combined_event_and = ("worker_01:update:2", "worker_02:update:3", "AND") combined_event_or = ("worker_03:update:1", "worker_04:update:5", "OR") combined_event_mix = (("worker_01:update:2", "worker_02:update:3", "AND"), "worker_03:update:1", "OR")
Handler function 是绑定到特定 conditional event 的用户定义的回调函数,当满足事件的条件时,相关消息将被发送到处理程序函数执行
1 2 3 4 5 6 7 8 9 10 11
# A common handler function signature def handler(that, proxy, messages): """ Conditional event handler function. Args: that: local instance reference, which needs to be operated. proxy: the proxy reference for remote communication. messages: received messages. """ pass
Distributed Decorator
从本地函数类生成分布式 worker 类的帮助程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
from maro.communication import dist, Proxy # Initialize proxy instance for remote communication. proxy = Proxy(group_name="master-worker", component_type="worker", expected_peers=[("master", 1)]) # Declare the trigger condition of rollout event. rollout_event = "master:rollout:1" # Implement rollout event handler logic. def on_rollout(that, proxy, messages): pass # Assemble event-handler directory. handler_dict = {rollout_event: on_rollout} # Convert a local functional class to a distributed one. @dist(proxy, handler_dict) class Worker: def __init__(self): pass