aiothreads 1.1.1


pip install aiothreads

  Latest version

Released: Mar 03, 2026


Meta
Requires Python: >=3.10

Classifiers

Framework
  • AsyncIO

Intended Audience
  • Developers
  • Education
  • Information Technology
  • System Administrators

License
  • OSI Approved :: Apache Software License

Natural Language
  • English

Operating System
  • MacOS
  • Microsoft :: Windows
  • POSIX
  • POSIX :: Linux

Programming Language
  • Python
  • Python :: 3
  • Python :: 3.10
  • Python :: 3.11
  • Python :: 3.12
  • Python :: 3.13
  • Python :: 3.14
  • Python :: Implementation :: CPython

Topic
  • Software Development
  • Software Development :: Libraries
  • Software Development :: Libraries :: Python Modules

Typing
  • Typed

aiothreads

PyPI version PyPI - Python Version License PyPI - Downloads Coverage Status

Overview

aiothreads is a Python library that provides seamless integration between asyncio and thread-based execution. It offers decorators and utilities to run synchronous functions and generators in threads while maintaining clean async/await syntax in your asyncio applications.

Why aiothreads?

While Python 3.9+ provides asyncio.to_thread() for running sync functions in threads, aiothreads goes far beyond this basic functionality:

Limitations of asyncio.to_thread():

  • No support for generators or iterators
  • No way to run long-running or blocking operations without creating separate executors
  • No way to call async code from threads

asyncio example

import asyncio
import time


def sync_function(a, b):
    time.sleep(0.01)  # Simulation of blocking call
    return a + b


async def main():
    # Only works for simple functions
    # No support for generators
    result = await asyncio.to_thread(sync_function, 1, 2)
    assert result == 3


asyncio.run(main())

aiothreads - comprehensive solution

import asyncio
import time
from aiothreads import threaded, threaded_iterable, sync_await


def blocking_operation():
    time.sleep(0.01)  # Simulation of blocking call
    return "sync_result"


def expensive_data_source():
    """Simulation of expensive data source"""
    return range(20)


@threaded
def mixed_sync_async():
    sync_result = blocking_operation()

    # Calling async code from thread
    sync_await(asyncio.sleep, 0.01)
    return sync_result


@threaded_iterable(max_size=100)
def data_stream():
    # Automatic backpressure control
    for item in expensive_data_source():
        yield item


async def main():
    # Rich functionality with clean syntax
    result = await mixed_sync_async()
    assert result == "sync_result"

    # Stream processing with memory control
    items = []
    async for item in data_stream():
        items.append(item)
        if len(items) >= 10:
            break  # Automatically stops sync generator thread execution!

    assert len(items) == 10


asyncio.run(main())

Key Features

  • Zero Dependencies: Pure Python implementation with no external dependencies
  • Simple Decorators: Transform sync functions into async-compatible versions with @threaded
  • Generator Support: Convert sync generators to async iterators with @threaded_iterable
  • Thread Isolation: Run code in separate threads with @threaded_separate
  • Async-to-Sync Bridge: Call async code from synchronous threads
  • Context Variable Support: Proper context propagation across thread boundaries
  • Method Support: Works with instance methods, class methods, and static methods
  • Full Type Safety: Complete typing support with ParamSpec and TypeVar for static type checkers
  • Consistent Interface: All decorated functions become objects with sync_call, async_call, and __call__ (alias for async_call) methods

Quick Start

Installation

# Assuming standard installation method
pip install aiothreads

Basic Usage

Decorate any blocking function with @threaded and call it with await. Multiple calls run concurrently in the thread pool:

import asyncio
import time
from aiothreads import threaded


@threaded
def cpu_intensive_task(n):
    """A blocking function that will run in a thread"""
    time.sleep(0.01)  # Simulate CPU work
    return n * n


