MARO 三层框架


接下来展示的是第三层 Distibuted toolkit

  • MARO Distibuted Toolkit 遵循 message-passing 模式,即不同组件之间的协作基于消息发送接收

  • 典型的 master/worker 分布式程序通常包含以下步骤

    • master 会将任务(w/ or w/o data)发送到 worker
    • worker 将在其本地计算环境或本地设备中完成任务
    • worker 将计算结果返回到 master

  • 根据实际需要,主控组件和工作组件之间的通信方式可以是同步的,也可以是异步的


关键部件

  • Comunication

    • 大致功能预览

      • 提供通用的消息传递接口

        • send, receive
        • broadcast
        • scatter

      • 通信组件使用 可替换 的通信协议驱动程序来适应不同的通信协议栈

        • TCP/IP
        • InfiniBand

      • Peer Discovering

      • 部分故障恢复

      • 条件事件自动调度


    • Proxy


      • Proxy 提供通信原语的实现,是通信操作接口,是通信组件的主要实体

      • Proxy 默认使用 ZeroMQ 框架

      • Proxy 为基于 Redis 的 peer discovering 提供支持

      • 分布式通信原语常见操作如下


        • Broadcast





        • Scatter





        • Reduce (强调聚合之后处理)




        • Gather (单纯聚合没有额外处理)




        • All Reduce




        • All Gather




        • Reduce Scatter


        • All to All


    • Message

      • 用于打包组件之间的通信内容,消息实例的主要属性包括

        • tag:自定义属性,可用于通过 conditional event register table 实现自动调度逻辑
        • source:message 发送者的别名
        • destination:message 接收者的别名
        • payload:用于远程函数调用的 Python 对象
        • session_id(自动生成):特定会话的 UUID ,一个会话可能包含多条消息
        • message_id(自动生成):特定消息的 UUID

      • Example

        1
        2
        3
        4
        5
        6
        
        from maro.communication import Message
        
        message = Message(tag="check_in",
                          source="worker_001",
                          destination="master",
                          body="")
        

    • Session Message

      • MARO 为常见的分布式场景提供了两种预定义的会话类型

        • Task Session

          • 存在 master 和 worker 关系

          • 用于描述从 master 发送到 worker 的 computing task

            • master 将 task 发送给 worker
            • 一旦 worker 收到 task ,worker 就开始执行 task
            • worker 将 computing result 返回给 master

        • Notification Session

          • sender 和 receiver 关系

          • 用于信息同步

            • sender 发送 notification message
            • receiver 接收 notification message

      • session 的每个阶段由 proxy 在内部维护

      • Example

         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        13
        
        from maro.communication import SessionMessage, SessionType
        
        task_message = SessionMessage(tag="sum",
                                      source="master",
                                      destination="worker_001",
                                      body=[0, 1, 2, ...],
                                      session_type=SessionType.TASK)
        
        notification_message = SessionMessage(tag="check_out",
                                              source="worker_001",
                                              destination="master",
                                              body="",
                                              session_type=SessionType.NOTIFICATION)
        

    • MARO 通信原语实际接口

      • receive:用于持续接收消息 receive_by_id:仅接收具有给定 session ID 的消息
      • send:单播,这是一种阻塞、一对一的发送模式,监视并收集来自远程对等方的回复消息
      • isend:非阻塞版的 send ,将立即返回 message session ID,该 ID 可由 receive_by_id 使用
      • scatter:send 的高级版本,用于向 peer 发送消息,并监视和收集来自 peer 的回复消息,不是真正的多播,每条消息都会经过完整的 TCP/IP 堆栈(ZeroMQ driver),如果要发送的消息完全相同,并且想要更好的性能,请改用 broadcast 接口
      • iscatter:非阻塞版本的 scatter ,message session ID 将立即返回,可由 receive_by_id 使用
      • broadcast:阻塞,用于向所有订阅者广播消息,将监视并收集所有订阅者的回复消息
      • ibroadcast:非阻塞版本的 broadcast ,相关 message session ID 将立即返回,可供 receive_by_id 使用

    • Conditional Event Register Table

      • 提供消息自动发送机制
      • 通过将 conditional event 和相关的 handler function 注册到注册表中,当 conditional event 满足时,handler function 将与接收到的消息一起自动执行

      ![](https://raw.githubusercontent.com/HCY-ASLEEP/picture-bed/main/picture-bed/register_table.register.svg)
      • conditional event 用于声明自动触发相关 handler function 所需的消息组

      • unit event 是条件事件中的最小组件,声明格式分三段

        • source:用于声明所需的消息源
        • tag:消息实例的属性
        • amount:所需的消息实例量
        1
        2
        3
        
        unit_event_abs = "worker:update:10"
        
        unit_event_rel = "worker:update:60%"
        
      • AND OR 操作支持更复杂的业务逻辑

         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        
        combined_event_and = ("worker_01:update:2",
                  "worker_02:update:3",
                  "AND")
        
        combined_event_or = ("worker_03:update:1",
                              "worker_04:update:5",
                              "OR")
        
        combined_event_mix = (("worker_01:update:2", "worker_02:update:3", "AND"),
                              "worker_03:update:1",
                              "OR")
        
      • Handler function 是绑定到特定 conditional event 的用户定义的回调函数,当满足事件的条件时,相关消息将被发送到处理程序函数执行

         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        
        # A common handler function signature
        def handler(that, proxy, messages):
            """
                Conditional event handler function.
        
                Args:
                    that: local instance reference, which needs to be operated.
                    proxy: the proxy reference for remote communication.
                    messages: received messages.
            """
            pass
        

    • Distributed Decorator

      • 从本地函数类生成分布式 worker 类的帮助程序

         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        
        from maro.communication import dist, Proxy
        
        # Initialize proxy instance for remote communication.
        proxy = Proxy(group_name="master-worker",
                      component_type="worker",
                      expected_peers=[("master", 1)])
        
        # Declare the trigger condition of rollout event.
        rollout_event = "master:rollout:1"
        
        # Implement rollout event handler logic.
        def on_rollout(that, proxy, messages):
            pass
        
        # Assemble event-handler directory.
        handler_dict = {rollout_event: on_rollout}
        
        # Convert a local functional class to a distributed one.
        @dist(proxy, handler_dict)
        class Worker:
            def __init__(self):
                pass