Building a distributed worker pipeline
Let’s build a worker pipeline made of Steps, each of which executes a specific job type and can be scaled individually.
Here is an overview of the socket architecture:
Coding the pipeline entity
The main pipeline contains the following sockets:
- a SUB socket that will bind() at the given bind_address and subscribe to Step events
- a REP socket that will bind() at the given reply_address, respond to pipeline execution requests, and reply with a job id for later status querying
```python
import zmq.green as zmq
from agentzero.core import SocketManager


class Pipeline(object):
    steps = []

    def __init__(self):
        self.context = zmq.Context()
        # the SocketManager keeps the named sockets created below
        self.sockets = SocketManager(zmq, self.context)
        self.sockets.create("pipe-sub", zmq.SUB)   # SUB socket for Step events
        self.sockets.create("pipe-in", zmq.PULL)   # PULL socket for incoming messages
        self.sockets.create("pipe-out", zmq.PUSH)  # PUSH socket for outgoing messages
        self.children = []
```
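The reply side described above hands back a job id that a client can use to query status later. Here is a minimal, transport-agnostic sketch of that bookkeeping. `JobRegistry`, `submit`, `status`, and the status strings are hypothetical names chosen for illustration and are not part of agentzero. The socket calls are deliberately left out, so this only shows what a handler behind the reply socket might do with each request.

```python
import uuid


class JobRegistry(object):
    """Hypothetical in-memory store mapping job ids to their status."""

    def __init__(self):
        self.jobs = {}

    def submit(self, job_type, instructions):
        # Assign a unique id that the caller can use for later status queries.
        job_id = uuid.uuid4().hex
        self.jobs[job_id] = {
            "job_type": job_type,
            "instructions": instructions,
            "status": "enqueued",
        }
        return {"job_id": job_id}

    def status(self, job_id):
        # Unknown ids get an explicit answer instead of raising.
        job = self.jobs.get(job_id)
        if job is None:
            return {"job_id": job_id, "status": "unknown"}
        return {"job_id": job_id, "status": job["status"]}


registry = JobRegistry()
reply = registry.submit("download", {"url": "http://example.com/file"})
print(registry.status(reply["job_id"]))
```

Keeping this bookkeeping separate from the sockets means the same registry could be updated from Step events as they arrive, though how the pipeline actually tracks jobs is not specified here.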
Coding the step entity
A Step contains a PUB socket where it will send the following events:
- announce its JobType as well as its PUSH/PULL address pair
- announce failed jobs, so that they can be auto-recovered later
- announce succeeded jobs
- announce exceptions and auto-schedule a later retry
- live metrics
- live logging output
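The events listed above could be published as two-frame messages: a topic frame that subscribers filter on, followed by a JSON payload. The sketch below only builds and parses those frames using the standard library. The topic names, the `encode_event` and `decode_event` helpers, and the payload fields are assumptions made for illustration, not agentzero's wire format.

```python
import json

# Hypothetical topic names, one per kind of event a Step announces.
TOPICS = ("available", "failed", "success", "exception", "metrics", "logs")


def encode_event(topic, payload):
    """Return [topic_frame, data_frame], both as bytes."""
    if topic not in TOPICS:
        raise ValueError("unknown topic: {0}".format(topic))
    data = json.dumps(payload, sort_keys=True)
    return [topic.encode("utf-8"), data.encode("utf-8")]


def decode_event(frames):
    """Inverse of encode_event: return (topic, payload)."""
    topic_frame, data_frame = frames
    return topic_frame.decode("utf-8"), json.loads(data_frame.decode("utf-8"))


# A Step announcing its job type and its PULL/PUSH address pair.
announcement = encode_event("available", {
    "job_type": "download",
    "pull_address": "tcp://127.0.0.1:5050",
    "push_address": "tcp://127.0.0.1:6060",
})
print(decode_event(announcement))
```

With pyzmq, a frame list like this can be handed to `send_multipart()` on the PUB socket. SUB sockets match subscriptions against the start of the first frame, which is why the topic goes first.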