import asyncio
from collections import Iterable
from functools import partial
import zmq
from aiozmq import create_zmq_connection
from .base import (
NotFoundError,
ParametersError,
Service,
ServiceClosedError,
_BaseProtocol,
_BaseServerProtocol,
)
from .log import logger
@asyncio.coroutine
def connect_pubsub(*, connect=None, bind=None, loop=None,
                   translation_table=None):
    """A coroutine that creates and connects/binds pubsub client.

    Usually for this function you need to use *connect* parameter, but
    ZeroMQ does not forbid to use *bind*.

    connect -- endpoint specification passed through to
               create_zmq_connection().
    bind -- endpoint specification passed through to
            create_zmq_connection().
    translation_table -- an optional table for custom value translators.
    loop -- an optional parameter to point ZmqEventLoop.  If loop is
            None then default event loop will be given by
            asyncio.get_event_loop() call.

    Returns PubSubClient instance.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    # Only the protocol half of the (transport, protocol) pair is needed to
    # build the client object.
    _, proto = yield from create_zmq_connection(
        lambda: _ClientProtocol(loop, translation_table=translation_table),
        zmq.PUB, connect=connect, bind=bind, loop=loop)
    return PubSubClient(loop, proto)
@asyncio.coroutine
def serve_pubsub(handler, *, subscribe=None, connect=None, bind=None,
                 loop=None, translation_table=None, log_exceptions=False,
                 exclude_log_exceptions=(), timeout=None):
    """A coroutine that creates and connects/binds pubsub server instance.

    Usually for this function you need to use *bind* parameter, but
    ZeroMQ does not forbid to use *connect*.

    handler -- an object which processes incoming pubsub calls.
               Usually you like to pass AttrHandler instance.
    subscribe -- subscription specification.  Subscribe server to
                 topics.  Allowed parameters are str, bytes, iterable
                 of str or bytes.
    connect -- endpoint specification passed through to
               create_zmq_connection().
    bind -- endpoint specification passed through to
            create_zmq_connection().
    translation_table -- an optional table for custom value translators.
    log_exceptions -- log exceptions from remote calls if True.
    exclude_log_exceptions -- sequence of exception classes that should
                              not be logged.
    timeout -- timeout for performing handling of async server calls.
    loop -- an optional parameter to point ZmqEventLoop.  If loop is
            None then default event loop will be given by
            asyncio.get_event_loop() call.

    Returns PubSubService instance.
    Raises OSError on system error.
    Raises TypeError if arguments have inappropriate type.
    """
    # ``collections.abc`` is the supported home of Iterable; the alias in
    # ``collections`` was removed in Python 3.10.
    from collections.abc import Iterable

    # Validate ``subscribe`` before any socket is created, so that a bad
    # argument cannot leave a freshly opened connection behind.
    if subscribe is None:
        topics = ()
    elif isinstance(subscribe, (str, bytes)):
        # A single topic: str/bytes are iterable themselves, so they must be
        # wrapped rather than iterated character by character.
        topics = (subscribe,)
    elif isinstance(subscribe, Iterable):
        topics = subscribe
    else:
        raise TypeError('subscribe should be str, bytes or iterable')

    if loop is None:
        loop = asyncio.get_event_loop()

    # Only the protocol half of the (transport, protocol) pair is needed to
    # build the service object.
    _, proto = yield from create_zmq_connection(
        lambda: _ServerProtocol(loop, handler,
                                translation_table=translation_table,
                                log_exceptions=log_exceptions,
                                exclude_log_exceptions=exclude_log_exceptions,
                                timeout=timeout),
        zmq.SUB, connect=connect, bind=bind, loop=loop)
    serv = PubSubService(loop, proto)

    # Per-topic type checking is done by PubSubService.subscribe().
    for topic in topics:
        serv.subscribe(topic)
    return serv
class _ClientProtocol(_BaseProtocol):
    """Client-side protocol that writes publish calls to the transport."""

    def call(self, topic, name, args, kwargs):
        """Send one publish message and return an already-completed future.

        topic -- None (sent as an empty frame), str (UTF-8 encoded) or
                 bytes (sent as is).
        name -- name of the remote function, str; sent UTF-8 encoded.
        args -- positional arguments, serialised with self.packer.
        kwargs -- keyword arguments, serialised with self.packer.

        Returns a future whose result is already set to None.
        Raises ServiceClosedError if the protocol has no transport.
        Raises TypeError if topic is not None, str or bytes.
        """
        if self.transport is None:
            raise ServiceClosedError()

        # Normalise the topic into the first frame of the message.
        if isinstance(topic, bytes):
            topic_frame = topic
        elif isinstance(topic, str):
            topic_frame = topic.encode('utf-8')
        elif topic is None:
            topic_frame = b''
        else:
            raise TypeError('topic argument should be None, str or bytes '
                            '({!r})'.format(topic))

        # Frame order: topic, function name, packed args, packed kwargs.
        message = [
            topic_frame,
            name.encode('utf-8'),
            self.packer.packb(args),
            self.packer.packb(kwargs),
        ]
        self.transport.write(message)

        # Publishing has no reply, so hand back a future that is done
        # immediately.
        done = asyncio.Future(loop=self.loop)
        done.set_result(None)
        return done
class PubSubClient(Service):
    """Client object used for publishing calls to topics."""

    def __init__(self, loop, proto):
        super().__init__(loop, proto)

    def publish(self, topic):
        """Return an object for making dynamic PubSub calls on topic.

        The usage is:

            yield from client.publish('my_topic').ns.func(1, 2)

        topic argument may be None, otherwise it must be an instance of
        str or bytes.
        """
        proto = self._proto
        return _MethodCall(proto, topic)
class PubSubService(Service):
    """Service object that manages topic subscriptions of the server."""

    @staticmethod
    def _topic_to_bytes(topic):
        """Return topic as bytes, UTF-8 encoding it when given as str.

        Raises TypeError if topic is neither str nor bytes.
        """
        if isinstance(topic, bytes):
            return topic
        if isinstance(topic, str):
            return topic.encode('utf-8')
        raise TypeError('topic should be str or bytes, got {!r}'
                        .format(topic))

    def subscribe(self, topic):
        """Subscribe to the topic.

        topic argument must be str or bytes.
        Raises TypeError in other cases
        """
        # Validate the topic first so a bad argument is reported before the
        # transport is touched.
        btopic = self._topic_to_bytes(topic)
        self.transport.subscribe(btopic)

    def unsubscribe(self, topic):
        """Unsubscribe from the topic.

        topic argument must be str or bytes.
        Raises TypeError in other cases
        """
        # Validate the topic first so a bad argument is reported before the
        # transport is touched.
        btopic = self._topic_to_bytes(topic)
        self.transport.unsubscribe(btopic)
class _MethodCall:
    """Builds a dotted function name via attribute access, sends it on call.

    Every attribute access returns a new instance with the attribute name
    appended; calling the instance forwards the accumulated name together
    with the topic and call arguments to the protocol's call() method.
    """

    __slots__ = ('_proto', '_topic', '_names')

    def __init__(self, proto, topic, names=()):
        self._proto = proto
        self._topic = topic
        self._names = names

    def __getattr__(self, name):
        # Only reached for names that are not real attributes; extend the
        # path without modifying the current object.
        extended = self._names + (name,)
        return self.__class__(self._proto, self._topic, extended)

    def __call__(self, *args, **kwargs):
        if self._names:
            dotted = '.'.join(self._names)
            return self._proto.call(self._topic, dotted, args, kwargs)
        raise ValueError("PubSub method name is empty")
class _ServerProtocol(_BaseServerProtocol):
    """Server-side protocol that runs handlers for incoming messages."""

    def msg_received(self, data):
        """Dispatch one incoming message to its handler.

        data -- four frames: topic, function name, packed positional
                arguments and packed keyword arguments.

        The outcome of the call (result or exception) is stored in a
        future that gets process_call_result() as its done callback.
        """
        _topic, raw_name, raw_args, raw_kwargs = data
        args = self.packer.unpackb(raw_args)
        kwargs = self.packer.unpackb(raw_kwargs)
        try:
            name = raw_name.decode('utf-8')
            func = self.dispatch(name)
            args, kwargs, _ret_ann = self.check_args(func, args, kwargs)
        except (NotFoundError, ParametersError) as exc:
            # Lookup/signature failures are reported through the same
            # future-based path as handler failures.
            fut = asyncio.Future(loop=self.loop)
            fut.set_exception(exc)
        else:
            if asyncio.iscoroutinefunction(func):
                fut = self.add_pending(func(*args, **kwargs))
            else:
                # Plain function: run it right away and record the outcome.
                fut = asyncio.Future(loop=self.loop)
                try:
                    outcome = func(*args, **kwargs)
                except Exception as exc:
                    fut.set_exception(exc)
                else:
                    fut.set_result(outcome)
        on_done = partial(self.process_call_result,
                          name=name, args=args, kwargs=kwargs)
        fut.add_done_callback(on_done)

    def process_call_result(self, fut, *, name, args, kwargs):
        """Done callback: log what happened to a handler call.

        fut -- the finished future of the call.
        name -- decoded function name, used in log messages.
        args, kwargs -- call arguments, passed on to try_log().
        """
        self.discard_pending(fut)
        try:
            outcome = fut.result()
            if outcome is not None:
                # There is nowhere to send a return value, so only warn.
                logger.warning("PubSub handler %r returned not None", name)
        except asyncio.CancelledError:
            return
        except (NotFoundError, ParametersError) as exc:
            logger.exception("Call to %r caused error: %r", name, exc)
        except Exception:
            self.try_log(fut, name, args, kwargs)