#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
"""
agentzero.core
~~~~~~~~~~~~~~~
* ``DEFAULT_TIMEOUT_IN_SECONDS``: 10 (seconds)
* ``DEFAULT_POLLING_TIMEOUT``: 10000 (milliseconds)
"""
import zmq
import time
import logging
import collections
from uuid import uuid4
from zmq.error import ZMQError
from agentzero import serializers
from agentzero.errors import SocketNotFound
from agentzero.errors import SocketAlreadyExists
from agentzero.errors import SocketBindError
from agentzero.errors import SocketConnectError
from agentzero.util import cast_bytes
DEFAULT_TIMEOUT_IN_SECONDS = 10
# in milliseconds
DEFAULT_POLLING_TIMEOUT = DEFAULT_TIMEOUT_IN_SECONDS * 1000


class SocketManager(object):
    """High-level abstraction for zeromq's non-blocking api.

    This component provides utility methods to create, retrieve,
    connect and bind sockets by name.

    It can wait for a socket to become available in either receiving
    data, sending data or both at the same time.

    :param zmq: a reference to the zmq module (either from ``import zmq``
      or ``import zmq.green as zmq``)
    :param context: the context where the sockets will be created
    :param serialization_backend: an instance of a valid
      :py:class:`agentzero.serializers.BaseSerializer`. This is completely
      optional, save for the cases where you utilize the methods
      ``send_safe`` and ``recv_safe`` when communicating to other
      nodes. Defaults to ``serializers.JSON()``.
    :param polling_timeout: a **float** - how long a single poll waits for
      a socket to become available, in milliseconds
    :param timeout: how long, in seconds,
      :py:meth:`~agentzero.core.SocketManager.wait_until_ready` keeps
      trying by default

    .. note:: An extra useful feature that comes with using a
       ``SocketManager`` is that you can use a SocketManager to create an
       application that dynamically connects to new nodes based on
       scaling instructions coming from other nodes

    .. warning:: Always use the same context per each thread. If you are
       using gevent, please use a single instance for your whole main
       process, across all greenlets that you manage.

    .. testcode::

       import zmq
       from agentzero.core import SocketManager
       from agentzero.serializers import JSON

       context = zmq.Context()
       sockets = SocketManager(zmq, context, serialization_backend=JSON())
       sockets.ensure_and_connect(
           "requester",
           zmq.REQ,
           'tcp://192.168.2.42:5051',
           zmq.POLLIN | zmq.POLLOUT
       )
    """

    def __init__(
        self,
        zmq,
        context,
        serialization_backend=None,
        polling_timeout=DEFAULT_POLLING_TIMEOUT,
        timeout=DEFAULT_TIMEOUT_IN_SECONDS,
    ):
        self.zmq = zmq
        self.context = context
        # book-keeping of the sockets themselves: name -> socket
        self.sockets = collections.OrderedDict()
        # book-keeping of the last bound/connected address: name -> address
        self.addresses = collections.OrderedDict()
        self.poller = self.zmq.Poller()
        # book-keeping of sockets registered with the poller:
        # socket -> polling mechanism
        self.registry = collections.OrderedDict()
        self.serialization_backend = serialization_backend or serializers.JSON()
        self.polling_timeout = polling_timeout
        self.timeout = timeout

    def __repr__(self):
        return "SocketManager(sockets={0})".format(
            repr(list(self.sockets.keys()))
        )

    def __del__(self):
        # ``__init__`` may have failed before the book-keeping attributes
        # were created, so never assume they exist in the finalizer.
        sockets = getattr(self, "sockets", None) or {}
        for socket in list(sockets.values()):
            try:
                socket.close()
            except BaseException:
                # deliberate best-effort: a finalizer must never raise
                pass
        for attribute in ("addresses", "registry"):
            bookkeeping = getattr(self, attribute, None)
            if bookkeeping is not None:
                bookkeeping.clear()

    def send_safe(self, name, data, *args, **kw):
        """serializes the data with the configured ``serialization_backend``,
        waits for the socket to become available, then sends it over
        through the provided socket name.

        returns ``True`` if the message was sent, or ``False`` if the
        socket never became available.

        .. note::
           you can safely use this function without waiting for a
           socket to become ready, as it already does it for you.

        raises SocketNotFound when the socket name is wrong.

        :param name: the name of the socket where data should be sent through
        :param data: the data to be serialized then sent
        :param ``*args``: args to be passed to wait_until_ready
        :param ``**kw``: kwargs to be passed to wait_until_ready
        """
        socket = self.wait_until_ready(name, self.zmq.POLLOUT, *args, **kw)
        if not socket:
            return False
        payload = self.serialization_backend.pack(data)
        socket.send_string(payload)
        return True

    def publish_safe(self, name, topic, data):
        """serializes the data with the configured ``serialization_backend``
        and sends it to the given topic through ``socket.send_multipart``.

        Unlike :py:meth:`send_safe` this method does **not** poll the
        socket before sending: the message is handed to the socket
        right away.

        returns ``True`` once the message was handed to the socket.

        raises SocketNotFound when the socket name is wrong.

        :param name: the name of the socket where data should be sent through
        :param topic: the name of the topic
        :param data: the data to be serialized then sent
        """
        socket = self.get_by_name(name)
        payload = self.serialization_backend.pack(data)
        socket.send_multipart([cast_bytes(topic), cast_bytes(payload)])
        return True

    def recv_event_safe(self, name, topic=False, *args, **kw):
        """subscribes to ``topic``, waits for the socket to become
        available, then receives multipart data assuming that it's a
        pub/sub event: the first frame is the topic and the second the
        serialized data, which is deserialized using the configured
        ``serialization_backend`` before returning.

        .. note::
           you can safely use this function without waiting for a
           socket to become ready, as it already does it for you.

        raises SocketNotFound when the socket name is wrong.

        raises TypeError when ``topic`` is neither text, bytes, ``None``
        nor ``False``.

        returns an :py:class:`~agentzero.core.Event`, or ``None`` if the
        socket never became available

        :param name: the name of the socket where data will pass through
        :param topic: text or bytes with the topic prefix to subscribe to;
          ``None``, ``False`` or an empty value subscribes to everything
        :param ``*args``: args to be passed to wait_until_ready
        :param ``**kw``: kwargs to be passed to wait_until_ready

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager(zmq, zmq.Context())
          >>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
          >>>
          >>> # receive only topics beginning with "logs"
          >>> event = sockets.recv_event_safe('events', 'logs')
          >>> event.topic, event.data
          'logs:2016-06-20', {'stdout': 'hello world'}
        """
        topic = topic or b""
        # ``type(u"")`` is the text type on both python 2 and python 3,
        # so text and bytes topics are accepted alike; ``set_topic``
        # takes care of casting to bytes.
        if not isinstance(topic, (bytes, type(u""))):
            msg = (
                "recv_event_safe() takes a string, "
                "None or False as argument, "
                "received {0}({1!r}) instead".format(
                    type(topic).__name__, topic
                )
            )
            raise TypeError(msg)

        self.set_topic(name, topic)
        socket = self.wait_until_ready(name, self.zmq.POLLIN, *args, **kw)
        if not socket:
            return None

        topic, raw = socket.recv_multipart()
        payload = self.serialization_backend.unpack(raw)
        return Event(topic=topic, data=payload)

    def set_socket_option(self, name, option, value):
        """calls ``setsockopt`` on the socket with the given name.

        raises SocketNotFound when the socket name is wrong.

        :param name: the name of the socket
        :param option: the option from the ``zmq`` module
        :param value: the value for that option

        Here are some examples of options:

        * ``zmq.HWM``: Set high water mark
        * ``zmq.AFFINITY``: Set I/O thread affinity
        * ``zmq.IDENTITY``: Set socket identity
        * ``zmq.SUBSCRIBE``: Establish message filter
        * ``zmq.UNSUBSCRIBE``: Remove message filter
        * ``zmq.SNDBUF``: Set kernel transmit buffer size
        * ``zmq.RCVBUF``: Set kernel receive buffer size
        * ``zmq.LINGER``: Set linger period for socket shutdown
        * ``zmq.BACKLOG``: Set maximum length of the queue of outstanding connections
        * for the full list go to ``http://api.zeromq.org/4-0:zmq-setsockopt``

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager(zmq, zmq.Context())
          >>> sockets.create('pipe-in', zmq.PULL)
          >>>
          >>> # block after 10 messages are queued
          >>> sockets.set_socket_option('pipe-in', zmq.HWM, 10)
        """
        socket = self.get_by_name(name)
        socket.setsockopt(option, value)

    def set_topic(self, name, topic):
        """shortcut to :py:meth:`SocketManager.set_socket_option` with
        ``(name, zmq.SUBSCRIBE, topic)``; the topic is cast to bytes first.

        :param name: the name of the socket
        :param topic: the topic prefix to subscribe to

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager(zmq, zmq.Context())
          >>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
          >>>
          >>> # subscribe only to topics beginning with "logs"
          >>> sockets.set_topic('events', 'logs')
        """
        safe_topic = cast_bytes(topic)
        self.set_socket_option(name, self.zmq.SUBSCRIBE, safe_topic)

    def recv_safe(self, name, *args, **kw):
        """waits for the socket to become available then receives data through
        it and deserializes the result using the configured
        ``serialization_backend`` before returning.

        .. note::
           you can safely use this function without waiting for a
           socket to become ready, as it already does it for you.

        raises SocketNotFound when the socket name is wrong.

        returns the deserialized data, or ``None`` if the socket never
        became available

        :param name: the name of the socket where data will pass through
        :param ``*args``: args to be passed to wait_until_ready
        :param ``**kw``: kwargs to be passed to wait_until_ready

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager(zmq, zmq.Context())
          >>> sockets.ensure_and_bind('pipe-in', zmq.PULL, 'tcp://*:6000', zmq.POLLIN)
          >>> sockets.recv_safe('pipe-in')
          {
              "pipeline": "video-download",
              "instructions": {
                  "url": "https://www.youtube.com/watch?v=FPZ6mVsv4EI"
              }
          }
        """
        socket = self.wait_until_ready(name, self.zmq.POLLIN, *args, **kw)
        if not socket:
            return None
        raw = socket.recv_string()
        return self.serialization_backend.unpack(raw)

    def subscribe(self, name, topic=None, keep_polling=None, *args, **kw):
        """subscribes the socket to ``topic`` and yields one
        :py:class:`~agentzero.core.Event` per multipart message received,
        deserializing the payload with the configured
        ``serialization_backend``.

        This is a generator: nothing happens (and nothing is raised)
        until it is iterated. Each iteration calls
        ``socket.recv_multipart()`` directly, without polling first.

        raises SocketNotFound when the socket name is wrong.

        raises TypeError when ``keep_polling`` is given but not callable.

        :param name: :py:class:`str` - the name of the socket
        :param topic: :py:class:`str` - the topic prefix to subscribe to;
          a falsy value subscribes to everything
        :param keep_polling: *(optional)* - a callable that must return
          :py:class:`bool`; the loop runs while it returns a true value.
          The default keeps going for as long as the socket name resolves.
        :param ``*args``: accepted for signature compatibility, currently unused
        :param ``**kw``: accepted for signature compatibility, currently unused

        .. tip:: pass a function to the **keep_polling** to control the
           finality of the loop

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager(zmq, zmq.Context())
          >>> sockets.ensure_and_bind('logs', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
          >>> for event in sockets.subscribe('logs', 'output'):
          ...     print(event.topic, '==>', event.data)
          ...
          output:0 ==> some data
          output:1 ==> more data
          ...
        """

        def socket_exists():
            return self.get_by_name(name) is not None

        keep_polling = keep_polling or socket_exists
        # the builtin ``callable`` replaces ``collections.Callable``,
        # which no longer exists on python >= 3.10
        if not callable(keep_polling):
            raise TypeError(
                "SocketManager.subscribe parameter keep_polling must be a function or callable that returns a boolean"
            )

        socket = self.get_by_name(name)
        socket.setsockopt(self.zmq.SUBSCRIBE, cast_bytes(topic or ""))

        while keep_polling():
            topic, raw = socket.recv_multipart()
            payload = self.serialization_backend.unpack(raw)
            yield Event(topic, payload)

    def disconnect(self, socket_name):
        """disconnects a socket from its last recorded address and
        unregisters it from the poller.

        returns ``True`` when done, ``False`` if the stored socket is falsy.

        raises SocketNotFound when the socket name is wrong.

        :param socket_name: the socket name

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager(zmq, zmq.Context())
          >>> sockets.ensure_and_connect(
          ...     'logs',
          ...     zmq.PUB,
          ...     'tcp://192.168.10.24:6000',
          ...     zmq.POLLOUT
          ... )
          >>>
          >>> sockets.disconnect('logs')
        """
        socket = self.get_by_name(socket_name)
        if not socket:
            return False

        address = self.addresses.pop(socket_name, None)
        if address:
            socket.disconnect(address)

        self.registry.pop(socket, None)
        try:
            self.poller.unregister(socket)
        except Exception:
            # best-effort: the socket might not be registered anymore
            pass

        return True

    def connect(self, socket_name, address, polling_mechanism):
        """connects a socket to an address and automatically registers it with
        the given polling mechanism.

        returns the socket itself.

        raises SocketConnectError when the address is empty or when
        zeromq refuses to connect; raises SocketNotFound when the
        socket name is wrong.

        :param socket_name: the socket name
        :param address: a valid zeromq address (i.e: inproc://whatevs)
        :param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT``

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager(zmq, zmq.Context())
          >>> sockets.create('logs', zmq.PUB)
          >>> sockets.connect('logs', 'tcp://192.168.10.24:6000', zmq.POLLOUT)
          >>> sockets.publish_safe('logs', 'output', 'some data')
        """
        if not address:
            raise SocketConnectError(
                'socket "{0}" received an empty address and cannot connect'.format(
                    socket_name
                )
            )

        socket = self.get_by_name(socket_name)
        self.register_socket(socket, polling_mechanism)
        self.engage(0)
        try:
            socket.connect(address)
        except ZMQError as e:
            msg = "could not connect to address {0}: {1}".format(address, e)
            raise SocketConnectError(msg)

        # only remember the address once the connection was accepted, so
        # that ``disconnect`` never acts on an address that was never used
        self.addresses[socket_name] = address
        return socket

    def close(self, socket_name):
        """unregisters a socket from the poller, forgets its address and
        closes it.

        The name itself stays reserved in the manager: the (closed)
        socket is still returned by :py:meth:`get_by_name`.

        raises SocketNotFound when the socket name is wrong.

        :param socket_name: the socket name

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager(zmq, zmq.Context())
          >>> sockets.create('logs', zmq.SUB)
          >>> sockets.bind('logs', 'tcp://*:6000', zmq.POLLIN)
          >>> sockets.close('logs')
        """
        socket = self.get_by_name(socket_name)
        if not socket:
            return

        try:
            self.poller.unregister(socket)
        except KeyError:
            # the socket was never registered with the poller
            pass

        # keep the registry in sync with the poller
        self.registry.pop(socket, None)
        self.addresses.pop(socket_name, None)
        socket.close()

    def bind(self, socket_name, address, polling_mechanism):
        """binds a socket to an address and automatically registers it with
        the given polling mechanism.

        returns the socket itself.

        raises SocketBindError when the address is empty or when zeromq
        refuses to bind; raises SocketNotFound when the socket name is
        wrong.

        :param socket_name: the socket name
        :param address: a valid zeromq address (i.e: inproc://whatevs)
        :param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT``

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager(zmq, zmq.Context())
          >>> sockets.create('pipe-in', zmq.PULL)
          >>> sockets.bind('pipe-in', 'tcp://*:6000', zmq.POLLIN)
        """
        if not address:
            raise SocketBindError(
                'socket "{0}" received an empty address and cannot bind'.format(
                    socket_name
                )
            )

        socket = self.get_by_name(socket_name)
        self.register_socket(socket, polling_mechanism)
        self.engage(0)
        try:
            socket.bind(address)
        except ZMQError as e:
            msg = "could not bind to address {0}: {1}".format(address, e)
            raise SocketBindError(msg)

        # only remember the address once the bind succeeded
        self.addresses[socket_name] = address
        return socket

    def bind_to_random_port(
        self, socket_name, polling_mechanism, local_address="tcp://0.0.0.0"
    ):
        """binds the socket to a random port

        returns a 2-item tuple with the socket instance and the address string

        raises SocketNotFound when the socket name is wrong.

        :param socket_name: the socket name
        :param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT``
        :param local_address: the address, without port, to bind to

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager(zmq, zmq.Context())
          >>> sockets.create('api-server', zmq.REP)
          >>> _, address = sockets.bind_to_random_port(
          ...     'api-server',
          ...     zmq.POLLIN | zmq.POLLOUT,
          ...     local_address='tcp://192.168.10.24'
          ... )
          >>> address
          'tcp://192.168.10.24:61432'
        """
        socket = self.get_by_name(socket_name)
        self.register_socket(socket, polling_mechanism)
        self.engage(0)
        port = socket.bind_to_random_port(local_address)
        address = "{0}:{1}".format(local_address, port)
        self.addresses[socket_name] = address
        return socket, address

    def ensure_and_connect(
        self, socket_name, socket_type, address, polling_mechanism
    ):
        """Ensure that a socket exists, that is *connected* to the given address
        and that is registered with the given polling mechanism.

        .. tip:: This method is a handy replacement for calling
           :py:meth:`~agentzero.core.SocketManager.get_or_create`,
           :py:meth:`~agentzero.core.SocketManager.connect` and then
           :py:meth:`~agentzero.core.SocketManager.engage`.

        returns the socket itself.

        :param socket_name: the socket name
        :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
        :param address: a valid zeromq address (i.e: inproc://whatevs)
        :param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT``

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager(zmq, zmq.Context())
          >>> sockets.ensure_and_connect(
          ...     'logs',
          ...     zmq.REQ,
          ...     'tcp://192.168.10.24:7000',
          ...     zmq.POLLIN | zmq.POLLOUT
          ... )
        """
        self.get_or_create(socket_name, socket_type, polling_mechanism)
        socket = self.connect(socket_name, address, polling_mechanism)
        self.engage()
        return socket

    def ensure_and_bind(
        self, socket_name, socket_type, address, polling_mechanism
    ):
        """Ensure that a socket exists, that is *bound* to the given address
        and that is registered with the given polling mechanism.

        .. tip:: This method is a handy replacement for calling
           :py:meth:`~agentzero.core.SocketManager.get_or_create`,
           :py:meth:`~agentzero.core.SocketManager.bind` and then
           :py:meth:`~agentzero.core.SocketManager.engage`.

        returns the socket itself.

        :param socket_name: the socket name
        :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
        :param address: a valid zeromq address (i.e: inproc://whatevs)
        :param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT``
        """
        self.get_or_create(socket_name, socket_type, polling_mechanism)
        socket = self.bind(socket_name, address, polling_mechanism)
        self.engage()
        return socket

    def ready(self, name, polling_mechanism, timeout=None):
        """Polls all sockets and checks if the socket with the given name
        is ready for the requested ``polling_mechanism``.

        The socket counts as ready when *every* requested flag is among
        the events reported by the poller, so a socket that is both
        readable and writable is ready for ``zmq.POLLIN`` alone.

        returns the socket if available, or ``None``

        raises SocketNotFound when the socket name is wrong.

        :param name: the socket name
        :param polling_mechanism: either ``zmq.POLLIN`` or ``zmq.POLLOUT``
        :param timeout: the polling timeout in milliseconds that will
          be passed to ``zmq.Poller().poll()`` (optional, defaults to
          the manager's ``polling_timeout``)
        """
        socket = self.get_by_name(name)
        # the fallback must be the *polling* timeout (milliseconds), not
        # ``self.timeout`` which is expressed in seconds
        poll_timeout = self.polling_timeout if timeout is None else timeout
        available = self.engage(poll_timeout).get(socket)
        if not available:
            return None
        if (available & polling_mechanism) == polling_mechanism:
            return socket
        return None

    def wait_until_ready(
        self, name, polling_mechanism, timeout=None, polling_timeout=None
    ):
        """Briefly waits until the socket is ready to be used, yields to other
        greenlets until the socket becomes available.

        Each poll is capped to the time left, so the call does not
        block meaningfully longer than ``timeout``. A ``timeout`` of
        zero (or less) returns ``None`` without polling at all.

        returns the socket if available within the given timeout, or ``None``

        raises SocketNotFound when the socket name is wrong.

        :param name: the socket name
        :param polling_mechanism: either ``zmq.POLLIN`` or ``zmq.POLLOUT``
        :param timeout: the timeout in seconds (accepts float) in which it
          should wait for the socket to become available
          (optional, defaults to the manager's ``timeout``)
        :param polling_timeout: the polling timeout in milliseconds that will
          be passed to ``zmq.Poller().poll()``.
          (optional, defaults to the manager's ``polling_timeout``)
        """
        if timeout is None:
            timeout = self.timeout
        if polling_timeout is None:
            polling_timeout = self.polling_timeout

        start = time.time()
        elapsed = 0
        while elapsed < timeout:
            remaining_ms = (timeout - elapsed) * 1000
            if polling_timeout is None:
                poll_ms = remaining_ms
            else:
                poll_ms = min(polling_timeout, remaining_ms)

            socket = self.ready(name, polling_mechanism, timeout=poll_ms)
            if socket is not None:
                return socket
            elapsed = time.time() - start

        return None

    def get_by_name(self, name):
        """Returns an existing socket by name.

        raises SocketNotFound when there is no socket with that name.

        :param name: the socket name
        """
        if name not in self.sockets:
            raise SocketNotFound(self, name)
        return self.sockets[name]

    def create(self, name, socket_type):
        """Creates a named socket by type and gives it a random identity.

        raises SocketAlreadyExists when the name is already taken.

        Returns the socket itself

        :param name: the socket name
        :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
        """
        if name in self.sockets:
            raise SocketAlreadyExists(self, name)

        self.sockets[name] = self.context.socket(socket_type)
        # use the injected zmq module (it may be ``zmq.green``) rather
        # than the one imported at module level
        self.set_socket_option(name, self.zmq.IDENTITY, cast_bytes(uuid4()))
        return self.get_by_name(name)

    def get_or_create(self, name, socket_type, polling_mechanism):
        """ensure that a socket exists and is registered with a given
        polling_mechanism (POLLIN, POLLOUT or both)

        returns the socket itself.

        :param name: the socket name
        :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
        :param polling_mechanism: one of (``zmq.POLLIN``, ``zmq.POLLOUT``, ``zmq.POLLIN | zmq.POLLOUT``)

        .. testcode::

           import zmq
           from agentzero.core import SocketManager
           from agentzero.serializers import JSON

           context = zmq.Context()
           sockets = SocketManager(zmq, context, serialization_backend=JSON())
           sockets.get_or_create(
               "requester",
               zmq.REQ,
               zmq.POLLIN | zmq.POLLOUT
           )
        """
        if name not in self.sockets:
            self.create(name, socket_type)

        socket = self.get_by_name(name)
        self.register_socket(socket, polling_mechanism)
        return socket

    def register_socket(self, socket, polling_mechanism):
        """registers a socket with a given
        polling_mechanism (POLLIN, POLLOUT or both)

        A socket that is already registered with a *different* mechanism
        is registered again so the poller picks up the new flags.

        returns the socket itself.

        :param socket: the socket instance
        :param polling_mechanism: one of (``zmq.POLLIN``, ``zmq.POLLOUT``, ``zmq.POLLIN | zmq.POLLOUT``)
        """
        already_registered = socket in self.registry
        if not already_registered or self.registry[socket] != polling_mechanism:
            self.poller.register(socket, polling_mechanism)
            self.registry[socket] = polling_mechanism
        return socket

    def engage(self, timeout=None):
        """polls all registered sockets with the given timeout in milliseconds

        returns a dictionary with the sockets that are ready to be used
        mapped to the events reported for each of them

        :param timeout: how long should it poll until a socket becomes
          available. defaults to the manager's ``polling_timeout``
        """
        # a conditional expression (instead of ``a and b or c``) keeps a
        # configured polling timeout of ``0`` from turning into ``None``
        polling_timeout = self.polling_timeout if timeout is None else timeout
        return collections.OrderedDict(self.poller.poll(polling_timeout))

    def get_log_handler(self, socket_name, topic_name="logs"):
        """returns an instance of :py:class:`~agentzero.core.ZMQPubHandler`
        attached to a previously-created socket.

        :param socket_name: the name of the socket, previously created with :py:meth:`SocketManager.create`
        :param topic_name: the name of the topic in which the logs will be PUBlished

        **Example:**

        .. testcode::

           import logging
           import zmq.green as zmq
           from agentzero.core import SocketManager

           context = zmq.Context()
           sockets = SocketManager(zmq, context)
           handler = sockets.get_log_handler('logs', topic_name='app_logs')
           logger = logging.getLogger()
           logger.addHandler(handler)
           logger.info("Server is up!")
        """
        return ZMQPubHandler(self, socket_name, topic_name)

    def get_logger(self, socket_name, topic_name="logs", logger_name=None):
        """returns an instance of :py:class:`~logging.Logger` that has a
        :py:class:`~agentzero.core.ZMQPubHandler` attached to it.

        .. note:: every call adds a new handler to the logger.

        :param socket_name: the name of the socket, previously created with :py:meth:`~agentzero.core.SocketManager.create`
        :param topic_name: (optional) the name of the topic in which the logs will be PUBlished, defaults to **"logs"**
        :param logger_name: (optional) defaults to the given socket name

        **Example:**

        .. testcode::

           import logging
           import zmq.green as zmq
           from agentzero.core import SocketManager

           context = zmq.Context()
           sockets = SocketManager(zmq, context)
           logger = sockets.get_logger('logs', topic_name='logs', logger_name=__name__)
           logger.info("Server is up!")
        """
        logger_name = logger_name or socket_name
        handler = self.get_log_handler(socket_name, topic_name)
        logger = logging.getLogger(logger_name)
        logger.addHandler(handler)
        return logger
