Streams API

New in version 0.6.

While the aiozmq library is built on top of the low-level ZmqTransport and ZmqProtocol API, it also provides a more convenient stream-based interface.

Consider the following example:

import asyncio
import aiozmq
import zmq

@asyncio.coroutine
def go():
    # Bind a ROUTER stream to a wildcard port on localhost.
    router = yield from aiozmq.create_zmq_stream(
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')

    # Look up the endpoint that the wildcard bind actually resolved to.
    addr = list(router.transport.bindings())[0]
    dealer = yield from aiozmq.create_zmq_stream(
        zmq.DEALER,
        connect=addr)

    for i in range(10):
        # Send a multipart message, echo it back through the router,
        # and read the reply on the dealer side.
        msg = (b'data', b'ask', str(i).encode('utf-8'))
        dealer.write(msg)
        data = yield from router.read()
        router.write(data)
        answer = yield from dealer.read()
        print(answer)
    dealer.close()
    router.close()

asyncio.get_event_loop().run_until_complete(go())

The code creates two streams, one for the request side and one for the response side of a ZeroMQ connection. It then sends each message over the wire and waits for the response before sending the next one.

create_zmq_stream

aiozmq.create_zmq_stream(zmq_type, *, bind=None, connect=None, loop=None, zmq_sock=None, high_read=None, low_read=None, high_write=None, low_write=None)

A wrapper for create_zmq_connection() returning a ZeroMQ stream (ZmqStream instance).

The arguments are all the usual arguments to create_zmq_connection() plus high and low watermarks for reading and writing messages.

This function is a coroutine.

Parameters:
  • zmq_type (int) – the type of ZeroMQ socket (zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB, zmq.PAIR, zmq.DEALER, zmq.ROUTER, zmq.PULL, zmq.PUSH, etc.)
  • bind (str or iterable of strings) –

    Endpoint specification.

    Each endpoint results in a call to ZmqTransport.bind(), which accepts connections on the specified endpoint.

    The other side should use the connect parameter to connect to this transport.

  • connect (str or iterable of strings) –

    Endpoint specification.

    Each endpoint results in a call to ZmqTransport.connect(), which connects the transport to the specified endpoint.

    The other side should use the bind parameter to wait for incoming connections.

  • zmq_sock (zmq.Socket) – a preexisting zmq socket that will be passed to the returned transport.
  • loop (asyncio.AbstractEventLoop) – optional event loop instance, None for the default event loop.
  • high_read (int) – high watermark for reading from the ZeroMQ socket. None by default (no limit).
  • low_read (int) – low watermark for reading from the ZeroMQ socket. None by default (no limit).
  • high_write (int) – high watermark for writing into the ZeroMQ socket. None by default (no limit).
  • low_write (int) – low watermark for writing into the ZeroMQ socket. None by default (no limit).
Returns:

ZeroMQ stream object, ZmqStream instance.
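The page does not spell out how a high/low watermark pair behaves. As a rough illustration of the usual idea (pause once the buffer grows past the high mark, resume only after it has shrunk to the low mark), here is a stand-alone toy model. WatermarkBuffer and its methods are hypothetical names; this is not aiozmq's implementation, and the units (bytes here) are an assumption.

```python
class WatermarkBuffer:
    """Toy buffer that pauses above `high` and resumes at or below `low`.

    Hypothetical model for illustration only; not part of aiozmq.
    """

    def __init__(self, high, low):
        if low > high:
            raise ValueError("low watermark must not exceed high watermark")
        self.high = high
        self.low = low
        self.size = 0
        self.paused = False

    def add(self, nbytes):
        # Growing past the high watermark pauses the producer.
        self.size += nbytes
        if not self.paused and self.size > self.high:
            self.paused = True

    def remove(self, nbytes):
        # Shrinking to the low watermark (or below) resumes the producer.
        self.size = max(0, self.size - nbytes)
        if self.paused and self.size <= self.low:
            self.paused = False


buf = WatermarkBuffer(high=100, low=20)
buf.add(80)        # 80 is not above 100: still running
state_a = buf.paused
buf.add(40)        # 120 is above 100: paused
state_b = buf.paused
buf.remove(50)     # 70 is above 20: stays paused
state_c = buf.paused
buf.remove(60)     # 10 is at or below 20: resumed
state_d = buf.paused
```

The gap between the two marks keeps the producer from flipping between paused and running on every single message.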

ZmqStream

class aiozmq.ZmqStream

A class for sending and receiving ZeroMQ messages.

transport

The ZmqTransport instance used by the stream.

at_closing()

Return True if the buffer is empty and feed_closing() was called.

close()

Close the stream and the underlying ZeroMQ socket.

drain()

Wait until the write buffer of the underlying transport is flushed.

The intended use is to write:

w.write(data)
yield from w.drain()

When the transport buffer is full (the protocol is paused), block until the buffer is (partially) drained and the protocol is resumed. When there is nothing to wait for, the yield-from continues immediately.

This method is a coroutine.
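The write-then-drain pattern can be tried out without a ZeroMQ socket by using a stand-in writer. FakeWriter below is hypothetical and built only on asyncio primitives; it models a writer that is paused once a fixed number of messages is buffered, which is an assumption made for the sketch rather than a description of ZmqStream internals. The sketch uses async/await, the current spelling of the yield from coroutine style shown above.

```python
import asyncio


class FakeWriter:
    """Hypothetical stand-in for a stream; not part of aiozmq."""

    def __init__(self, limit):
        self._limit = limit            # buffered messages that trigger a pause
        self._buffer = []
        self._resumed = asyncio.Event()
        self._resumed.set()            # not paused initially

    def write(self, msg):
        # write() never blocks; it only buffers and may pause the writer.
        self._buffer.append(msg)
        if len(self._buffer) >= self._limit:
            self._resumed.clear()

    async def drain(self):
        # Returns immediately unless the writer is paused.
        await self._resumed.wait()

    def flush(self):
        # Simulates the transport sending everything that was buffered.
        sent = len(self._buffer)
        self._buffer.clear()
        self._resumed.set()
        return sent


async def producer(writer, count):
    for i in range(count):
        writer.write((b'data', str(i).encode('utf-8')))
        await writer.drain()           # waits here while the buffer is full
    return count


async def main():
    writer = FakeWriter(limit=3)
    task = asyncio.ensure_future(producer(writer, 5))
    sent = 0
    while not task.done():
        await asyncio.sleep(0)         # let the producer run until it blocks
        sent += writer.flush()
    return task.result(), sent


written, flushed = asyncio.run(main())
```

Calling drain() after every write() keeps the producer from queueing messages faster than the consumer side can send them.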

exception()

Get the stream exception.

get_extra_info(name, default=None)

Return optional transport information: see asyncio.BaseTransport.get_extra_info().

read()

Read one ZeroMQ message from the wire and return it.

Raise ZmqStreamClosed if the stream was closed.

write(msg)

Write message msg into the ZeroMQ socket.

Parameters: msg – a sequence (tuple or list) containing the multipart message data.
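Because a message is a sequence of frames, small helpers can keep the encoding and decoding in one place. The helper names below are hypothetical and not part of aiozmq; the frame layout mirrors the (b'data', b'ask', ...) message from the example at the top of this page, and the peer identity value is made up for the demonstration.

```python
def make_request(index):
    """Build the three-frame message used in the example above."""
    return (b'data', b'ask', str(index).encode('utf-8'))


def parse_request(frames):
    """Return the integer payload from a message built by make_request()."""
    kind, verb, payload = frames
    if (kind, verb) != (b'data', b'ask'):
        raise ValueError("unexpected message header: %r" % (frames[:2],))
    return int(payload.decode('utf-8'))


msg = make_request(7)
value = parse_request(msg)

# A ROUTER socket prepends an identity frame to every message it receives,
# so the router side has to skip the first frame before parsing.
# The identity bytes here are invented for illustration.
routed = (b'\x00peer',) + msg
value_from_router = parse_request(routed[1:])
```

Passing the result of make_request() straight to write() matches what the dealer does in the example at the top of the page.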

Internal API

set_exception(exc)

Set the exception to exc. The exception may be retrieved by calling exception() or raised by the next read(). This is a private method.

set_transport(transport)

Set the transport to transport. This is a private method.

set_read_buffer_limits(high=None, low=None)

Set read buffer limits. This is a private method.

feed_closing()

Feed the socket closing signal. This is a private method.

feed_msg(msg)

Feed the message msg into the stream's internal buffer. Any operations waiting for the data will be resumed.

This is a private method.
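To make the relationship between feed_msg(), feed_closing(), at_closing(), and read() more concrete, here is a toy model built only on asyncio. ToyStream and ToyStreamClosed are hypothetical names. The sketch follows the descriptions on this page and additionally assumes that messages already buffered are still delivered after feed_closing(); the real ZmqStream may behave differently in that case.

```python
import asyncio
import collections


class ToyStreamClosed(Exception):
    """Hypothetical stand-in for ZmqStreamClosed."""


class ToyStream:
    """Toy model of the feeding side of a stream; not aiozmq's code."""

    def __init__(self):
        self._buffer = collections.deque()
        self._closing = False
        self._waiter = None            # future of a reader waiting for data

    def _wake_reader(self):
        if self._waiter is not None and not self._waiter.done():
            self._waiter.set_result(None)
        self._waiter = None

    def feed_msg(self, msg):
        self._buffer.append(msg)
        self._wake_reader()            # resume a pending read()

    def feed_closing(self):
        self._closing = True
        self._wake_reader()

    def at_closing(self):
        # True only when closing was signalled and nothing is left to read.
        return self._closing and not self._buffer

    async def read(self):
        while not self._buffer:
            if self._closing:
                raise ToyStreamClosed()
            self._waiter = asyncio.get_running_loop().create_future()
            await self._waiter
        return self._buffer.popleft()


async def main():
    stream = ToyStream()
    stream.feed_msg((b'first',))
    stream.feed_closing()
    before = stream.at_closing()       # a message is still buffered
    first = await stream.read()
    after = stream.at_closing()        # buffer is now empty
    try:
        await stream.read()
        closed = False
    except ToyStreamClosed:
        closed = True
    return before, first, after, closed


result = asyncio.run(main())
```

In this model the protocol side calls feed_msg() and feed_closing(), while application code only ever calls read().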

Exceptions

exception aiozmq.ZmqStreamClosed

Raised by read operations on a closed stream.