The Channel API provides efficient bidirectional message passing between Erlang and Python. Channels use enif_ioq for zero-copy buffering and integrate with Python's asyncio for non-blocking operations.
Channels are faster than the Reactor pattern for message passing scenarios:
| Message Size | Channel | Reactor | Speedup |
|---|---|---|---|
| 64 bytes | 6.2M ops/s | 772K ops/s | 8x |
| 1KB | 3.8M ops/s | 734K ops/s | 5x |
| 16KB | 1.1M ops/s | 576K ops/s | 2x |
Use channels when you need:
- High-throughput message streaming
- Bidirectional Erlang-Python communication
- Asyncio integration
- Backpressure support
%% Create a channel
{ok, Ch} = py_channel:new(),
%% Send messages with sender PID for replies
ok = py_channel:send(Ch, {request, self(), <<"data">>}),
%% Wait for response
receive
{response, Result} ->
io:format("Got result: ~p~n", [Result])
end,
%% Close when done
py_channel:close(Ch).from erlang.channel import Channel, reply
def process_messages(channel_ref):
ch = Channel(channel_ref)
for msg in ch:
# Extract sender PID from message
_, sender_pid, data = msg
# Process and reply
result = process(data)
reply(sender_pid, ('response', result))from erlang.channel import Channel, reply
async def process_messages(channel_ref):
ch = Channel(channel_ref)
async for msg in ch:
# Extract sender PID from message
_, sender_pid, data = msg
# Process and reply
result = await process(data)
reply(sender_pid, ('response', result))Create a new channel.
%% Unbounded channel
{ok, Ch} = py_channel:new().
%% Channel with backpressure (max 10KB queued)
{ok, Ch} = py_channel:new(#{max_size => 10000}).Options:
max_size- Maximum queue size in bytes. When exceeded,send/2returnsbusy.
Send an Erlang term to Python.
ok = py_channel:send(Ch, Term).Returns:
ok- Message queued successfullybusy- Queue full (backpressure){error, closed}- Channel was closed
Close the channel. Python receivers will get StopIteration.
ok = py_channel:close(Ch).Get channel status.
Info = py_channel:info(Ch).
%% #{size => 1024, max_size => 10000, closed => false}Wrapper for receiving messages from Erlang.
from erlang.channel import Channel
ch = Channel(channel_ref)Blocking receive. Blocks Python execution until a message is available.
msg = ch.receive() # Blocks until message availableBehavior:
- If the channel has data, returns immediately
- If empty, suspends the Erlang process via
receive, releasing the dirty scheduler - Other Erlang processes can run while waiting for data
Raises: ChannelClosed when the channel is closed.
Non-blocking receive. Returns immediately.
msg = ch.try_receive() # Returns None if emptyReturns: Message or None if empty.
Raises: ChannelClosed when the channel is closed.
Asyncio-compatible receive. Yields to other coroutines while waiting.
msg = await ch.async_receive()Behavior:
- When using
ErlangEventLoop: Uses event-driven notification (no polling). The channel notifies the event loop via timer dispatch when data arrives. - When using other asyncio loops: Falls back to polling with 100us sleep intervals.
Raises: ChannelClosed when the channel is closed.
Close the channel from Python. Wakes any waiting receivers.
ch.close() # Signal no more data will be sentSafe to call multiple times.
Channels support the with statement for automatic cleanup:
with Channel(channel_ref) as ch:
for msg in ch:
process(msg)
# channel automatically closed on exit# Sync iteration
for msg in channel:
process(msg)
# Async iteration
async for msg in channel:
process(msg)Send a message to an Erlang process.
from erlang.channel import reply
# Reply to the sender
reply(sender_pid, {"status": "ok", "result": data})Raised when receiving from a closed channel.
from erlang.channel import Channel, ChannelClosed
try:
msg = ch.receive()
except ChannelClosed:
print("Channel closed")Channels support backpressure to prevent unbounded memory growth.
{ok, Ch} = py_channel:new(#{max_size => 10000}),
case py_channel:send(Ch, LargeData) of
ok ->
continue;
busy ->
%% Queue is full, wait before retrying
timer:sleep(10),
retry
end.#{size := Size, max_size := MaxSize} = py_channel:info(Ch),
Utilization = Size / MaxSize.%% Erlang: Send request, receive response
{ok, Ch} = py_channel:new(),
ok = py_channel:send(Ch, {request, self(), <<"compute">>}),
receive
{response, Result} -> Result
end.from erlang.channel import Channel, reply
def handle_requests(channel_ref):
ch = Channel(channel_ref)
for msg in ch:
if msg[0] == 'request':
_, sender_pid, data = msg
result = compute(data)
reply(sender_pid, ('response', result))%% Erlang: Stream data to Python
{ok, Ch} = py_channel:new(),
lists:foreach(fun(Item) ->
ok = py_channel:send(Ch, Item)
end, large_list()),
py_channel:close(Ch).async def process_stream(channel_ref):
ch = Channel(channel_ref)
results = []
async for item in ch:
results.append(process(item))
return results%% Erlang: Distribute work across Python workers
{ok, Ch} = py_channel:new(#{max_size => 100000}),
%% Start multiple Python workers on the channel
[spawn_python_worker(Ch) || _ <- lists:seq(1, 4)],
%% Send work items
[py_channel:send(Ch, {work, Item}) || Item <- WorkItems],
%% Signal completion
py_channel:close(Ch).import asyncio
from erlang.channel import Channel
async def worker(channel_ref, worker_id):
ch = Channel(channel_ref)
async for msg in ch:
if msg[0] == 'work':
_, item = msg
await process_item(item)
print(f"Worker {worker_id} processed {item}")-
Use async iteration for high-throughput scenarios - it allows other coroutines to run while waiting.
-
Set appropriate
max_sizeto prevent memory issues while maintaining throughput. -
Batch messages when possible - sending fewer larger messages is more efficient than many small ones.
-
Avoid
try_receivepolling - use blockingreceive()or asyncasync_receive()instead.
Erlang Python
────── ──────
py_channel:new() ─────────────────▶ Channel created
py_channel:send(Ch, Term)
│
▼
enif_term_to_binary()
│
▼
enif_ioq_enq_binary() ──────────▶ channel.receive()
│
▼
enif_ioq_peek()
│
▼
enif_binary_to_term()
│
▼
Python term
py_channel:close() ───────────────▶ StopIteration
When using ErlangEventLoop, async_receive() uses event-driven notification:
Python C / Erlang
────── ──────────
await ch.async_receive()
│
├── try_receive() ──────────▶ Check queue (fast path)
│ └── Data? Return immediately
│
└── No data:
│
├── Create Future + callback_id
├── Register in loop._timers[callback_id]
│
└── _channel_wait() ────▶ Register waiter in channel
(callback_id + loop ref)
│
await future ◀─────────────────────────────┘
│ │
│ [Data arrives]
│ │
│ py_channel:send()
│ │
│ channel_send()
│ │
│ event_loop_add_pending()
│ │
│ pthread_cond_signal()
│ │
│ ┌───────────────────────────────┘
│ │
│ ▼
│ _run_once_native_for() returns pending
│ │
│ ▼
│ _dispatch(callback_id, TIMER)
│ │
│ ▼
│ Fire handle from _timers
│ │
│ ▼
│ Callback: try_receive() → future.set_result(data)
│
▼
Return data
This avoids polling overhead - Python only wakes when data actually arrives.
For binary protocols and raw byte streaming (e.g., HTTP bodies, file transfers), use ByteChannel instead of Channel. ByteChannel passes bytes directly without term serialization, avoiding encoding/decoding overhead.
| Use Case | Channel | ByteChannel |
|---|---|---|
| Structured messages | Yes | No |
| RPC-style communication | Yes | No |
| HTTP bodies | No | Yes |
| File streaming | No | Yes |
| Binary protocols | No | Yes |
| Raw byte streams | No | Yes |
%% Create a byte channel
{ok, Ch} = py_byte_channel:new(),
%% Send raw bytes
ok = py_byte_channel:send(Ch, <<"HTTP/1.1 200 OK\r\n">>),
ok = py_byte_channel:send(Ch, BodyBytes),
%% Receive raw bytes
{ok, Data} = py_byte_channel:recv(Ch),
%% Non-blocking receive
{ok, Data} = py_byte_channel:try_receive(Ch),
{error, empty} = py_byte_channel:try_receive(Ch), %% If no data
%% Close when done
py_byte_channel:close(Ch).from erlang import ByteChannel, ByteChannelClosed
def process_bytes(channel_ref):
ch = ByteChannel(channel_ref)
# Blocking receive (releases GIL while waiting)
data = ch.receive_bytes()
# Non-blocking receive
data = ch.try_receive_bytes() # Returns None if empty
# Iterate over bytes
for chunk in ch:
process(chunk)
# Send bytes back
ch.send_bytes(b"response data")
# Close when done
ch.close()
# Or use context manager for automatic cleanup
with ByteChannel(channel_ref) as ch:
for chunk in ch:
process(chunk)
# channel automatically closedfrom erlang import ByteChannel, ByteChannelClosed
async def process_bytes_async(channel_ref):
ch = ByteChannel(channel_ref)
# Async receive (yields to other coroutines)
data = await ch.async_receive_bytes()
# Async iteration
async for chunk in ch:
process(chunk)Event-driven async: When using ErlangEventLoop, async_receive_bytes() uses event-driven notification instead of polling. The channel signals the event loop when data arrives, avoiding CPU overhead from sleep loops.
Channel (term-based):
Erlang: term_to_binary() ──▶ enif_ioq ──▶ binary_to_term() :Python
ByteChannel (raw bytes):
Erlang: raw bytes ─────────▶ enif_ioq ─────────▶ raw bytes :Python
ByteChannel reuses the same underlying py_channel_t structure but skips the term serialization/deserialization steps.
- Reactor - FD-based protocol handling for sockets
- Asyncio - Erlang-native asyncio event loop
- Getting Started - Basic usage guide