For safe message-based communication between threads and processes in Python, I
tend to use multiprocessing’s Queue and Pipe. A pattern often seen is using a
queue for sending messages
from multiple producers to a single consumer.
When a producer wants a response to its message, I create a Pipe and
piggy-back one end of the Pipe (a Connection object) onto the message. I use
Python dicts as messages, and use the string “reply_to” as the dictionary key
for the connection objects.
When the queue consumer processes a message, it doesn’t know who the sender is
or how to reach it. However, if the message has an attached Connection
object, the consumer can, almost magically, respond to the sender, across
thread and process boundaries.
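A minimal sketch of the reply pattern described above. To keep it short it makes a few assumptions that are not from my real code: the consumer runs in a thread, a plain queue.Queue stands in for the multiprocessing queue (so nothing gets pickled here), and the names consumer, ask and "value" are made up for illustration.

```python
import queue
import threading
from multiprocessing import Pipe


def consumer(inbox):
    """Handle messages until a None sentinel arrives."""
    while True:
        message = inbox.get()
        if message is None:
            break
        # The consumer knows nothing about the sender; it only checks
        # whether a connection was piggy-backed onto the message.
        reply_to = message.get("reply_to")
        if reply_to is not None:
            reply_to.send({"result": message["value"] * 2})


def ask(inbox, value, timeout=1.0):
    """Send a message with a reply_to connection and wait for the answer."""
    # With duplex=False the first end can only receive, the second only send.
    receive_end, send_end = Pipe(duplex=False)
    inbox.put({"value": value, "reply_to": send_end})
    if not receive_end.poll(timeout):
        raise TimeoutError("no reply within %s seconds" % timeout)
    return receive_end.recv()


inbox = queue.Queue()
worker = threading.Thread(target=consumer, args=(inbox,))
worker.start()
answer = ask(inbox, 21)
inbox.put(None)  # tell the consumer to stop
worker.join()
```

The producer keeps the receiving end for itself and only gives away the sending end, so the consumer never needs an address for the sender.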
All good? Nope.
Any message sent through the queues and pipes must be serializable, or
picklable as we say in Pythonesque. The Connection object
can be serialized, but not unserialized, which means that the exception is not
raised when you create your message, but some time later, in the consumer
that tries to respond. The exception does not tell you much, unless you’ve seen
it before.
I’ve usually added a version of the workaround to some util package in my
projects; one function for pickling a connection, and one function for
unpickling a connection. In my code I’ve been forced to manually pickle
Connection objects before putting them on a Queue.
This works great most of the time, but not this time. In the Python actor model
library Pykka I use Pipe and Connection objects to
implement futures for thread-based actors, similar to how I use gevent’s
AsyncResult for gevent-based actors. When someone sets a value on the future,
it is written to one end of a
Pipe. When someone tries to read the future’s
value, they block on the other end of the
Pipe until there is something to
get or a timeout is reached. The problem appeared when I tried to nest futures,
which is likely to happen if an actor, in response to your message, returns a
future result from another actor. I no longer have the opportunity to babysit
every Connection object that goes into or comes out of another Connection.
They need to be able to watch over themselves. As the
Connection class is
implemented in C and is rather closed to changes, my solution was to wrap the
Connection object. The ConnectionWrapper class simply implements
__reduce__ on the wrapped
Connection object using multiprocessing’s own reduction function.
To work like a real
Connection object, it dispatches any attribute access to
the wrapped connection by implementing __getattr__.
To make sure the connection remains wrapped even after a trip through
pickle, _ConnectionWrapperBuilder is used for
rebuilding the connection and rewrapping it on deserialization.
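The mechanics of the wrapper can be sketched as follows. This is not the real thing: FakeConnection, rebuild_fake_connection and reduce_fake_connection are hypothetical stand-ins for the Connection class and multiprocessing’s reduction function, used so that the example runs with plain pickle. Only the shape of ConnectionWrapper and _ConnectionWrapperBuilder follows the description above.

```python
import pickle


class FakeConnection:
    """Hypothetical stand-in for a multiprocessing Connection object."""

    def __init__(self, handle):
        self.handle = handle

    def describe(self):
        return "connection on handle %d" % self.handle


def rebuild_fake_connection(handle):
    """Stand-in for the rebuild callable a real reduction would return."""
    return FakeConnection(handle)


def reduce_fake_connection(connection):
    """Stand-in for multiprocessing's reduction function.

    Like any __reduce__-style helper it returns (rebuild_callable, args).
    """
    return rebuild_fake_connection, (connection.handle,)


class _ConnectionWrapperBuilder:
    """Callable that rebuilds the inner connection and wraps it again."""

    def __init__(self, inner_rebuild):
        self._inner_rebuild = inner_rebuild

    def __call__(self, *args):
        return ConnectionWrapper(self._inner_rebuild(*args))


class ConnectionWrapper:
    def __init__(self, connection):
        self._wrapped = connection

    def __reduce__(self):
        # Reduce the inner connection, but swap in a builder that
        # rewraps whatever the inner rebuild callable returns.
        inner_rebuild, inner_args = reduce_fake_connection(self._wrapped)
        return _ConnectionWrapperBuilder(inner_rebuild), inner_args

    def __getattr__(self, name):
        # Only called when normal lookup fails. The guard avoids infinite
        # recursion if _wrapped itself has not been set yet.
        if name == "_wrapped":
            raise AttributeError(name)
        return getattr(self._wrapped, name)


original = ConnectionWrapper(FakeConnection(7))
restored = pickle.loads(pickle.dumps(original))
```

After the round trip, restored is again a ConnectionWrapper, and restored.describe() is answered by the rebuilt inner object.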
Given this wrapper, you can make your own
Pipe function which creates a new
pipe and wraps the connection objects for you.
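Such a Pipe function could look roughly like this. The sketch assumes a delegation-only ConnectionWrapper (the pickling support is left out to keep it short), and the variable names are made up for illustration.

```python
import multiprocessing


class ConnectionWrapper:
    """Delegation-only sketch; pickling support is omitted here."""

    def __init__(self, connection):
        self._wrapped = connection

    def __getattr__(self, name):
        if name == "_wrapped":
            raise AttributeError(name)
        return getattr(self._wrapped, name)


def Pipe(duplex=True):
    """Create a new pipe and return both ends wrapped."""
    first, second = multiprocessing.Pipe(duplex)
    return ConnectionWrapper(first), ConnectionWrapper(second)


left, right = Pipe()
left.send({"greeting": "hello"})
# poll() with a timeout avoids blocking forever if nothing arrives.
received = right.recv() if right.poll(1.0) else None
```

Code that imports this Pipe instead of multiprocessing’s own gets wrapped connections without having to think about it.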
Hopefully this trick will be of help until the bug is fixed in Python.