The Buffer API provides a zero-copy WSGI input buffer for streaming HTTP request bodies from Erlang to Python. Buffers use shared memory with GIL-released blocking reads for efficient data transfer.
Buffers are designed for WSGI/ASGI wsgi.input scenarios where Erlang receives HTTP body chunks and Python needs to consume them:
- Zero-copy access via Python's buffer protocol (memoryview)
- File-like interface (read, readline, readlines)
- Blocking reads that release the GIL while waiting
- Fast substring search using memchr/memmem
Use buffers when you need:
- WSGI input for HTTP request bodies
- Streaming data from Erlang to Python
- Zero-copy access to binary data
%% Create a buffer (chunked - unknown size)
{ok, Buf} = py_buffer:new(),
%% Or with known content length (pre-allocates memory)
{ok, Buf} = py_buffer:new(4096),
%% Write HTTP body chunks
ok = py_buffer:write(Buf, <<"chunk1">>),
ok = py_buffer:write(Buf, <<"chunk2">>),
%% Signal end of data
ok = py_buffer:close(Buf),
%% Pass to WSGI app
py:call(Ctx, myapp, handle_request, [#{<<"wsgi.input">> => Buf}]).

def handle_request(environ):
    wsgi_input = environ['wsgi.input']
    # Read all data
    body = wsgi_input.read()
    # Or read line by line
    for line in wsgi_input:
        process(line)
    # Or read specific amount
    chunk = wsgi_input.read(1024)

Create a buffer for chunked/streaming data (unknown content length).

{ok, Buf} = py_buffer:new().

The buffer starts with a default capacity (64KB) and grows as needed.
Create a buffer with known content length.
{ok, Buf} = py_buffer:new(ContentLength).

Arguments:
- ContentLength - Expected total size in bytes, or undefined for chunked
Pre-allocating avoids buffer growth overhead when content length is known.
Write binary data to the buffer.
ok = py_buffer:write(Buf, Data).

Arguments:
- Buf - Buffer reference from new/0,1
- Data - Binary data to append

Returns:
- ok - Data written successfully
- {error, closed} - Buffer was closed
Writing signals any waiting Python readers via pthread_cond_broadcast.
Signal end of data (EOF).
ok = py_buffer:close(Buf).

After closing:
- No more data can be written
- Python's read() returns remaining data then empty bytes
- Waiting Python threads are woken up
The buffer appears in Python as erlang.PyBuffer when passed from Erlang.
from erlang import PyBuffer

Read up to size bytes, blocking if needed.

data = buf.read()        # Read all (blocks until EOF)
chunk = buf.read(1024)   # Read up to 1024 bytes

Behavior:
- If size=-1, reads all data (waits for EOF if content length known)
- If data available, returns immediately
- If empty, blocks until data arrives (GIL released during wait)
- Returns empty bytes at EOF
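The behavior listed above matches the usual file-like contract, so a body can be consumed in fixed-size chunks until read() returns empty bytes. The following is a minimal sketch, not part of the Buffer API: read_in_chunks is a hypothetical helper name, and io.BytesIO stands in for the buffer so the example runs without Erlang.

```python
import io


def read_in_chunks(stream, chunk_size=1024):
    """Yield chunks from a file-like object until read() returns b''."""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:  # empty bytes signals EOF
            break
        yield chunk


# io.BytesIO is a stand-in for the buffer passed from Erlang (assumption)
body = io.BytesIO(b"a" * 2500)
sizes = [len(chunk) for chunk in read_in_chunks(body)]
print(sizes)
```

With a real buffer, each read() call may block while waiting for the next write from Erlang, so the chunk sizes depend on how the data arrives.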
Read available bytes without blocking. For async I/O.
chunk = buf.read_nonblock(1024) # Read up to 1024 available bytes
data = buf.read_nonblock()        # Read all available bytes

Behavior:
- Returns immediately with whatever data is available
- Never blocks, even if no data available
- Returns empty bytes if nothing available (check readable_amount() first)
- Use with readable_amount() and at_eof() for async I/O loops
Return number of bytes available without blocking.
available = buf.readable_amount()
if available > 0:
    data = buf.read_nonblock(available)

Returns: Number of bytes that can be read immediately.
Check if buffer is at EOF with no more data.
while not buf.at_eof():
    if buf.readable_amount() > 0:
        chunk = buf.read_nonblock(4096)
        process(chunk)
    else:
        await asyncio.sleep(0.001)  # Yield to event loop

Returns: True if EOF signaled AND all data has been read.
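To make the polling pattern concrete, here is a self-contained sketch that can run without Erlang. FakeBuffer is a hypothetical in-memory stand-in that only mimics readable_amount(), read_nonblock(), and at_eof() as described above; feed(), finish(), and drain() are illustrative names, not part of the Buffer API.

```python
import asyncio


class FakeBuffer:
    """Hypothetical stand-in exposing the three non-blocking methods."""

    def __init__(self):
        self._data = bytearray()
        self._eof = False

    def feed(self, chunk):
        # Plays the role of py_buffer:write/2 on the Erlang side
        self._data += chunk

    def finish(self):
        # Plays the role of py_buffer:close/1
        self._eof = True

    def readable_amount(self):
        return len(self._data)

    def read_nonblock(self, size=-1):
        n = len(self._data) if size < 0 else min(size, len(self._data))
        out = bytes(self._data[:n])
        del self._data[:n]
        return out

    def at_eof(self):
        # True only when EOF was signaled AND everything has been read
        return self._eof and not self._data


async def drain(buf, poll_interval=0.001):
    """Collect all data from buf without blocking the event loop."""
    chunks = []
    while not buf.at_eof():
        if buf.readable_amount() > 0:
            chunks.append(buf.read_nonblock(4096))
        else:
            await asyncio.sleep(poll_interval)  # yield to other tasks
    return b"".join(chunks)


async def main():
    buf = FakeBuffer()

    async def producer():
        for chunk in (b"hello ", b"world"):
            buf.feed(chunk)
            await asyncio.sleep(0.005)  # simulate network delay
        buf.finish()

    _, body = await asyncio.gather(producer(), drain(buf))
    return body


result = asyncio.run(main())
print(result)
```

Polling with a short sleep keeps the sketch simple, at the cost of some latency and wasted wake-ups while the buffer is empty.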
Read one line, blocking if needed.
line = buf.readline()  # Read until newline or EOF

Returns: Bytes including the trailing newline, or empty at EOF.
Uses memchr for fast newline scanning.
Read all lines as a list.
lines = buf.readlines()  # [b'line1\n', b'line2\n', ...]

Arguments:
- hint - Optional size hint; stops after approximately this many bytes
Seek to position within already-written data.
buf.seek(0) # Seek to beginning (SEEK_SET)
buf.seek(10, 1) # Seek forward 10 bytes (SEEK_CUR)
buf.seek(-5, 2)  # Seek 5 bytes before end (SEEK_END, requires EOF)

Limitations:
- Cannot seek past written data
- SEEK_END requires EOF flag set
Return current read position.
pos = buf.tell()  # Current byte offset

Fast substring search using memchr/memmem.

idx = buf.find(b'\n')        # Find first newline
idx = buf.find(b'boundary')  # Find multipart boundary

Returns: Lowest index where substring found, or -1 if not found.
Single-byte search uses memchr (very fast). Multi-byte uses memmem.
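The return convention (lowest index, or -1 when absent) is the same one Python's built-in bytes.find uses, so parsing code can be prototyped against plain bytes first. A minimal sketch under that assumption; split_at is a hypothetical helper name, and a bytes object stands in for the buffer.

```python
def split_at(data, marker):
    """Split data at the first occurrence of marker.

    Returns (before, after), or (data, None) when marker is absent.
    """
    idx = data.find(marker)
    if idx == -1:  # same "not found" value that buf.find() documents
        return data, None
    return data[:idx], data[idx + len(marker):]


# Plain bytes stand in for the buffer contents (assumption)
raw = b"Content-Type: text/plain\r\n\r\nhello"
head, rest = split_at(raw, b"\r\n\r\n")
print(head, rest)
```

Always test for -1 before slicing: a negative index is valid in a Python slice and would silently drop the last byte instead of failing.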
Buffers support Python's buffer protocol for zero-copy access:
# Create memoryview for zero-copy access
mv = memoryview(buf)
# Access without copying
first_byte = mv[0]
slice_data = bytes(mv[10:20])
# Release when done
mv.release()

Properties:
- readonly=True - Buffer is read-only from Python
- ndim=1 - One-dimensional byte array
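These properties can be explored with any read-only bytes-like object. The sketch below uses a plain bytes value as a stand-in for the buffer (an assumption, so it runs without Erlang) and shows where a copy actually happens.

```python
# A bytes object stands in for the buffer: it is also read-only and 1-D
payload = b"0123456789abcdefghij"

with memoryview(payload) as mv:
    is_readonly = mv.readonly       # True for a read-only exporter
    dimensions = mv.ndim            # 1 for a flat byte array
    first_byte = mv[0]              # indexing yields an int, no copy

    with mv[10:20] as window:       # slicing creates another view, no copy
        slice_data = bytes(window)  # the copy happens only here

    try:
        mv[0] = 0                   # writes through a read-only view fail
        write_rejected = False
    except TypeError:
        write_rejected = True

print(is_readonly, dimensions, first_byte, slice_data, write_rejected)
```

Using the view as a context manager releases it on exit, which has the same effect as calling release() explicitly.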
Line-by-line iteration:
for line in buf:
    process(line)

Equivalent to calling readline() until EOF.
buf.readable() # True - always readable
buf.writable() # False - not writable from Python
buf.seekable() # True - limited seeking supported
buf.closed # True if buffer is closed
len(buf) # Available bytes (write_pos - read_pos)
buf.close()     # Mark buffer as closed

Erlang                              Python
------                              ------
py_buffer:new() -----------------> Buffer created
                                    (pthread mutex+cond initialized)
py_buffer:write(Buf, Data)
    |
    v
memcpy to buffer
pthread_cond_broadcast() ------>    read()/readline() wakes up
                                    (GIL was released during wait)
                                        |
                                        v
                                    Return data to Python
py_buffer:close() ---------------> EOF flag set
                                    Waiting readers return
Memory Layout:
py_buffer_resource_t
+------------------+
| data*            | --> [chunk1][chunk2][chunk3]...
| capacity         |     ^       ^
| write_pos        | ----+       |
| read_pos         | ------------+
| content_length   |
| mutex            |
| data_ready (cond)|
| eof              |
| closed           |
| view_count       |
+------------------+
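The write, wake-up, and close sequence can be modeled in pure Python, which may help when reasoning about blocking behavior. This is a toy sketch and not the actual implementation: BufferModel is a hypothetical class, threading.Condition plays the role of the mutex plus data_ready condition, and notify_all() plays the role of pthread_cond_broadcast.

```python
import threading


class BufferModel:
    """Toy model of the buffer protocol (illustrative only)."""

    def __init__(self):
        self.data = bytearray()   # write_pos is simply len(self.data)
        self.read_pos = 0
        self.eof = False
        self.data_ready = threading.Condition()  # mutex + condition

    def write(self, chunk):
        with self.data_ready:
            if self.eof:
                # The real API returns {error, closed} instead (see above)
                raise ValueError("buffer closed")
            self.data += chunk
            self.data_ready.notify_all()  # wake every waiting reader

    def close(self):
        with self.data_ready:
            self.eof = True
            self.data_ready.notify_all()

    def read(self, size=-1):
        with self.data_ready:
            if size < 0:
                # Read-all: wait until the writer signals EOF
                self.data_ready.wait_for(lambda: self.eof)
                end = len(self.data)
            else:
                # Wait until some data is available or EOF is set
                self.data_ready.wait_for(
                    lambda: self.eof or len(self.data) > self.read_pos
                )
                end = min(self.read_pos + size, len(self.data))
            out = bytes(self.data[self.read_pos:end])
            self.read_pos = end
            return out


buf = BufferModel()


def producer():
    for chunk in (b"chunk1", b"chunk2"):
        buf.write(chunk)
    buf.close()


writer = threading.Thread(target=producer)
writer.start()
body = buf.read()   # blocks until close() sets the EOF flag
writer.join()
print(body)
```

In this model the reader holds the GIL only while running Python code; Condition.wait_for releases the lock while waiting, loosely mirroring how the real buffer releases the GIL during a blocking read.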
- Use known content length when available - avoids buffer reallocation:
  ContentLength = byte_size(Body),
  {ok, Buf} = py_buffer:new(ContentLength).
- Write in reasonable chunks - very small writes have overhead:
  %% Good: write accumulated chunks
  ok = py_buffer:write(Buf, AccumulatedData).
  %% Less efficient: many tiny writes
  %% [py_buffer:write(Buf, <<B>>) || B <- binary_to_list(Data)].
- Use memoryview for zero-copy when processing large bodies:
  mv = memoryview(buf)
  # Process without copying
  boundary_pos = buf.find(b'--boundary')
  part = bytes(mv[:boundary_pos])
- Use find() for parsing - memchr/memmem are faster than Python string methods.
%% Simulate receiving HTTP body
{ok, Buf} = py_buffer:new(byte_size(Body)),
ok = py_buffer:write(Buf, Body),
ok = py_buffer:close(Buf),
%% Build WSGI environ
Environ = #{
    <<"REQUEST_METHOD">> => <<"POST">>,
    <<"PATH_INFO">> => <<"/api/data">>,
    <<"CONTENT_TYPE">> => <<"application/json">>,
    <<"CONTENT_LENGTH">> => integer_to_binary(byte_size(Body)),
    <<"wsgi.input">> => Buf
},
%% Call WSGI app
{ok, Response} = py:call(myapp, handle, [Environ]).

%% Create buffer for chunked encoding
{ok, Buf} = py_buffer:new(),
%% Spawn writer process
spawn(fun() ->
    %% Simulate receiving chunks
    lists:foreach(fun(Chunk) ->
        ok = py_buffer:write(Buf, Chunk),
        timer:sleep(10)  % Simulate network delay
    end, get_chunks()),
    ok = py_buffer:close(Buf)
end),
%% Python can start reading immediately
%% read() will block until data available
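py:call(myapp, stream_handler, [Buf]).

def parse_multipart(buf, boundary):
    """Parse multipart form data from buffer.

    Assumes boundary is the full delimiter (including the leading '--') and
    that find() returns an offset relative to the current read position.
    """
    delimiter = boundary.encode()
    parts = []
    while True:
        # Find next boundary using fast memmem
        idx = buf.find(delimiter)
        if idx == -1:
            break
        # Skip any preamble, then consume the boundary line itself
        buf.seek(idx, 1)
        boundary_line = buf.readline()
        if boundary_line[len(delimiter):].startswith(b'--'):
            break  # Closing delimiter: no more parts
        # Read headers until blank line (or EOF)
        headers = {}
        while True:
            line = buf.readline()
            if line in (b'\r\n', b''):
                break
            name, value = line.split(b':', 1)
            headers[name.strip()] = value.strip()
        # Read content until next boundary (data keeps its trailing CRLF)
        end = buf.find(delimiter)
        data = buf.read(end) if end != -1 else buf.read()
        parts.append({'headers': headers, 'data': data})
    return parts

For asyncio applications, use the non-blocking methods to avoid blocking the event loop: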
import asyncio
import json
from erlang import PyBuffer

async def read_buffer_async(buf):
    """Read from buffer without blocking the event loop."""
    chunks = []
    while not buf.at_eof():
        available = buf.readable_amount()
        if available > 0:
            # Read available data
            chunk = buf.read_nonblock(4096)
            chunks.append(chunk)
        else:
            # Yield to event loop, check again soon
            await asyncio.sleep(0.001)
    return b''.join(chunks)

async def process_wsgi_body_async(environ):
    """Process WSGI body in async context."""
    buf = environ['wsgi.input']
    # Read body without blocking
    body = await read_buffer_async(buf)
    return json.loads(body)

For production use, consider integrating with Erlang's event notification:
async def read_with_notification(buf, notify_channel):
    """Read using Erlang channel for data-ready notifications."""
    chunks = []
    while not buf.at_eof():
        available = buf.readable_amount()
        if available > 0:
            chunk = buf.read_nonblock(available)
            chunks.append(chunk)
        else:
            # Wait for Erlang to signal data is ready
            await notify_channel.async_receive()
    return b''.join(chunks)

- Channel - Bidirectional message passing
- Reactor - FD-based protocol handling
- Web Frameworks - ASGI/WSGI integration
- Getting Started - Basic usage guide