# Source code for agentzero.errors

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import unicode_literals


class AgentZeroSocketError(Exception):
    """Base exception class for errors originated in
    :py:class:`~agentzero.core.SocketManager`.

    Derives from :py:class:`Exception` rather than
    :py:class:`BaseException` so that ordinary ``except Exception``
    handlers catch it; ``except AgentZeroSocketError`` still matches this
    class and anything that subclasses it.
    """
class SocketAlreadyExists(AgentZeroSocketError):
    """raised by :py:class:`~agentzero.core.SocketManager` when asked to
    create a named socket under a name that is already in use.

    ::

        >>> from agentzero.core import zmq
        >>> from agentzero.core import SocketManager
        >>> sockets = SocketManager()
        >>> sockets.create('foo', zmq.REP)
        >>> sockets.create('foo', zmq.REP)
        Traceback (most recent call last):
        ...
        SocketAlreadyExists: SocketManager(sockets=['foo']) already has a socket named 'foo'.
    """

    def __init__(self, manager, socket_name):
        # The message shows the manager via its string form and the
        # offending name via its repr (so the name appears quoted).
        quoted_name = repr(socket_name)
        template = "{0} already has a socket named {1}."
        description = template.format(manager, quoted_name)
        super(SocketAlreadyExists, self).__init__(description)
class SocketNotFound(AgentZeroSocketError):
    """raised by :py:class:`~agentzero.core.SocketManager` when asked to
    retrieve a socket that does not exist.

    ::

        >>> from agentzero.core import zmq
        >>> from agentzero.core import SocketManager
        >>> sockets = SocketManager()
        >>> sockets.get_by_name('some-name', zmq.PUB)
        Traceback (most recent call last):
        ...
        SocketNotFound: SocketManager(sockets=['']) has no sockets named 'some-name'.
    """

    def __init__(self, manager, socket_name):
        # The message shows the manager via its string form and the
        # missing name via its repr (so the name appears quoted).
        quoted_name = repr(socket_name)
        template = "{0} has no sockets named {1}."
        description = template.format(manager, quoted_name)
        super(SocketNotFound, self).__init__(description)
class SocketBindError(AgentZeroSocketError):
    """Signals a failed
    :py:meth:`~agentzero.core.SocketManager.bind` operation; raised by
    :py:class:`~agentzero.core.SocketManager`.
    """
class SocketConnectError(AgentZeroSocketError):
    """Signals a failed
    :py:meth:`~agentzero.core.SocketManager.connect` operation; raised by
    :py:class:`~agentzero.core.SocketManager`.
    """