Building a distributed worker pipeline

Let’s build a worker pipeline whose Steps each execute a specific job type and can be scaled individually.

Here is an overview of the socket architecture:

[Figure: socket architecture of the pipeline manager (zmq-pipeline-manager.png)]

Coding the pipeline entity

The main pipeline contains the following sockets:

  • a SUB socket that will bind() at the given bind_address and subscribe to Step events
  • a REP socket that will bind() at the given reply_address, respond to pipeline execution requests, and reply with a job id for later status querying
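# zmq.green is the gevent-compatible flavor of pyzmq
import zmq.green as zmq
from agentzero.core import SocketManager


class Pipeline(object):
    # steps that belong to this pipeline
    steps = []

    def __init__(self):
        self.context = zmq.Context()
        # the SocketManager keeps track of sockets by name
        self.sockets = SocketManager(zmq, self.context)
        # SUB socket for receiving events published by Steps
        self.sockets.create("pipe-sub", zmq.SUB)
        # PULL socket for incoming messages
        self.sockets.create("pipe-in", zmq.PULL)
        # PUSH socket for outgoing messages
        self.sockets.create("pipe-out", zmq.PUSH)
        self.children = []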
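
The reply side of the pipeline, which hands back a job id for later status querying, could be backed by a small in-memory registry. The following is only a sketch under assumptions: JobRegistry, its method names and the status strings are hypothetical and not part of agentzero. A real pipeline would presumably call submit() when an execution request arrives and send the returned id back to the client.

```python
import uuid


class JobRegistry(object):
    """Hypothetical helper: maps job ids to their last known status."""

    def __init__(self):
        self.jobs = {}

    def submit(self, job_type, instructions):
        # Generate an id the client can use for later status queries.
        job_id = uuid.uuid4().hex
        self.jobs[job_id] = {
            "job_type": job_type,
            "instructions": instructions,
            "status": "enqueued",  # assumed status name
        }
        return job_id

    def update(self, job_id, status):
        # Meant to be called when a Step event about this job arrives.
        self.jobs[job_id]["status"] = status

    def status(self, job_id):
        # Unknown ids yield None instead of raising.
        job = self.jobs.get(job_id)
        return job["status"] if job else None
```

Keeping the registry separate from the socket code makes it possible to exercise the job bookkeeping without opening any sockets.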

Coding the step entity

A Step contains a PUB socket through which it publishes the following events:

  • announce its JobType as well as its PUSH/PULL address pair
  • announce failed jobs, so that they can be auto-recovered later
  • announce succeeded jobs
  • announce exceptions and auto-schedule a later retry
  • live metrics
  • live logging output
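
The events listed above could be published as two-frame messages: a topic frame followed by a JSON payload, so that subscribers can filter by topic prefix. This is a minimal sketch, not agentzero's own wire format: StepEvents, the topic names and the payload fields are all assumptions made for illustration. The socket argument can be any object with a send_multipart() method, such as a pyzmq PUB socket.

```python
import json
import time


class StepEvents(object):
    """Hypothetical helper that formats and publishes Step events."""

    def __init__(self, socket, job_type):
        self.socket = socket
        self.job_type = job_type

    def publish(self, topic, **data):
        # Frame 1 is the topic, frame 2 the JSON-encoded payload.
        payload = dict(data, job_type=self.job_type, time=time.time())
        frames = [topic.encode("utf-8"), json.dumps(payload).encode("utf-8")]
        self.socket.send_multipart(frames)
        return frames

    def announce(self, pull_address, push_address):
        # Tell the pipeline which job type this Step handles and where.
        return self.publish(
            "step:available",
            pull_address=pull_address,
            push_address=push_address,
        )

    def job_succeeded(self, job_id):
        return self.publish("job:succeeded", job_id=job_id)

    def job_failed(self, job_id, error):
        # Failed jobs are announced so they can be recovered later.
        return self.publish("job:failed", job_id=job_id, error=error)

    def metric(self, name, value):
        return self.publish("metric", name=name, value=value)

    def log(self, message):
        return self.publish("log", message=message)
```

With pyzmq, a subscriber interested only in job results could then call setsockopt(zmq.SUBSCRIBE, b"job:") on its SUB socket, since ZeroMQ subscriptions match on the leading bytes of the first frame.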