SocketManager
class agentzero.SocketManager(zmq, context, serialization_backend=None, polling_timeout=10000, timeout=10)
High-level abstraction for zeromq’s non-blocking API.
This component provides utility methods to create, retrieve, connect and bind sockets by name.
It can wait for a socket to become available for receiving data, sending data, or both at the same time.
Parameters:
- zmq – a reference to the zmq module (either from import zmq or import zmq.green as zmq)
- context – the context where the sockets will be created
- serialization_backend – an instance of a valid agentzero.serializers.BaseSerializer. This is optional, except when you use the methods send_safe and recv_safe to communicate with other nodes.
- polling_timeout – a float: how long to wait for the socket to become available, in milliseconds
- timeout – default value passed to engage()
Note
A useful extra of SocketManager is that you can use it to build an application that dynamically connects to new nodes based on scaling instructions coming from other nodes.
Warning
Always use the same context within each thread. If you are using gevent, please use a single instance for your whole main process, across all greenlets that you manage.
import zmq
from agentzero.core import SocketManager
from agentzero.serializers import JSON

context = zmq.Context()
sockets = SocketManager(zmq, context, serialization_backend=JSON())
sockets.ensure_and_connect(
    "requester",
    zmq.REQ,
    'tcp://192.168.2.42:5051',
    zmq.POLLIN | zmq.POLLOUT
)
create
get_by_name
get_or_create
SocketManager.get_or_create(name, socket_type, polling_mechanism)
Ensures that a socket exists and is registered with the given polling_mechanism (zmq.POLLIN, zmq.POLLOUT or both).
Returns the socket itself.
Parameters:
- name – the socket name
- socket_type – a valid socket type (e.g. zmq.REQ, zmq.PUB, zmq.PAIR, …)
- polling_mechanism – one of zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT
import zmq
from agentzero.core import SocketManager
from agentzero.serializers import JSON

context = zmq.Context()
sockets = SocketManager(zmq, context, serialization_backend=JSON())
sockets.get_or_create(
    "requester",
    zmq.REQ,
    zmq.POLLIN | zmq.POLLOUT
)
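The name-based lookup described above can be pictured with a toy registry. This is only a sketch of the documented behaviour (create fails on a duplicate name, retrieval fails on an unknown name, get_or_create reuses an existing entry); the NamedRegistry class, its factory argument and the two exception classes are hypothetical stand-ins, not agentzero code.

```python
class SocketAlreadyExists(Exception):
    """Stand-in for agentzero.errors.SocketAlreadyExists (illustrative only)."""


class SocketNotFound(Exception):
    """Stand-in for agentzero.errors.SocketNotFound (illustrative only)."""


class NamedRegistry:
    """Toy model of a name -> socket mapping; not agentzero's implementation."""

    def __init__(self, factory):
        self._factory = factory  # callable that builds a "socket" from a type
        self._by_name = {}

    def create(self, name, socket_type):
        # Refuse to overwrite an existing name.
        if name in self._by_name:
            raise SocketAlreadyExists(name)
        self._by_name[name] = self._factory(socket_type)
        return self._by_name[name]

    def get_by_name(self, name):
        if name not in self._by_name:
            raise SocketNotFound(name)
        return self._by_name[name]

    def get_or_create(self, name, socket_type):
        # Reuse the existing socket when the name is already taken.
        if name in self._by_name:
            return self._by_name[name]
        return self.create(name, socket_type)


# A dict stands in for a real socket object.
registry = NamedRegistry(factory=lambda socket_type: {"type": socket_type})
first = registry.get_or_create("requester", "REQ")
second = registry.get_or_create("requester", "REQ")
print(first is second)
```

Calling get_or_create twice with the same name hands back the same object, which is what makes it safe to call from several places in an application.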
register_socket
bind
SocketManager.bind(socket_name, address, polling_mechanism)
Binds a socket to an address and automatically registers it with the given polling mechanism.
Returns the socket itself.
Parameters:
- socket_name – the socket name
- address – a valid zeromq address (e.g. inproc://whatevs)
- polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.create('pipe-in', zmq.PULL)
>>> sockets.bind('pipe-in', 'tcp://*:6000', zmq.POLLIN)
ensure_and_bind
SocketManager.ensure_and_bind(socket_name, socket_type, address, polling_mechanism)
Ensures that a socket exists, that it is bound to the given address and that it is registered with the given polling mechanism.
Tip
This method is a handy replacement for calling get_or_create(), bind() and then engage().
Returns the socket itself.
Parameters:
- socket_name – the socket name
- socket_type – a valid socket type (e.g. zmq.REQ, zmq.PUB, zmq.PAIR, …)
- address – a valid zeromq address (e.g. inproc://whatevs)
- polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT
bind_to_random_port
SocketManager.bind_to_random_port(socket_name, polling_mechanism, local_address=u'tcp://0.0.0.0')
Binds the socket to a random port.
Returns a 2-item tuple with the socket instance and the address string.
Parameters:
- socket_name – the socket name
- polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT
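The source does not say how the random port is chosen. A common way to get one with plain TCP sockets is to bind to port 0 and let the operating system pick; the sketch below uses only the standard library and a hypothetical helper name, bind_to_free_port, to show how the resulting address string could be assembled.

```python
import socket


def bind_to_free_port(host="127.0.0.1"):
    """Bind a TCP socket to an OS-chosen port and return (socket, address)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((host, 0))  # port 0 asks the OS for any free port
    port = sock.getsockname()[1]
    return sock, "tcp://{}:{}".format(host, port)


sock, address = bind_to_free_port()
print(address)  # the port number differs between runs
sock.close()
```

Returning the address alongside the socket mirrors the 2-item tuple described above, so the caller can advertise the chosen port to other nodes.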
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.create('api-server', zmq.REP)
>>> _, address = sockets.bind_to_random_port(
...     'api-server',
...     zmq.POLLIN | zmq.POLLOUT,
...     local_address='tcp://192.168.10.24'
... )
>>> address
'tcp://192.168.10.24:61432'
connect
SocketManager.connect(socket_name, address, polling_mechanism)
Connects a socket to an address and automatically registers it with the given polling mechanism.
Returns the socket itself.
Parameters:
- socket_name – the socket name
- address – a valid zeromq address (e.g. inproc://whatevs)
- polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.ensure_and_connect(
...     'logs',
...     zmq.PUB,
...     'tcp://192.168.10.24:6000',
...     zmq.POLLOUT
... )
>>> sockets.publish_safe('logs', 'output', 'some data')
ensure_and_connect
SocketManager.ensure_and_connect(socket_name, socket_type, address, polling_mechanism)
Ensures that a socket exists, that it is connected to the given address and that it is registered with the given polling mechanism.
Tip
This method is a handy replacement for calling get_or_create(), connect() and then engage().
Returns the socket itself.
Parameters:
- socket_name – the socket name
- socket_type – a valid socket type (e.g. zmq.REQ, zmq.PUB, zmq.PAIR, …)
- address – a valid zeromq address (e.g. inproc://whatevs)
- polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.ensure_and_connect(
...     'logs',
...     zmq.REQ,
...     'tcp://192.168.10.24:7000',
...     zmq.POLLIN | zmq.POLLOUT
... )
engage
SocketManager.engage(timeout=None)
Polls all registered sockets with the given timeout in milliseconds.
Returns a dictionary with the sockets that are ready to be used in their respective state (zmq.POLLIN or zmq.POLLOUT).
Parameters:
- timeout – how long it should poll until a socket becomes available. Defaults to agentzero.core.DEFAULT_POLLING_TIMEOUT
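The idea of polling every registered socket once and getting back a mapping of ready sockets can be illustrated with the standard library's selectors module. This is an analogy under stated assumptions, not agentzero's or zeromq's poller: poll_ready is a hypothetical helper, and a local socket pair stands in for zeromq sockets.

```python
import selectors
import socket


def poll_ready(selector, timeout_ms):
    """Return {socket: event_mask} for every registered socket that is ready."""
    # select() takes seconds, so convert from milliseconds.
    events = selector.select(timeout=timeout_ms / 1000.0)
    return {key.fileobj: mask for key, mask in events}


left, right = socket.socketpair()
selector = selectors.DefaultSelector()
selector.register(right, selectors.EVENT_READ)

before = poll_ready(selector, timeout_ms=10)   # nothing has been sent yet
left.send(b"ping")
after = poll_ready(selector, timeout_ms=1000)  # data is now waiting on `right`

print(right in before, right in after)

selector.close()
left.close()
right.close()
```

A socket only shows up in the returned dictionary when it is ready, so callers can test membership instead of blocking on a single socket.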
send_safe
SocketManager.send_safe(name, data, *args, **kw)
Serializes the data with the configured serialization_backend, waits for the socket to become available, then sends it through the socket with the given name.
Returns True if the message was sent, or False if the socket never became available.
Note
You can safely use this function without waiting for a socket to become ready, as it already does it for you.
Raises SocketNotFound when the socket name is wrong.
Parameters:
- name – the name of the socket through which the data should be sent
- data – the data to be serialized then sent
- *args – args to be passed to wait_until_ready
- **kw – kwargs to be passed to wait_until_ready
recv_safe
SocketManager.recv_safe(name, *args, **kw)
Waits for the socket to become available, then receives data through it and deserializes the result using the configured serialization_backend before returning.
Note
You can safely use this function without waiting for a socket to become ready, as it already does it for you.
Raises SocketNotFound when the socket name is wrong.
Returns the deserialized data, or None if the socket never became available.
Parameters:
- name – the name of the socket through which the data will pass
- *args – args to be passed to wait_until_ready
- **kw – kwargs to be passed to wait_until_ready
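The "wait, then send or receive, otherwise give up" contract shared by send_safe and recv_safe can be sketched with standard-library sockets and JSON. The helper names send_json_when_ready and recv_json_when_ready are hypothetical, and a local socket pair replaces zeromq; only the return conventions (True/False for sending, data/None for receiving) are taken from the text above.

```python
import json
import select
import socket


def send_json_when_ready(sock, data, timeout=1.0):
    """Serialize `data` as JSON and send it once `sock` is writable."""
    _, writable, _ = select.select([], [sock], [], timeout)
    if not writable:
        return False  # the socket never became available
    sock.sendall(json.dumps(data).encode("utf-8"))
    return True


def recv_json_when_ready(sock, timeout=1.0):
    """Wait until `sock` is readable, then receive and deserialize a message."""
    readable, _, _ = select.select([sock], [], [], timeout)
    if not readable:
        return None  # the socket never became available
    # The message is small enough to arrive in a single recv() call here.
    return json.loads(sock.recv(65536).decode("utf-8"))


left, right = socket.socketpair()
empty = recv_json_when_ready(right, timeout=0.05)  # nothing was sent yet
sent = send_json_when_ready(left, {"pipeline": "video-download"})
received = recv_json_when_ready(right)
print(empty, sent, received)
left.close()
right.close()
```

Because a missing message is reported as None rather than an exception, callers should check the return value before using it.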
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.ensure_and_bind('pipe-in', zmq.PULL, 'tcp://*:6000', zmq.POLLIN)
>>> sockets.recv_safe('pipe-in')
{
    "pipeline": "video-download",
    "instructions": {
        "url": "https://www.youtube.com/watch?v=FPZ6mVsv4EI"
    }
}
recv_event_safe
SocketManager.recv_event_safe(name, topic=False, *args, **kw)
Waits for the socket to become available, then receives multipart data, assuming that it is a pub/sub event. It parses the topic and the serialized data, then deserializes the result using the configured serialization_backend before returning.
Note
You can safely use this function without waiting for a socket to become ready, as it already does it for you.
Raises SocketNotFound when the socket name is wrong.
Returns the deserialized data, or None if the socket never became available.
Parameters:
- name – the name of the socket through which the data will pass
- *args – args to be passed to wait_until_ready
- **kw – kwargs to be passed to wait_until_ready
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
>>>
>>> # subscribe only to topics beginning with "logs"
>>> sockets.set_topic('events', 'logs')
>>> event = sockets.recv_event_safe('events')
>>> event.topic, event.data
('logs:2016-06-20', {'stdout': 'hello world'})
subscribe
SocketManager.subscribe(name, topic=None, keep_polling=None, *args, **kw)
Waits for the socket to become available, then receives data through it and deserializes the result using the configured serialization_backend before returning.
Note
You can safely use this function without waiting for a socket to become ready, as it already does it for you.
Raises SocketNotFound when the socket name is wrong.
Returns an agentzero.core.Event, or None if the socket never became available.
Tip
Pass a function as keep_polling to control when the loop ends.
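How a keep_polling function can end a subscription loop is easiest to see in a stand-alone generator. The sketch assumes that keep_polling is called with no arguments before each poll and that the loop stops when it returns a falsy value; the source does not spell out that calling convention, and subscribe_loop and poll_budget_left are hypothetical names.

```python
from collections import deque


def subscribe_loop(receive_event, keep_polling=None):
    """Yield events until keep_polling() returns a falsy value."""
    keep_polling = keep_polling or (lambda: True)
    while keep_polling():
        event = receive_event()
        if event is not None:  # None models "socket never became available"
            yield event


# Queued (topic, data) pairs; None marks a poll that found nothing.
pending = deque([("output:0", "some data"), None, ("output:1", "more data")])
remaining_polls = [len(pending)]


def poll_budget_left():
    # Allow exactly one poll per queued item, then stop the loop.
    remaining_polls[0] -= 1
    return remaining_polls[0] >= 0


events = list(subscribe_loop(pending.popleft, keep_polling=poll_budget_left))
print(events)
```

A predicate like this is a convenient hook for shutting a subscriber down cleanly, for example from a signal handler that flips a flag.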
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.ensure_and_bind('logs', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
>>> for topic, data in sockets.subscribe('logs', 'output'):
...     print('%s ==> %s' % (topic, data))
...
output:0 ==> some data
output:1 ==> more data
...
set_socket_option
SocketManager.set_socket_option(name, option, value)
Calls zmq.setsockopt on the given socket.
Parameters:
- name – the name of the socket on which the option will be set
- option – the option from the zmq module
- value –
Here are some examples of options:
- zmq.HWM: Set high water mark
- zmq.AFFINITY: Set I/O thread affinity
- zmq.IDENTITY: Set socket identity
- zmq.SUBSCRIBE: Establish message filter
- zmq.UNSUBSCRIBE: Remove message filter
- zmq.SNDBUF: Set kernel transmit buffer size
- zmq.RCVBUF: Set kernel receive buffer size
- zmq.LINGER: Set linger period for socket shutdown
- zmq.BACKLOG: Set maximum length of the queue of outstanding connections
For the full list go to http://api.zeromq.org/4-0:zmq-setsockopt
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.create('pipe-in', zmq.PULL)
>>>
>>> # block after 10 messages are queued
>>> sockets.set_socket_option('pipe-in', zmq.HWM, 10)
set_topic
SocketManager.set_topic(name, topic)
Shortcut to SocketManager.set_socket_option() with (name, zmq.SUBSCRIBE, topic).
Parameters:
- name – the name of the socket that will subscribe to the topic
- topic – the topic prefix to subscribe to
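A zmq.SUBSCRIBE filter is a prefix match: a SUB socket receives every message whose leading bytes equal one of its subscribed prefixes, and an empty prefix matches everything. The rule can be modelled in plain Python; matches_subscription is a hypothetical helper used only to illustrate it.

```python
def matches_subscription(topic, subscriptions):
    """Return True when `topic` starts with any subscribed prefix."""
    return any(topic.startswith(prefix) for prefix in subscriptions)


subscriptions = [b"logs"]
logs_match = matches_subscription(b"logs:2016-06-20", subscriptions)
metrics_match = matches_subscription(b"metrics:cpu", subscriptions)
catch_all = matches_subscription(b"anything", [b""])  # empty prefix
print(logs_match, metrics_match, catch_all)
```

This is why subscribing to "logs" also delivers topics such as "logs:2016-06-20": the comparison is on the prefix, not the whole topic.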
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
>>>
>>> # subscribe only to topics beginning with "logs"
>>> sockets.set_topic('events', 'logs')
>>> event = sockets.recv_event_safe('events')
>>> event.topic, event.data
('logs:2016-06-20', {'stdout': 'hello world'})
publish_safe
SocketManager.publish_safe(name, topic, data)
Serializes the data with the configured serialization_backend, waits for the socket to become available, then sends it to the given topic through socket.send_multipart.
Returns True if the message was sent, or False if the socket never became available.
Note
You can safely use this function without waiting for a socket to become ready, as it already does it for you.
Raises SocketNotFound when the socket name is wrong.
Parameters:
- name – the name of the socket through which the data should be sent
- topic – the name of the topic
- data – the data to be serialized then sent
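A pub/sub message sent with send_multipart is a list of frames. The sketch below assumes a two-frame layout (topic first, serialized payload second) and JSON as the serializer, which matches the description of publish_safe and recv_event_safe but is not confirmed by the source; Event, encode_event and decode_event are hypothetical names.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for the event object returned to subscribers.
Event = namedtuple("Event", ["topic", "data"])


def encode_event(topic, data):
    """Build the two frames of a message: topic first, payload second."""
    return [topic.encode("utf-8"), json.dumps(data).encode("utf-8")]


def decode_event(frames):
    """Split a two-frame message into its topic and deserialized payload."""
    topic, payload = frames
    return Event(topic.decode("utf-8"), json.loads(payload.decode("utf-8")))


frames = encode_event("logs:2016-06-20", {"stdout": "hello world"})
event = decode_event(frames)
print(event.topic, event.data)
```

Keeping the topic in its own leading frame lets subscribers filter by prefix without deserializing the payload.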
ready
SocketManager.ready(name, polling_mechanism, timeout=None)
Polls all sockets and checks if the socket with the given name is ready for either zmq.POLLIN or zmq.POLLOUT.
Returns the socket if available, or None.
Parameters:
- name – the socket name
- polling_mechanism – either zmq.POLLIN or zmq.POLLOUT
- timeout – the polling timeout in milliseconds that will be passed to zmq.Poller().poll() (optional, defaults to core.DEFAULT_POLLING_TIMEOUT)
wait_until_ready
SocketManager.wait_until_ready(name, polling_mechanism, timeout=None, polling_timeout=None)
Briefly waits until the socket is ready to be used, yielding to other greenlets until the socket becomes available.
Returns the socket if it becomes available within the given timeout, or None.
Parameters:
- name – the socket name
- polling_mechanism – either zmq.POLLIN or zmq.POLLOUT
- timeout – the timeout in seconds (accepts float) for which it should wait for the socket to become available (optional, defaults to core.DEFAULT_TIMEOUT_IN_SECONDS)
- polling_timeout – the polling timeout in milliseconds that will be passed to zmq.Poller().poll() (optional, defaults to core.DEFAULT_POLLING_TIMEOUT)
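The two timeouts play different roles: timeout (seconds) bounds the whole wait, while polling_timeout (milliseconds) bounds each individual poll. A minimal stand-alone loop shows the interplay; wait_until and ready_on_third_poll are hypothetical, and the real method may structure its loop differently.

```python
import time


def wait_until(check_ready, timeout=1.0, polling_timeout_ms=10):
    """Call check_ready(polling_timeout_ms) until it returns something other
    than None, or until `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        result = check_ready(polling_timeout_ms)
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            return None


attempts = []


def ready_on_third_poll(polling_timeout_ms):
    attempts.append(polling_timeout_ms)
    time.sleep(polling_timeout_ms / 1000.0)  # stands in for a blocking poll
    return "socket" if len(attempts) >= 3 else None


found = wait_until(ready_on_third_poll, timeout=1.0)
missed = wait_until(lambda ms: None, timeout=0.05)  # never becomes ready
print(found, len(attempts), missed)
```

A short polling timeout keeps each iteration cheap, which matters under gevent where other greenlets get a chance to run between polls.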
get_log_handler
SocketManager.get_log_handler(socket_name, topic_name=u'logs')
Returns an instance of ZMQPubHandler attached to a previously created socket.
Parameters:
- socket_name – the name of the socket, previously created with SocketManager.create()
- topic_name – the name of the topic to which the logs will be PUBlished
Example:
import logging
import zmq.green as zmq
from agentzero.core import SocketManager

context = zmq.Context()
sockets = SocketManager(zmq, context)
handler = sockets.get_log_handler('logs', topic_name='app_logs')
logger = logging.getLogger()
logger.addHandler(handler)

logger.info("Server is up!")
get_logger
SocketManager.get_logger(socket_name, topic_name=u'logs', logger_name=None)
Returns a Logger instance with a ZMQPubHandler attached to it.
Parameters:
- socket_name – the name of the socket, previously created with create()
- topic_name – (optional) the name of the topic to which the logs will be PUBlished, defaults to “logs”
- logger_name – (optional) defaults to the given socket name
Example:
import logging
import zmq.green as zmq
from agentzero.core import SocketManager

context = zmq.Context()
sockets = SocketManager(zmq, context)
logger = sockets.get_logger('logs', topic_name='logs', logger_name=__name__)

logger.info("Server is up!")
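What a topic-aware log handler does can be approximated with the standard logging module alone. TopicHandler below is a hypothetical stand-in that collects (topic, message) pairs in a list instead of publishing them over a socket; it is not the ZMQPubHandler class and says nothing about that class's internals.

```python
import logging


class TopicHandler(logging.Handler):
    """Hypothetical stand-in for a PUB log handler: it records
    (topic, message) pairs instead of sending them over a socket."""

    def __init__(self, topic_name, outbox):
        super().__init__()
        self.topic_name = topic_name
        self.outbox = outbox

    def emit(self, record):
        # Pair every formatted log line with the configured topic.
        self.outbox.append((self.topic_name, self.format(record)))


outbox = []
logger = logging.getLogger("agentzero-sketch")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the example's records away from the root logger
logger.addHandler(TopicHandler("app_logs", outbox))

logger.info("Server is up!")
print(outbox)
```

Tagging each record with a topic is what allows a subscriber elsewhere to pick out log traffic with a prefix filter.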
close
SocketManager.close(socket_name)
Closes a socket if it exists.
Parameters:
- socket_name – the socket name
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.create('logs', zmq.SUB)
>>> sockets.bind('logs', 'tcp://*:6000', zmq.POLLIN)
>>> sockets.close('logs')
Utility Functions
get_free_tcp_port
extract_hostname_from_tcp_address
Exceptions
AgentZeroSocketError
SocketAlreadyExists
exception agentzero.errors.SocketAlreadyExists(manager, socket_name)
Raised by SocketManager when trying to create a named socket that already exists.
>>> from agentzero.core import zmq
>>> from agentzero.core import SocketManager
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.create('foo', zmq.REP)
>>> sockets.create('foo', zmq.REP)
Traceback (most recent call last):
...
SocketAlreadyExists: SocketManager(sockets=['foo']) already has a socket named 'foo'.
SocketNotFound
exception agentzero.errors.SocketNotFound(manager, socket_name)
Raised by SocketManager when trying to retrieve a nonexistent socket.
>>> from agentzero.core import zmq
>>> from agentzero.core import SocketManager
>>> context = zmq.Context()
>>> sockets = SocketManager(zmq, context)
>>> sockets.get_by_name('some-name', zmq.PUB)
Traceback (most recent call last):
...
SocketNotFound: SocketManager(sockets=['']) has no sockets named 'some-name'.