class ZMQPubHandler(logging.Handler):
    """:py:class:`logging.Handler` that publishes every record through a
    named socket of a socket manager.

    Each record is formatted, then sent with
    ``socket_manager.publish_safe(socket_name, topic_name, data)`` where
    ``data`` is a dictionary with the keys ``msg``, ``args`` and ``level``.

    :param socket_manager: the object whose ``publish_safe`` method is
      used to send the records
    :param socket_name: the name of the socket to publish through
    :param topic_name: the topic to publish under (cast to bytes)

    NOTE(review): the date format ends in ``Z`` but no UTC converter is
    set on the formatters here, so timestamps presumably follow the
    ``logging`` default (local time) - confirm.
    """

    default_formatter = logging.Formatter(
        "[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d - %(message)s\n",
        datefmt="%Y-%m-%dT%H:%M:%SZ",
    )

    # per-level formatters; levels missing from this table fall back to
    # ``default_formatter`` (see ``format``)
    formatters = {
        logging.DEBUG: default_formatter,
        logging.INFO: default_formatter,
        logging.WARN: default_formatter,
        logging.ERROR: logging.Formatter(
            "[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d - %(message)s - %(exc_info)s\n",
            datefmt="%Y-%m-%dT%H:%M:%SZ",
        ),
        logging.CRITICAL: default_formatter,
    }

    def __init__(self, socket_manager, socket_name="logs", topic_name="logs"):
        super(ZMQPubHandler, self).__init__()
        self.sockets = socket_manager
        self.socket_name = socket_name
        self.topic_name = cast_bytes(topic_name)

    def format(self, record):
        """formats the record with the formatter of its level.

        Custom or unlisted levels use ``default_formatter`` instead of
        failing with ``KeyError``.
        """
        formatter = self.formatters.get(record.levelno, self.default_formatter)
        return formatter.format(record)

    def emit(self, record):
        """publishes the formatted record.

        Any failure while formatting or publishing is routed to
        :py:meth:`logging.Handler.handleError` so that a logging call
        never raises into the application code that issued it.
        """
        try:
            msg = cast_bytes(self.format(record))
            data = {"msg": msg, "args": record.args, "level": record.levelno}
            self.sockets.publish_safe(self.socket_name, self.topic_name, data)
        except Exception:
            self.handleError(record)
class Event(object):
    """PUB/SUB event container

    this is an opaque, read-only data structure that represents a
    single, entire event: ``topic`` and ``data``

    An event can also be unpacked like a 2-item tuple::

        topic, data = event

    :param topic: the topic the event was published under
    :param data: the deserialized payload of the event
    """

    def __init__(self, topic, data):
        self.__topic = topic
        self.__data = data

    def __iter__(self):
        # allows ``for topic, data in sockets.subscribe(...)``
        yield self.__topic
        yield self.__data

    def __repr__(self):
        return "Event(topic={0!r}, data={1!r})".format(self.__topic, self.__data)

    @property
    def topic(self):
        """a string containing the topic name. zero-length in absence of topic."""
        return self.__topic

    @property
    def data(self):
        """the deserialized python object containing the event payload."""
        return self.__data