Examples of aiozmq usage¶
There is a list of examples from aiozmq/examples
Every example is a correct tiny python program.
Simple DEALER-ROUTER pair implemented on Core level¶
import asyncio
import aiozmq
import zmq
class ZmqDealerProtocol(aiozmq.ZmqProtocol):
transport = None
def __init__(self, queue, on_close):
self.queue = queue
self.on_close = on_close
def connection_made(self, transport):
self.transport = transport
def msg_received(self, msg):
self.queue.put_nowait(msg)
def connection_lost(self, exc):
self.on_close.set_result(exc)
class ZmqRouterProtocol(aiozmq.ZmqProtocol):
transport = None
def __init__(self, on_close):
self.on_close = on_close
def connection_made(self, transport):
self.transport = transport
def msg_received(self, msg):
self.transport.write(msg)
def connection_lost(self, exc):
self.on_close.set_result(exc)
@asyncio.coroutine
def go():
router_closed = asyncio.Future()
dealer_closed = asyncio.Future()
router, _ = yield from aiozmq.create_zmq_connection(
lambda: ZmqRouterProtocol(router_closed),
zmq.ROUTER,
bind='tcp://127.0.0.1:*')
addr = list(router.bindings())[0]
queue = asyncio.Queue()
dealer, _ = yield from aiozmq.create_zmq_connection(
lambda: ZmqDealerProtocol(queue, dealer_closed),
zmq.DEALER,
connect=addr)
for i in range(10):
msg = (b'data', b'ask', str(i).encode('utf-8'))
dealer.write(msg)
answer = yield from queue.get()
print(answer)
dealer.close()
yield from dealer_closed
router.close()
yield from router_closed
def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")
if __name__ == '__main__':
main()
DEALER-ROUTER pair implemented with streams¶
import asyncio
import aiozmq
import zmq
@asyncio.coroutine
def go():
router = yield from aiozmq.create_zmq_stream(
zmq.ROUTER,
bind='tcp://127.0.0.1:*')
addr = list(router.transport.bindings())[0]
dealer = yield from aiozmq.create_zmq_stream(
zmq.DEALER,
connect=addr)
for i in range(10):
msg = (b'data', b'ask', str(i).encode('utf-8'))
dealer.write(msg)
data = yield from router.read()
router.write(data)
answer = yield from dealer.read()
print(answer)
dealer.close()
router.close()
def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")
if __name__ == '__main__':
main()
Remote Procedure Call¶
import asyncio
import aiozmq.rpc
class ServerHandler(aiozmq.rpc.AttrHandler):
@aiozmq.rpc.method
def remote_func(self, a: int, b: int) -> int:
return a + b
@asyncio.coroutine
def go():
server = yield from aiozmq.rpc.serve_rpc(
ServerHandler(), bind='tcp://*:*')
server_addr = list(server.transport.bindings())[0]
client = yield from aiozmq.rpc.connect_rpc(
connect=server_addr)
ret = yield from client.call.remote_func(1, 2)
assert 3 == ret
server.close()
yield from server.wait_closed()
client.close()
yield from client.wait_closed()
def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")
if __name__ == '__main__':
main()
Pipeline aka Notifier¶
import asyncio
import aiozmq.rpc
from itertools import count
class Handler(aiozmq.rpc.AttrHandler):
def __init__(self):
self.connected = False
@aiozmq.rpc.method
def remote_func(self, step, a: int, b: int):
self.connected = True
print("HANDLER", step, a, b)
@asyncio.coroutine
def go():
handler = Handler()
listener = yield from aiozmq.rpc.serve_pipeline(
handler, bind='tcp://*:*')
listener_addr = list(listener.transport.bindings())[0]
notifier = yield from aiozmq.rpc.connect_pipeline(
connect=listener_addr)
for step in count(0):
yield from notifier.notify.remote_func(step, 1, 2)
if handler.connected:
break
else:
yield from asyncio.sleep(0.01)
listener.close()
yield from listener.wait_closed()
notifier.close()
yield from notifier.wait_closed()
def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")
if __name__ == '__main__':
main()
Publish-Subscribe¶
import asyncio
import aiozmq.rpc
from itertools import count
class Handler(aiozmq.rpc.AttrHandler):
def __init__(self):
self.connected = False
@aiozmq.rpc.method
def remote_func(self, step, a: int, b: int):
self.connected = True
print("HANDLER", step, a, b)
@asyncio.coroutine
def go():
handler = Handler()
subscriber = yield from aiozmq.rpc.serve_pubsub(
handler, subscribe='topic', bind='tcp://127.0.0.1:*',
log_exceptions=True)
subscriber_addr = list(subscriber.transport.bindings())[0]
print("SERVE", subscriber_addr)
publisher = yield from aiozmq.rpc.connect_pubsub(
connect=subscriber_addr)
for step in count(0):
yield from publisher.publish('topic').remote_func(step, 1, 2)
if handler.connected:
break
else:
yield from asyncio.sleep(0.1)
subscriber.close()
yield from subscriber.wait_closed()
publisher.close()
yield from publisher.wait_closed()
def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")
if __name__ == '__main__':
main()
Translation RPC exceptions back to client¶
import asyncio
import aiozmq.rpc
class CustomError(Exception):
def __init__(self, val):
self.val = val
super().__init__(val)
exc_name = CustomError.__module__+'.'+CustomError.__name__
error_table = {exc_name: CustomError}
class ServerHandler(aiozmq.rpc.AttrHandler):
@aiozmq.rpc.method
def remote(self, val):
raise CustomError(val)
@asyncio.coroutine
def go():
server = yield from aiozmq.rpc.serve_rpc(
ServerHandler(), bind='tcp://*:*')
server_addr = list(server.transport.bindings())[0]
client = yield from aiozmq.rpc.connect_rpc(
connect=server_addr,
error_table=error_table)
try:
yield from client.call.remote('value')
except CustomError as exc:
exc.val == 'value'
server.close()
client.close()
def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")
if __name__ == '__main__':
main()
Translation instances of custom classes via RPC¶
import asyncio
import aiozmq.rpc
import msgpack
class Point:
def __init__(self, x, y):
self.x = x
self.y = y
def __eq__(self, other):
if isinstance(other, Point):
return (self.x, self.y) == (other.x, other.y)
return NotImplemented
translation_table = {
0: (Point,
lambda value: msgpack.packb((value.x, value.y)),
lambda binary: Point(*msgpack.unpackb(binary))),
}
class ServerHandler(aiozmq.rpc.AttrHandler):
@aiozmq.rpc.method
def remote(self, val):
return val
@asyncio.coroutine
def go():
server = yield from aiozmq.rpc.serve_rpc(
ServerHandler(), bind='tcp://*:*',
translation_table=translation_table)
server_addr = list(server.transport.bindings())[0]
client = yield from aiozmq.rpc.connect_rpc(
connect=server_addr,
translation_table=translation_table)
ret = yield from client.call.remote(Point(1, 2))
assert ret == Point(1, 2)
server.close()
yield from server.wait_closed()
client.close()
yield from client.wait_closed()
def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")
if __name__ == '__main__':
main()
Validation of RPC methods¶
import asyncio
import aiozmq.rpc
class ServerHandler(aiozmq.rpc.AttrHandler):
@aiozmq.rpc.method
def remote_func(self, a: int, b: int) -> int:
return a + b
@asyncio.coroutine
def go():
server = yield from aiozmq.rpc.serve_rpc(
ServerHandler(), bind='tcp://*:*')
server_addr = list(server.transport.bindings())[0]
client = yield from aiozmq.rpc.connect_rpc(
connect=server_addr)
try:
yield from client.call.unknown_function()
except aiozmq.rpc.NotFoundError as exc:
print("client.rpc.unknown_function(): {}".format(exc))
try:
yield from client.call.remote_func(bad_arg=1)
except aiozmq.rpc.ParametersError as exc:
print("client.rpc.remote_func(bad_arg=1): {}".format(exc))
try:
yield from client.call.remote_func(1)
except aiozmq.rpc.ParametersError as exc:
print("client.rpc.remote_func(1): {}".format(exc))
try:
yield from client.call.remote_func('a', 'b')
except aiozmq.rpc.ParametersError as exc:
print("client.rpc.remote_func('a', 'b'): {}".format(exc))
server.close()
yield from server.wait_closed()
client.close()
yield from client.wait_closed()
def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")
if __name__ == '__main__':
main()
RPC lookup in nested namespaces¶
import asyncio
import aiozmq.rpc
class Handler(aiozmq.rpc.AttrHandler):
def __init__(self, ident):
self.ident = ident
self.subhandler = SubHandler(self.ident, 'subident')
@aiozmq.rpc.method
def a(self):
return (self.ident, 'a')
class SubHandler(aiozmq.rpc.AttrHandler):
def __init__(self, ident, subident):
self.ident = ident
self.subident = subident
@aiozmq.rpc.method
def b(self):
return (self.ident, self.subident, 'b')
@asyncio.coroutine
def go():
server = yield from aiozmq.rpc.serve_rpc(
Handler('ident'), bind='tcp://*:*')
server_addr = list(server.transport.bindings())[0]
client = yield from aiozmq.rpc.connect_rpc(
connect=server_addr)
ret = yield from client.call.a()
assert ('ident', 'a') == ret
ret = yield from client.call.subhandler.b()
assert ('ident', 'subident', 'b') == ret
server.close()
yield from server.wait_closed()
client.close()
yield from client.wait_closed()
def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")
if __name__ == '__main__':
main()
Use dict as RPC lookup table¶
import asyncio
import aiozmq.rpc
@aiozmq.rpc.method
def a():
return 'a'
@aiozmq.rpc.method
def b():
return 'b'
handlers_dict = {'a': a,
'subnamespace': {'b': b}}
@asyncio.coroutine
def go():
server = yield from aiozmq.rpc.serve_rpc(
handlers_dict, bind='tcp://*:*')
server_addr = list(server.transport.bindings())[0]
client = yield from aiozmq.rpc.connect_rpc(
connect=server_addr)
ret = yield from client.call.a()
assert 'a' == ret
ret = yield from client.call.subnamespace.b()
assert 'b' == ret
server.close()
yield from server.wait_closed()
client.close()
yield from client.wait_closed()
def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")
if __name__ == '__main__':
main()
Use dynamic RPC lookup¶
import asyncio
import aiozmq.rpc
class DynamicHandler(aiozmq.rpc.AttrHandler):
def __init__(self, namespace=()):
self.namespace = namespace
def __getitem__(self, key):
try:
return getattr(self, key)
except AttributeError:
return DynamicHandler(self.namespace + (key,))
@aiozmq.rpc.method
def func(self):
return (self.namespace, 'val')
@asyncio.coroutine
def go():
server = yield from aiozmq.rpc.serve_rpc(
DynamicHandler(), bind='tcp://*:*')
server_addr = list(server.transport.bindings())[0]
client = yield from aiozmq.rpc.connect_rpc(
connect=server_addr)
ret = yield from client.call.func()
assert ((), 'val') == ret, ret
ret = yield from client.call.a.func()
assert (('a',), 'val') == ret, ret
ret = yield from client.call.a.b.func()
assert (('a', 'b'), 'val') == ret, ret
server.close()
yield from server.wait_closed()
client.close()
yield from client.wait_closed()
def main():
asyncio.get_event_loop().run_until_complete(go())
print("DONE")
if __name__ == '__main__':
main()