SocketManager

class agentzero.SocketManager(zmq, context, serialization_backend=None, polling_timeout=10000, timeout=10)[source]

High-level abstraction for zeromq’s non-blocking api.

This component provides utility methods to create, retrieve, connect and bind sockets by name.

It can wait for a socket to become available for receiving data, sending data, or both at the same time.

Parameters:
  • zmq – a reference to the zmq module (either from import zmq or import zmq.green as zmq)
  • context – the context where the sockets will be created
  • serialization_backend – an instance of a valid agentzero.serializers.BaseSerializer. This is optional, except when you use the methods send_safe and recv_safe to communicate with other nodes.
  • polling_timeout – a float - how long to wait for the socket to become available, in milliseconds
  • timeout – default value passed to engage()

Note

A useful extra: with a SocketManager you can build an application that dynamically connects to new nodes based on scaling instructions coming from other nodes.

Warning

Always use the same context within each thread. If you are using gevent, please use a single instance for your whole main process, across all greenlets that you manage.

import zmq
from agentzero.core import SocketManager
from agentzero.serializers import JSON

context = zmq.Context()

sockets = SocketManager(zmq, context, serialization_backend=JSON())
sockets.ensure_and_connect(
     "requester",
     zmq.REQ,
     'tcp://192.168.2.42:5051',
     zmq.POLLIN | zmq.POLLOUT
)

create

SocketManager.create(name, socket_type)[source]

Creates a named socket by type. Can raise a SocketAlreadyExists.

Returns the socket itself

Parameters:
  • name – the socket name
  • socket_type – a valid socket type (e.g. zmq.REQ, zmq.PUB, zmq.PAIR, …)

get_by_name

SocketManager.get_by_name(name)[source]

Returns an existing socket by name. It can raise a SocketNotFound exception.

Returns the socket

Parameters:
  • name – the socket name

get_or_create

SocketManager.get_or_create(name, socket_type, polling_mechanism)[source]

ensure that a socket exists and is registered with a given polling_mechanism (POLLIN, POLLOUT or both)

returns the socket itself.

Parameters:
  • name – the socket name
  • socket_type – a valid socket type (e.g. zmq.REQ, zmq.PUB, zmq.PAIR, …)
  • polling_mechanism – one of (zmq.POLLIN, zmq.POLLOUT, zmq.POLLIN | zmq.POLLOUT)

Example:

import zmq
from agentzero.core import SocketManager
from agentzero.serializers import JSON

context = zmq.Context()

sockets = SocketManager(zmq, context, serialization_backend=JSON())
sockets.get_or_create(
     "requester",
     zmq.REQ,
     zmq.POLLIN | zmq.POLLOUT
)
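
The create / get_by_name / get_or_create semantics described above can be pictured as a small name-keyed registry. The following is a minimal sketch, not AgentZero's implementation: NamedRegistry, AlreadyExists and NotFound are hypothetical stand-ins, and plain strings take the place of real sockets.

```python
class AlreadyExists(Exception):
    """Hypothetical stand-in for agentzero.errors.SocketAlreadyExists."""


class NotFound(Exception):
    """Hypothetical stand-in for agentzero.errors.SocketNotFound."""


class NamedRegistry(object):
    """Illustrative registry; values are plain strings, not sockets."""

    def __init__(self):
        self.items = {}

    def create(self, name, kind):
        # creating the same name twice is an error
        if name in self.items:
            raise AlreadyExists(name)
        self.items[name] = kind
        return kind

    def get_by_name(self, name):
        # looking up an unknown name is an error
        if name not in self.items:
            raise NotFound(name)
        return self.items[name]

    def get_or_create(self, name, kind):
        # idempotent: reuse the existing entry, otherwise create it
        try:
            return self.get_by_name(name)
        except NotFound:
            return self.create(name, kind)


registry = NamedRegistry()
first = registry.get_or_create("requester", "REQ")
second = registry.get_or_create("requester", "REQ")
```

Calling get_or_create twice with the same name yields the same entry, whereas a second create with that name would raise.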

register_socket

SocketManager.register_socket(socket, polling_mechanism)[source]

registers a socket with a given polling_mechanism (POLLIN, POLLOUT or both)

returns the socket itself.

Parameters:
  • socket – the socket instance
  • polling_mechanism – one of (zmq.POLLIN, zmq.POLLOUT, zmq.POLLIN | zmq.POLLOUT)

bind

SocketManager.bind(socket_name, address, polling_mechanism)[source]

binds a socket to an address and automatically registers it with the given polling mechanism.

returns the socket itself.

Parameters:
  • socket_name – the socket name
  • address – a valid zeromq address (e.g. inproc://whatevs)
  • polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT

Example:

>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.create('pipe-in', zmq.PULL)
>>> sockets.bind('pipe-in', 'tcp://*:6000', zmq.POLLIN)

ensure_and_bind

SocketManager.ensure_and_bind(socket_name, socket_type, address, polling_mechanism)[source]

Ensures that a socket exists, is bound to the given address, and is registered with the given polling mechanism.

Tip

This method is a handy replacement for calling get_or_create(), bind() and then engage().

returns the socket itself.

Parameters:
  • socket_name – the socket name
  • socket_type – a valid socket type (e.g. zmq.REQ, zmq.PUB, zmq.PAIR, …)
  • address – a valid zeromq address (e.g. inproc://whatevs)
  • polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT

bind_to_random_port

SocketManager.bind_to_random_port(socket_name, polling_mechanism, local_address=u'tcp://0.0.0.0')[source]

binds the socket to a random port

returns a 2-item tuple with the socket instance and the address string

Parameters:
  • socket_name – the socket name
  • polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT
  • local_address – the address to bind to, without a port (optional, defaults to tcp://0.0.0.0)

Example:

>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.create('api-server', zmq.REP)
>>> _, address = sockets.bind_to_random_port(
...     'api-server',
...     zmq.POLLIN | zmq.POLLOUT,
...     local_address='tcp://192.168.10.24'
... )
>>> address
'tcp://192.168.10.24:61432'
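
Because the port is chosen at bind time, callers often need to extract it from the returned address, for example to advertise it to other nodes. A minimal sketch using only the standard library; split_tcp_address is a hypothetical helper, not part of AgentZero.

```python
from urllib.parse import urlsplit


def split_tcp_address(address):
    """Split 'tcp://host:port' into (host, port). Hypothetical helper."""
    parts = urlsplit(address)
    if parts.scheme != "tcp":
        raise ValueError("expected a tcp:// address, got %r" % address)
    return parts.hostname, parts.port


host, port = split_tcp_address("tcp://192.168.10.24:61432")
```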

connect

SocketManager.connect(socket_name, address, polling_mechanism)[source]

connects a socket to an address and automatically registers it with the given polling mechanism.

returns the socket itself.

Parameters:
  • socket_name – the socket name
  • address – a valid zeromq address (e.g. inproc://whatevs)
  • polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT

Example:

>>> import zmq
>>> from agentzero.core import SocketManager
>>> from agentzero.serializers import JSON
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context, serialization_backend=JSON())
>>> sockets.ensure_and_connect(
...   'logs',
...   zmq.PUB,
...   'tcp://192.168.10.24:6000',
...   zmq.POLLOUT
... )
>>> sockets.publish_safe('logs', 'output', 'some data')

ensure_and_connect

SocketManager.ensure_and_connect(socket_name, socket_type, address, polling_mechanism)[source]

Ensures that a socket exists, is connected to the given address, and is registered with the given polling mechanism.

Tip

This method is a handy replacement for calling get_or_create(), connect() and then engage().

returns the socket itself.

Parameters:
  • socket_name – the socket name
  • socket_type – a valid socket type (e.g. zmq.REQ, zmq.PUB, zmq.PAIR, …)
  • address – a valid zeromq address (e.g. inproc://whatevs)
  • polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT

Example:

>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.ensure_and_connect(
...   'logs',
...   zmq.REQ,
...   'tcp://192.168.10.24:7000',
...   zmq.POLLIN | zmq.POLLOUT
... )

engage

SocketManager.engage(timeout=None)[source]

polls all registered sockets with the given timeout in milliseconds

returns a dictionary with the sockets that are ready to be used in their respective state (zmq.POLLIN or zmq.POLLOUT)

Parameters:
  • timeout – how long it should poll until a socket becomes available. Defaults to agentzero.core.DEFAULT_POLLING_TIMEOUT
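
The dictionary returned by engage() maps each ready socket to its state. A hedged sketch of how such a result might be split into readable and writable sockets, assuming the values are event masks like those produced by zmq.Poller.poll(); the constants mirror the values of zmq.POLLIN and zmq.POLLOUT, the string keys are placeholders for socket objects, and partition_ready is a hypothetical helper.

```python
POLLIN = 1   # same value as zmq.POLLIN
POLLOUT = 2  # same value as zmq.POLLOUT


def partition_ready(ready):
    """Split a {socket: event_mask} dict into (readable, writable) lists."""
    readable = [s for s, mask in ready.items() if mask & POLLIN]
    writable = [s for s, mask in ready.items() if mask & POLLOUT]
    return readable, writable


# placeholder result; a real one would come from sockets.engage()
ready = {"pipe-in": POLLIN, "requester": POLLIN | POLLOUT, "logs": POLLOUT}
readable, writable = partition_ready(ready)
```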

send_safe

SocketManager.send_safe(name, data, *args, **kw)[source]

serializes the data with the configured serialization_backend, waits for the socket to become available, then sends it over through the provided socket name.

returns True if the message was sent, or False if the socket never became available.

Note

you can safely use this function without waiting for a socket to become ready, as it already does it for you.

raises SocketNotFound when the socket name is wrong.

Parameters:
  • name – the name of the socket where data should be sent through
  • data – the data to be serialized then sent
  • *args – args to be passed to wait_until_ready
  • **kw – kwargs to be passed to wait_until_ready
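
send_safe and recv_safe rely on the serialization backend to turn Python objects into bytes and back. The sketch below shows that round trip with the standard json module; pack and unpack are illustrative names and an assumption about what a JSON backend does, not the documented BaseSerializer interface.

```python
import json


def pack(data):
    """Serialize a Python object to bytes suitable for a socket send."""
    return json.dumps(data).encode("utf-8")


def unpack(raw):
    """Deserialize bytes received from a socket back into an object."""
    return json.loads(raw.decode("utf-8"))


message = {"pipeline": "video-download", "retries": 3}
wire = pack(message)
restored = unpack(wire)
```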

recv_safe

SocketManager.recv_safe(name, *args, **kw)[source]

waits for the socket to become available then receives data through it and deserializes the result using the configured serialization_backend before returning.

Note

you can safely use this function without waiting for a socket to become ready, as it already does it for you.

raises SocketNotFound when the socket name is wrong.

returns the deserialized data, or None if the socket never became available

Parameters:
  • name – the name of the socket where data will pass through
  • *args – args to be passed to wait_until_ready
  • **kw – kwargs to be passed to wait_until_ready

Example:

>>> import zmq
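>>> from agentzero.core import SocketManager
>>> from agentzero.serializers import JSON
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context, serialization_backend=JSON())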
>>> sockets.ensure_and_bind('pipe-in', zmq.PULL, 'tcp://*:6000', zmq.POLLIN)
>>> sockets.recv_safe('pipe-in')
{
    "pipeline": "video-download",
    "instructions": {
      "url": "https://www.youtube.com/watch?v=FPZ6mVsv4EI"
    }
}

recv_event_safe

SocketManager.recv_event_safe(name, topic=False, *args, **kw)[source]

waits for the socket to become available then receives multipart data assuming that it’s a pub/sub event, thus it parses the topic and the serialized data, then it deserializes the result using the configured serialization_backend before returning.

Note

you can safely use this function without waiting for a socket to become ready, as it already does it for you.

raises SocketNotFound when the socket name is wrong.

returns the deserialized data, or None if the socket never became available

Parameters:
  • name – the name of the socket where data will pass through
  • *args – args to be passed to wait_until_ready
  • **kw – kwargs to be passed to wait_until_ready

Example:

>>> import zmq
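>>> from agentzero.core import SocketManager
>>> from agentzero.serializers import JSON
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context, serialization_backend=JSON())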
>>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
>>>
>>> # subscribe only to topics beginning with "logs"
>>> sockets.set_topic('events', 'logs')
>>> event = sockets.recv_event_safe('events')
>>> event.topic, event.data
('logs:2016-06-20', {'stdout': 'hello world'})
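
A pub/sub event travels as a two-part message: the topic frame followed by the serialized payload. A minimal sketch of that framing, assuming a JSON payload; the Event namedtuple here is a local stand-in for agentzero.core.Event, and build_frames and parse_event are hypothetical helpers.

```python
import json
from collections import namedtuple

# local stand-in with the two attributes used in the example above
Event = namedtuple("Event", ["topic", "data"])


def build_frames(topic, data):
    """Build the [topic, payload] frames a publisher would send."""
    return [topic.encode("utf-8"), json.dumps(data).encode("utf-8")]


def parse_event(frames):
    """Turn received [topic, payload] frames back into an Event."""
    topic, payload = frames
    return Event(topic.decode("utf-8"), json.loads(payload.decode("utf-8")))


frames = build_frames("logs:2016-06-20", {"stdout": "hello world"})
event = parse_event(frames)
```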

subscribe

SocketManager.subscribe(name, topic=None, keep_polling=None, *args, **kw)[source]

waits for the socket to become available then receives data through it and deserializes the result using the configured serialization_backend before returning.

Note

you can safely use this function without waiting for a socket to become ready, as it already does it for you.

raises SocketNotFound when the socket name is wrong.

returns an agentzero.core.Event, or None if the socket never became available

Parameters:
  • name – str - the name of the socket where data will pass through
  • topic – str - the name of the topic to subscribe to
  • keep_polling – (optional) a callable that must return bool
  • *args – args to be passed to wait_until_ready
  • **kw – kwargs to be passed to wait_until_ready

Tip

pass a function as keep_polling to control when the loop ends

Example:

>>> import zmq
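>>> from agentzero.core import SocketManager
>>> from agentzero.serializers import JSON
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context, serialization_backend=JSON())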
>>> sockets.ensure_and_bind('logs', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
>>> for topic, data in sockets.subscribe('logs', 'output'):
...     print('%s ==> %s' % (topic, data))
...
output:0 ==> some data
output:1 ==> more data
...
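
One way to build a keep_polling callable is a closure over a deadline, so the loop ends after a fixed amount of time. A sketch under that assumption; poll_until is a hypothetical helper and the loop shown is a generic stand-in for the one inside subscribe().

```python
import time


def poll_until(seconds):
    """Return a callable that is True until `seconds` have elapsed."""
    deadline = time.monotonic() + seconds

    def keep_polling():
        return time.monotonic() < deadline

    return keep_polling


keep_polling = poll_until(0.05)
iterations = 0
while keep_polling():
    # a real loop would receive and yield events here
    iterations += 1
    time.sleep(0.01)
```

Such a callable could then be passed along, as in sockets.subscribe('logs', 'output', keep_polling=poll_until(30)).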

set_socket_option

SocketManager.set_socket_option(name, option, value)[source]

calls zmq.setsockopt on the given socket.

Parameters:
  • name – the name of the socket where data will pass through
  • option – the option from the zmq module
  • value – the value to set for the option

Here are some examples of options:

  • zmq.HWM: Set high water mark
  • zmq.AFFINITY: Set I/O thread affinity
  • zmq.IDENTITY: Set socket identity
  • zmq.SUBSCRIBE: Establish message filter
  • zmq.UNSUBSCRIBE: Remove message filter
  • zmq.SNDBUF: Set kernel transmit buffer size
  • zmq.RCVBUF: Set kernel receive buffer size
  • zmq.LINGER: Set linger period for socket shutdown
  • zmq.BACKLOG: Set maximum length of the queue of outstanding connections
  • for the full list go to http://api.zeromq.org/4-0:zmq-setsockopt

Example:

>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.create('pipe-in', zmq.PULL)
>>>
>>> # block after 10 messages are queued
>>> sockets.set_socket_option('pipe-in', zmq.HWM, 10)

set_topic

SocketManager.set_topic(name, topic)[source]

shortcut to SocketManager.set_socket_option() with (name, zmq.SUBSCRIBE, topic)

Parameters:
  • name – the name of the socket where data will pass through
  • topic – the topic prefix to subscribe to

Example:

>>> import zmq
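>>> from agentzero.core import SocketManager
>>> from agentzero.serializers import JSON
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context, serialization_backend=JSON())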
>>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
>>>
>>> # subscribe only to topics beginning with "logs"
>>> sockets.set_topic('events', 'logs')
>>> event = sockets.recv_event_safe('events')
>>> event.topic, event.data
('logs:2016-06-20', {'stdout': 'hello world'})
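
ZeroMQ subscriptions match by prefix: a SUB socket receives every message whose topic starts with the subscribed bytes, and an empty subscription matches everything. A small sketch of that rule in plain Python; matches is a hypothetical helper for illustration only.

```python
def matches(subscription, topic):
    """Mimic ZeroMQ's prefix-based subscription filter."""
    return topic.startswith(subscription)


topics = [b"logs:2016-06-20", b"logs", b"metrics:cpu", b"log"]
delivered = [t for t in topics if matches(b"logs", t)]
```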

publish_safe

SocketManager.publish_safe(name, topic, data)[source]

serializes the data with the configured serialization_backend, waits for the socket to become available, then sends it to the given topic through socket.send_multipart.

returns True if the message was sent, or False if the socket never became available.

Note

you can safely use this function without waiting for a socket to become ready, as it already does it for you.

raises SocketNotFound when the socket name is wrong.

Parameters:
  • name – the name of the socket where data should be sent through
  • topic – the name of the topic
  • data – the data to be serialized then sent

ready

SocketManager.ready(name, polling_mechanism, timeout=None)[source]

Polls all sockets and checks if the socket with the given name is ready for either zmq.POLLIN or zmq.POLLOUT.

returns the socket if available, or None

Parameters:
  • name – the socket name
  • polling_mechanism – either zmq.POLLIN or zmq.POLLOUT
  • timeout – the polling timeout in milliseconds that will be passed to zmq.Poller().poll() (optional, defaults to core.DEFAULT_POLLING_TIMEOUT)

wait_until_ready

SocketManager.wait_until_ready(name, polling_mechanism, timeout=None, polling_timeout=None)[source]

Briefly waits until the socket is ready to be used, yielding to other greenlets until the socket becomes available.

returns the socket if available within the given timeout, or None

Parameters:
  • name – the socket name
  • polling_mechanism – either zmq.POLLIN or zmq.POLLOUT
  • timeout – the timeout in seconds (accepts float) in which it should wait for the socket to become available (optional, defaults to core.DEFAULT_TIMEOUT_IN_SECONDS)
  • polling_timeout – the polling timeout in milliseconds that will be passed to zmq.Poller().poll(). (optional, defaults to core.DEFAULT_POLLING_TIMEOUT)
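
The two timeouts play different roles: timeout bounds the total wait in seconds, while polling_timeout bounds each individual poll in milliseconds. A hedged sketch of such a loop; wait_until and check are hypothetical names, and sleeping stands in for the poll itself.

```python
import time


def wait_until(check, timeout, polling_timeout):
    """Call check() repeatedly until it returns a value or time runs out.

    timeout is in seconds, polling_timeout in milliseconds.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = check()
        if result is not None:
            return result
        # stand-in for a single poll bounded by polling_timeout
        time.sleep(polling_timeout / 1000.0)
    return None


attempts = []


def ready_on_third_try():
    attempts.append(1)
    return "socket" if len(attempts) >= 3 else None


found = wait_until(ready_on_third_try, timeout=1, polling_timeout=10)
missing = wait_until(lambda: None, timeout=0.05, polling_timeout=10)
```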

get_log_handler

SocketManager.get_log_handler(socket_name, topic_name=u'logs')[source]

returns an instance of ZMQPubHandler attached to a previously-created socket.

Parameters:
  • socket_name – the name of the socket, previously created with SocketManager.create()
  • topic_name – the name of the topic in which the logs will be PUBlished

Example:

import logging
import zmq.green as zmq
from agentzero.core import SocketManager

context = zmq.Context()
sockets = SocketManager(zmq, context)
sockets.create('logs', zmq.PUB)  # the handler attaches to an existing socket

handler = sockets.get_log_handler('logs', topic_name='app_logs')
logger = logging.getLogger()
logger.addHandler(handler)

logger.info("Server is up!")
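
Conceptually, a publishing log handler formats each record and emits it under a topic. The following sketch shows the shape of such a handler with the standard logging module only; ListPubHandler is hypothetical and appends to a list where ZMQPubHandler would send over a PUB socket.

```python
import logging


class ListPubHandler(logging.Handler):
    """Collect (topic, message) pairs instead of publishing them."""

    def __init__(self, topic_name="logs"):
        super().__init__()
        self.topic_name = topic_name
        self.published = []

    def emit(self, record):
        # format the record, then "publish" it under the topic
        self.published.append((self.topic_name, self.format(record)))


handler = ListPubHandler(topic_name="app_logs")
logger = logging.getLogger("sketch")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo output out of the root logger
logger.addHandler(handler)

logger.info("Server is up!")
```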

get_logger

SocketManager.get_logger(socket_name, topic_name=u'logs', logger_name=None)[source]

returns an instance of Logger with a ZMQPubHandler attached to it.

Parameters:
  • socket_name – the name of the socket, previously created with create()
  • topic_name – (optional) the name of the topic in which the logs will be PUBlished, defaults to “logs”
  • logger_name – (optional) defaults to the given socket name

Example:

import logging
import zmq.green as zmq
from agentzero.core import SocketManager

context = zmq.Context()
sockets = SocketManager(zmq, context)
sockets.create('logs', zmq.PUB)  # the logger attaches to an existing socket

logger = sockets.get_logger('logs', topic_name='logs', logger_name=__name__)
logger.info("Server is up!")

close

SocketManager.close(socket_name)[source]

closes a socket if it exists

Parameters:
  • socket_name – the socket name

Example:

>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.create('logs', zmq.SUB)
>>> sockets.bind('logs', 'tcp://*:6000', zmq.POLLIN)
>>> sockets.close('logs')

Utility Functions

get_free_tcp_port

agentzero.util.get_free_tcp_port()[source]

returns a free TCP port that can be used for listening on the host.
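
A common way to find a free port is to bind a socket to port 0 and let the operating system choose. This sketch shows that technique with the standard socket module; it is an assumption about the approach, not a copy of agentzero.util, and find_free_port is a hypothetical name.

```python
import socket


def find_free_port(host="127.0.0.1"):
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


port = find_free_port()
```

Note that another process may claim the port between this lookup and its later use, so callers should be prepared for a bind to fail.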

get_default_bind_address

agentzero.util.get_default_bind_address()[source]

get_public_ip_address

agentzero.util.get_public_ip_address(hostname=None)[source]

extract_hostname_from_tcp_address

agentzero.util.extract_hostname_from_tcp_address(address)[source]

resolve_hostname

agentzero.util.resolve_hostname(hostname)[source]

fix_zeromq_tcp_address

agentzero.util.fix_zeromq_tcp_address(address)[source]

get_public_zmq_address

agentzero.util.get_public_zmq_address()[source]

seconds_since

agentzero.util.seconds_since(timestamp)[source]

datetime_from_seconds

agentzero.util.datetime_from_seconds(timestamp)[source]

serialized_exception

agentzero.util.serialized_exception(e)[source]

Exceptions

AgentZeroSocketError

exception agentzero.errors.AgentZeroSocketError[source]

Base exception class for errors originated in SocketManager

SocketAlreadyExists

exception agentzero.errors.SocketAlreadyExists(manager, socket_name)[source]

raised by SocketManager when trying to create a named socket that already exists

>>> from agentzero.core import zmq
>>> from agentzero.core import SocketManager
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.create('foo', zmq.REP)
>>> sockets.create('foo', zmq.REP)
Traceback (most recent call last):
    ...
SocketAlreadyExists: SocketManager(sockets=['foo']) already has a socket named 'foo'.

SocketNotFound

exception agentzero.errors.SocketNotFound(manager, socket_name)[source]

raised by SocketManager when trying to retrieve a nonexistent socket

>>> from agentzero.core import zmq
>>> from agentzero.core import SocketManager
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.get_by_name('some-name')
Traceback (most recent call last):
    ...
SocketNotFound: SocketManager(sockets=['']) has no sockets named 'some-name'.

SocketBindError

exception agentzero.errors.SocketBindError[source]

raised by SocketManager when a bind() operation fails.

SocketConnectError

exception agentzero.errors.SocketConnectError[source]

raised by SocketManager when a connect() operation fails.