async def main():
    # Run multiple blocking operations concurrently
    tasks = [cpu_intensive_task(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    assert results == [0, 1, 4, 9, 16]


asyncio.run(main())

Working With Threads

Choosing the Right Decorator

Use Case Recommended Decorator Reason
Short I/O operations (< 30s) @threaded Efficient resource reuse
CPU-bound tasks (< 30s) @threaded Controlled concurrency
Blocking pipe/stream reading @threaded_separate Won't block thread pool, creates a separate thread
Operations that may hang @threaded_separate Isolation from pool
Continuous monitoring tasks @threaded_separate Don't monopolize pool workers
High-frequency short tasks @threaded Lower overhead
Resource-intensive generators @threaded_iterable Controlled memory usage
Long-lived data streams @threaded_iterable_separate Complete isolation

Thread Pool Benefits:

  • Automatic resource management
  • Built-in concurrency limits
  • Lower overhead for frequent operations
  • Graceful shutdown handling

Separate Thread Benefits:

  • Complete isolation
  • No impact on other threaded operations
  • Suitable for blocking/hanging operations
  • Won't exhaust thread pool workers

Separate Thread Risks:

  • Be careful not to create too many separate threads

The @threaded Decorator

The @threaded decorator converts synchronous functions to run in the asyncio thread pool:

import asyncio
import time
from aiothreads import threaded


@threaded
def fetch_url(url: str) -> str:
    """Simulation of blocking HTTP request"""
    time.sleep(0.01)  # Simulation of blocking call
    return f"content of {url}"


async def main():
    urls = ['http://example.com', 'http://httpbin.org/json']

    # Both requests run concurrently in separate threads
    results = await asyncio.gather(*[fetch_url(url) for url in urls])
    assert len(results) == 2
    assert all(r.startswith("content of") for r in results)


asyncio.run(main())

Decorated Function Interface

When you decorate a function with @threaded, it becomes a Threaded object with three calling methods:

import asyncio
from aiothreads import threaded


@threaded
def compute(x: int, y: int) -> int:
    return x + y


async def main():
    # Three ways to call the function:

    # 1. Default async call (same as __call__)
    result = await compute(1, 2)
    assert result == 3

    # 2. Explicit async call
    result = await compute.async_call(1, 2)
    assert result == 3

    # 3. Accessing the sync call method
    # This runs the function as usual, blocking the thread
    # and returning the result directly
    result = compute.sync_call(1, 2)
    assert result == 3


asyncio.run(main())

All decorated functions also expose sync_call (bypasses threading, runs inline) and async_call (explicit async), in addition to the default __call__ which is an alias for async_call.

Separate Thread Execution

Use @threaded_separate to run functions in completely separate threads (not the thread pool):

import asyncio
import io
import time
from aiothreads import threaded_iterable_separate


@threaded_iterable_separate
def read_unix_pipe(pipe_path):
    """Simulation of reading from a named pipe"""
    # Simulation of blocking pipe read
    simulated_data = io.StringIO("line1\nline2\nline3\n")
    while True:
        time.sleep(0.01)  # Simulation of blocking call
        line = simulated_data.readline()
        if not line:
            break
        yield line.strip()


async def main():
    received = []
    async for line in read_unix_pipe('/tmp/my_pipe'):
        received.append(line)

    assert received == ["line1", "line2", "line3"]


asyncio.run(main())

Important Notice about Separate Functions

Functions decorated with @threaded_separate and @threaded_iterable_separate create new dedicated threads for each call, bypassing the thread pool entirely. This has important implications:

Use separate threads when:

  • Reading from blocking pipes or streams that may hang
  • Performing operations that might block indefinitely
  • Working with operations that could exhaust the thread pool

Resource Control Risks:

  • No automatic limits: Unlike thread pools, there's no built-in limit on concurrent separate threads
  • Memory overhead: Each thread consumes ~8MB of stack space by default
  • OS limits: You can hit system thread limits (typically 1000-4000 per process)
  • CPU context switching: Too many threads can degrade performance

Best Practices:

  • Use regular @threaded for most use cases (leverages controlled thread pool)
  • Reserve separate variants for genuinely long-running or problematic operations

Class Method Support

The decorators work seamlessly with class methods and preserve typing:

import asyncio
from typing import ClassVar
from aiothreads import threaded


class DataProcessor:
    default_timeout: ClassVar[int] = 30

    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    @threaded
    def process(self, data: str) -> dict:
        return {"result": data.upper()}

    @threaded
    @staticmethod
    def utility_function(value: int) -> int:
        return value * 2

    @threaded
    @classmethod
    def from_config(cls, config: dict) -> 'DataProcessor':
        return cls(**config)


async def main():
    processor = DataProcessor()

    result: dict = await processor.process("data")
    assert result == {"result": "DATA"}

    utility_result: int = await processor.utility_function(10)
    assert utility_result == 20

    new_processor: DataProcessor = await DataProcessor.from_config({"default_timeout": 60})
    assert isinstance(new_processor, DataProcessor)


asyncio.run(main())

The descriptor protocol handles method binding automatically โ€” decorators work the same way on module-level functions, instance methods, class methods, and static methods.

Working With Synchronous Generators

The @threaded_iterable Decorator

@threaded_iterable converts a sync generator into an async iterator. The generator runs in a thread, yielding items through a thread-safe channel. Breaking from the async for loop automatically stops the generator thread:

import asyncio
import time
from aiothreads import threaded_iterable

# Simulation of paginated API data
PAGES = {
    1: {'items': [{'id': 1}, {'id': 2}], 'total_pages': 3},
    2: {'items': [{'id': 3}, {'id': 4, 'type': 'target_item'}], 'total_pages': 3},
    3: {'items': [{'id': 5}, {'id': 6}], 'total_pages': 3},
}


def fetch_page(url):
    """Simulation of blocking HTTP request"""
    time.sleep(0.01)  # Simulation of blocking call
    page = int(url.split("page=")[1])
    return PAGES.get(page, {})


# If you want to prefetch data from an API with pagination more than 1 next page at a time, you can set
# `max_size` parameter to 10 for example.
@threaded_iterable(max_size=1)
def crawl_api_pages(base_url, start_page=1):
    """Recursively fetch API pages until no more data"""
    page = start_page
    while True:
        url = f"{base_url}?page={page}"
        data = fetch_page(url)

        if not data.get('items'):  # No more data
            break

        # Yield each item from this page
        for item in data['items']:
            yield item

        page += 1
        if page > data.get('total_pages', page):
            break


async def main():
    """Process API data with the ability to stop early"""
    processed_count = 0

    async for item in crawl_api_pages('https://api.example.com/data'):
        processed_count += 1

        # Break early if we find what we need
        if item.get('type') == 'target_item':
            break  # This automatically stops the sync generator!

    assert processed_count == 4


asyncio.run(main())

Key Benefit: When you break from the async loop, the sync generator automatically stops execution. No more HTTP requests will be made, and resources are properly cleaned up.

Backpressure Control

max_size controls the buffer between the sync generator (producer) and the async consumer. When the buffer is full, the sync generator blocks until the async side consumes an item. This prevents a fast producer from filling up memory.

Default Queue Size Configuration

When using @threaded decorator with generator functions (auto-converted to @threaded_iterable), the default max_size can be controlled via environment variable:

# Set before running your application
export THREADED_ITERABLE_DEFAULT_MAX_SIZE=512
python your_app.py
  • Default value: 1024 if not set
  • Explicit max_size parameter always overrides the default
import asyncio
import time
from aiothreads import threaded_iterable


def fetch_search_page(query, page):
    """Simulation of blocking search API call"""
    time.sleep(0.01)  # Simulation of blocking call
    if page > 3:
        return {'items': []}
    return {
        'items': [
            {'title': f'Result {i}', 'url': f'http://example.com/{i}', 'snippet': f'{query} snippet'}
            for i in range((page - 1) * 2, page * 2)
        ],
    }


@threaded_iterable(max_size=50)
def scrape_search_results(query, max_pages=None):
    """Scrape search results with bounded memory usage"""
    page = 1

    while max_pages is None or page <= max_pages:
        results = fetch_search_page(query, page)

        if not results.get('items'):
            break  # No more results

        for item in results['items']:
            yield {
                'title': item['title'],
                'url': item['url'],
                'snippet': item['snippet'],
                'page': page
            }

        page += 1


# Queue won't grow beyond 50 items, even with slow consumer
async def main():
    query = "python asyncio"
    processed = 0

    async for result in scrape_search_results(query, max_pages=100):
        processed += 1

        # Can break early and stop all requests
        if processed >= 4:
            break  # No more scraping will happen

    assert processed == 4


asyncio.run(main())

With max_size=50, the producer thread blocks after buffering 50 items until the async consumer catches up. Combined with early break, this gives full control over both memory usage and network activity.

Context Manager Support

Threaded iterators can be used as async context managers to guarantee cleanup. The generator's finally block always runs, even when you break early:

import asyncio
import time
from aiothreads import threaded_iterable

# Simulation of paginated API data
USERS_DB = [
    [{'name': 'Alice', 'role': 'user'}, {'name': 'Bob', 'role': 'user'}],
    [{'name': 'Charlie', 'role': 'admin'}],
]
cleanup_called = False


@threaded_iterable
def fetch_paginated_data(api_endpoint):
    """Fetch data with automatic session management"""
    global cleanup_called
    try:
        for page_data in USERS_DB:
            time.sleep(0.01)  # Simulation of blocking call
            for record in page_data:
                yield record
    finally:
        cleanup_called = True  # Always cleanup session


async def main():
    global cleanup_called

    # Process data with guaranteed cleanup
    count = 0
    async with fetch_paginated_data('/api/users') as user_stream:
        async for user in user_stream:
            count += 1

            # Early termination still triggers cleanup
            if user.get('role') == 'admin':
                break  # Session will be properly closed

    assert count == 3
    assert cleanup_called


asyncio.run(main())

Separate Thread Generators

Use @threaded_iterable_separate to run each generator in its own dedicated thread, completely independent of the thread pool. This is useful for generators that may block indefinitely or need complete isolation:

import asyncio
import time
from aiothreads import threaded_iterable_separate


def heavy_computation(i):
    """Simulation of CPU-intensive computation"""
    time.sleep(0.001)  # Simulation of blocking call
    return i * i


@threaded_iterable_separate(max_size=50)
def cpu_intensive_generator():
    """Runs in dedicated thread, not thread pool"""
    for i in range(10):
        yield heavy_computation(i)


async def main():
    results = []
    async for item in cpu_intensive_generator():
        results.append(item)
    assert results == [i * i for i in range(10)]


asyncio.run(main())

Resource Control Warning

@threaded_iterable_separate creates a new dedicated thread for each iterator instance. Multiple concurrent iterations can quickly exhaust system resources:

import asyncio
import time
from aiothreads import threaded_iterable_separate, threaded_iterable


def expensive_operation(i):
    """Simulation of expensive operation"""
    time.sleep(0.001)  # Simulation of blocking call
    return i


@threaded_iterable_separate
def data_stream():
    # Each async iteration creates a new thread
    for i in range(5):
        yield expensive_operation(i)


# SAFER: Use regular threaded_iterable with controlled concurrency
@threaded_iterable(max_size=100)
def safer_data_stream():
    for i in range(5):
        yield expensive_operation(i)


async def main():
    results = []
    async for item in safer_data_stream():
        results.append(item)
    assert results == [0, 1, 2, 3, 4]


asyncio.run(main())

Reserve threaded_iterable_separate for cases where you specifically need each generator to run in complete isolation from the thread pool.

Calling Async Code from Threads

When working in threaded functions, you can call back into async code:

Function Input When to use
sync_await(func, *args) Async callable + args Inside @threaded โ€” simplest API
wait_coroutine(coro) Coroutine object Inside @threaded when you already have a coroutine
sync_wait_coroutine(loop, func, *args) Explicit loop + async callable With asyncio.to_thread (no auto loop)

Basic Async Calls

Use sync_await to call an async function from within a @threaded function. The event loop is automatically available via context variables โ€” no manual setup needed:

import asyncio
import time
from aiothreads import sync_await, threaded


def blocking_operation():
    time.sleep(0.01)  # Simulation of blocking call
    return {"key": "value"}


async def async_api_call(data):
    """Simulation of async API call"""
    await asyncio.sleep(0.01)  # Simulation of async I/O
    return {**data, "processed": True}


def process_result(data):
    return data


@threaded
def mixed_sync_async_work():
    # Do some sync work
    sync_result = blocking_operation()

    # Call async function from thread
    # Event loop is automatically available from context
    async_result = sync_await(async_api_call, sync_result)

    # Continue with sync work
    return process_result(async_result)


async def main():
    result = await mixed_sync_async_work()
    assert result == {"key": "value", "processed": True}


asyncio.run(main())

sync_await blocks the current thread until the coroutine completes on the event loop, then returns the result. This lets you freely interleave sync and async work within the same function.

Event Loop Context

@threaded decorated functions automatically store the current event loop in context variables, making it available for async calls within the thread:

import asyncio
from aiothreads import threaded, sync_await, wait_coroutine


async def some_async_function(arg):
    return f"async_{arg}"


async def another_async_function(arg):
    return f"awaited_{arg}"


@threaded
def thread_with_async_calls():
    # Event loop is automatically available
    result1 = sync_await(some_async_function, "arg1")
    result2 = wait_coroutine(another_async_function("arg2"))
    return result1 + result2


async def main():
    result = await thread_with_async_calls()
    assert result == "async_arg1awaited_arg2"


asyncio.run(main())

Both sync_await and wait_coroutine read the loop from the EVENT_LOOP context variable, which @threaded sets automatically before entering the thread.

Coroutine Waiting

Use wait_coroutine when you already have a coroutine object. When using asyncio.to_thread instead of @threaded, pass the event loop explicitly with sync_wait_coroutine:

import asyncio
from aiothreads import wait_coroutine, threaded


@threaded
def thread_worker():
    # Create and wait for a coroutine
    # Event loop is automatically available from context
    async def fetch_data():
        await asyncio.sleep(0.01)
        return "data"

    result = wait_coroutine(fetch_data())
    return result


# When using with asyncio.to_thread, manual loop passing required
def manual_thread_worker(loop):
    async def fetch_data():
        return "data from specific loop"

    from aiothreads import sync_wait_coroutine
    result = sync_wait_coroutine(loop, fetch_data)
    return result


async def main():
    # Automatic loop handling with @threaded
    result1 = await thread_worker()
    assert result1 == "data"

    # Manual loop handling with asyncio.to_thread
    loop = asyncio.get_running_loop()
    result2 = await asyncio.to_thread(manual_thread_worker, loop)
    assert result2 == "data from specific loop"


asyncio.run(main())

Context Variables

@threaded copies the current context before entering the thread, so contextvars.ContextVar values set by the caller are available inside the thread:

import asyncio
import contextvars
from aiothreads import threaded

user_context = contextvars.ContextVar('user')


@threaded
def process_user_data(data):
    # Context variable is available in thread
    current_user = user_context.get()
    return f"Processing {data} for {current_user}"


async def main():
    user_context.set("alice")

    # Context propagates to thread
    result = await process_user_data("report")
    assert result == "Processing report for alice"


asyncio.run(main())

This works because @threaded uses contextvars.copy_context() before dispatching to the thread, so the thread inherits a snapshot of the caller's context.

Advanced Usage

FromThreadChannel with Timeout

The FromThreadChannel class provides efficient event-based communication from threads to async code. It uses asyncio.Event instead of polling for immediate wake-up when data is available.

import asyncio
from aiothreads import FromThreadChannel, ChannelClosed, ChannelTimeout, threaded


async def main():
    # Basic usage with timeout
    channel = FromThreadChannel(maxsize=10, timeout=5.0)

    @threaded
    def producer():
        with channel:
            for i in range(10):
                channel.put(i)

    producer()

    results = []
    try:
        while True:
            item = await channel.get()
            results.append(item)
    except ChannelClosed:
        pass

    assert results == list(range(10))

    # Override timeout per-call
    channel2 = FromThreadChannel(timeout=10.0)

    try:
        await channel2.get(timeout=0.01)
        assert False, "Should have raised ChannelTimeout"
    except ChannelTimeout:
        pass  # Expected: timed out after 0.01 seconds


asyncio.run(main())

Timeout Behavior:

  • timeout=0 (default): No timeout, wait indefinitely
  • timeout=N: Wait up to N seconds, then raise ChannelTimeout
  • Per-call timeout overrides the default set in constructor

Error Handling

Exceptions raised inside threaded functions propagate normally to the caller. Use standard try/except to handle them:

import asyncio
from aiothreads import threaded


@threaded
def risky_operation(should_fail: bool = False):
    if should_fail:
        raise ValueError("Something went wrong")
    return "success"


async def main():
    result = await risky_operation(should_fail=False)
    assert result == "success"

    try:
        await risky_operation(should_fail=True)
        assert False, "Should have raised ValueError"
    except ValueError as e:
        assert str(e) == "Something went wrong"


asyncio.run(main())

The original exception type and traceback are preserved, so standard error handling patterns work without changes.

Performance Considerations

Control the thread pool size with set_default_executor. Use @threaded_separate for CPU-bound work that shouldn't compete with pool workers handling I/O:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from aiothreads import threaded, threaded_separate


# For CPU-bound tasks, consider using threaded_separate
@threaded_separate
def cpu_bound_task(data):
    time.sleep(0.01)  # Simulation of CPU-intensive work
    return data * 2


# For I/O bound tasks, regular threaded is usually sufficient
@threaded
def io_bound_task(url):
    time.sleep(0.01)  # Simulation of blocking I/O
    return f"response from {url}"


async def main():
    # Control thread pool size at the event loop level
    loop = asyncio.get_running_loop()
    loop.set_default_executor(ThreadPoolExecutor(max_workers=10))

    cpu_result = await cpu_bound_task(21)
    assert cpu_result == 42

    io_result = await io_bound_task("http://example.com")
    assert io_result == "response from http://example.com"


asyncio.run(main())

Wheel compatibility matrix

Platform Python 3
any

Files in release

No dependencies