ZeroMQ integration with asyncio (PEP 3156).
- create_zmq_connection() coroutine for making 0MQ connections.
- Provides RPC Request-Reply, Push-Pull and Publish-Subscribe patterns for remote calls.
The library works on Linux, Mac OS X and Windows.
However, Windows is a second-class citizen in the ZeroMQ world, so aiozmq's support for Windows is limited as well:
- You cannot use the ipc://name scheme for endpoints.
- aiozmq's loop, aiozmq.ZmqEventLoop, is built on top of the select system call, so it is slower than asyncio.ProactorEventLoop and does not support subprocesses.
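One way to cope with the missing ipc:// transport is to pick the endpoint by platform. The following is a minimal sketch, not part of aiozmq: the helper name default_endpoint and its name and port parameters are assumptions made for illustration.

```python
import sys


def default_endpoint(name, port, platform=sys.platform):
    """Return an ipc:// endpoint where supported, else a local tcp:// one.

    Hypothetical helper: `name` and `port` are illustrative parameters,
    not something aiozmq defines.
    """
    if platform.startswith("win"):
        # ipc:// endpoints cannot be used on Windows, so fall back to TCP.
        return "tcp://127.0.0.1:{}".format(port)
    return "ipc://{}".format(name)


print(default_endpoint("aiozmq-demo", 5555))
```

The returned string can then be passed as the bind or connect argument used in the examples on this page.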
The core requires only pyzmq and can be installed (with pyzmq as a dependency) by executing:
pip3 install aiozmq
You will probably also want to use the RPC module. It is optional and requires msgpack, which you can install by executing:
pip3 install msgpack
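Because msgpack is an optional dependency, a program may want to check for it before relying on RPC. A minimal sketch using only the standard library; the helper names has_module and rpc_available are assumptions for illustration and are not provided by aiozmq.

```python
import importlib.util


def has_module(name):
    """Return True if a top-level module called `name` can be found."""
    return importlib.util.find_spec(name) is not None


def rpc_available():
    """Hypothetical check: the RPC module needs msgpack to be installed."""
    return has_module("msgpack")


if not rpc_available():
    print("msgpack is missing; install it before using the RPC module")
```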
aiozmq runs on Python 3 only. Most Linux distributions use pip3 for installing Python 3 libraries. If your system uses Python 3 by default, try plain pip instead of pip3. The same may apply to virtualenv, the Travis continuous integration system, etc.
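If it is unclear whether pip or pip3 belongs to the interpreter you intend to use, invoking pip as a module of that interpreter avoids the ambiguity. A small sketch; the helper name pip_install_command is an assumption for illustration.

```python
import sys


def pip_install_command(package):
    """Build a pip command tied to the interpreter running this script."""
    # `python -m pip` always installs into the interpreter that runs it.
    return [sys.executable, "-m", "pip", "install", package]


print(" ".join(pip_install_command("aiozmq")))
```

The resulting list can be handed to subprocess.check_call() to perform the installation.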
The project is hosted on GitHub.
Please feel free to file an issue on the bug tracker if you have found a bug or have a suggestion for improving the library.
The library uses Travis for Continuous Integration.
Low-level request-reply example:
import asyncio
import aiozmq
import zmq


async def go():
    # Bind a ROUTER socket to a random free TCP port.
    router = await aiozmq.create_zmq_stream(
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')

    # Connect a DEALER socket to the address the router was bound to.
    addr = list(router.transport.bindings())
    dealer = await aiozmq.create_zmq_stream(
        zmq.DEALER,
        connect=addr)

    for i in range(10):
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        # The router receives the message prefixed with the dealer's identity.
        data = await router.read()
        # Writing it back routes the reply to the same dealer.
        router.write(data)
        answer = await dealer.read()
        print(answer)
    dealer.close()
    router.close()


asyncio.run(go())
Example of RPC usage:
import asyncio
import aiozmq.rpc


class ServerHandler(aiozmq.rpc.AttrHandler):

    @aiozmq.rpc.method
    def remote_func(self, a: int, b: int) -> int:
        return a + b


async def go():
    server = await aiozmq.rpc.serve_rpc(
        ServerHandler(), bind='tcp://127.0.0.1:5555')
    client = await aiozmq.rpc.connect_rpc(
        connect='tcp://127.0.0.1:5555')

    # Call the remote method through the client proxy.
    ret = await client.call.remote_func(1, 2)
    assert 3 == ret

    server.close()
    client.close()


asyncio.run(go())
To execute the last example you need to install msgpack first.
Indices and tables
- Streams API
- Remote Procedure Calls
- Core API
- Examples of aiozmq usage
  - Simple DEALER-ROUTER pair implemented on Core level
  - DEALER-ROUTER pair implemented with streams
  - Remote Procedure Call
  - Pipeline aka Notifier
  - Translation RPC exceptions back to client
  - Translation instances of custom classes via RPC
  - Validation of RPC methods
  - RPC lookup in nested namespaces
  - Use dict as RPC lookup table
  - Use dynamic RPC lookup
  - Socket event monitor
  - Stream socket event monitor
  - Synchronous and asynchronous code works together