aiokafka 0.12.0


pip install aiokafka

  Latest version

Released: Oct 26, 2024


Meta
Author: Andrew Svetlov
Requires Python: >=3.9

Classifiers

License
  • OSI Approved :: Apache Software License

Intended Audience
  • Developers

Programming Language
  • Python :: 3
  • Python :: 3.9
  • Python :: 3.10
  • Python :: 3.11
  • Python :: 3.12
  • Python :: 3.13

Operating System
  • OS Independent

Topic
  • System :: Networking
  • System :: Distributed Computing

Framework
  • AsyncIO

Development Status
  • 4 - Beta
|Build status| |Coverage| |Chat on Gitter|

asyncio client for Kafka

AIOKafkaProducer

AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

from aiokafka import AIOKafkaProducer
import asyncio

async def send_one():
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

asyncio.run(send_one())

AIOKafkaConsumer

AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).

Example of AIOKafkaConsumer usage:

from aiokafka import AIOKafkaConsumer
import asyncio

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

asyncio.run(consume())

Documentation

https://aiokafka.readthedocs.io/

Running tests

Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note, that lz4 compression libraries for python will require python-dev package, or python source header files for compilation on Linux. NOTE: You will also need a valid java installation. It’s required for the keytool utility, used to generate ssh keys for some tests.

Setting up tests requirements (assuming you’re within virtualenv on ubuntu 14.04+):

sudo apt-get install -y libkrb5-dev krb5-user
make setup

Running tests with coverage:

make cov

To run tests with a specific version of Kafka (default one is 2.8.1) use KAFKA_VERSION variable:

make cov SCALA_VERSION=2.11 KAFKA_VERSION=0.10.2.1

Test running cheatsheat:

  • make test FLAGS="-l -x --ff" - run until 1 failure, rerun failed tests first. Great for cleaning up a lot of errors, say after a big refactor.

  • make test FLAGS="-k consumer" - run only the consumer tests.

  • make test FLAGS="-m 'not ssl'" - run tests excluding ssl.

  • make test FLAGS="--no-pull" - do not try to pull new docker image before test run.

Wheel compatibility matrix

Platform CPython 3.9 CPython 3.10 CPython 3.11 CPython 3.12 CPython 3.13
macosx_10_13_x86_64
macosx_10_9_x86_64
macosx_11_0_arm64
manylinux2014_aarch64
manylinux2014_x86_64
manylinux_2_17_aarch64
manylinux_2_17_x86_64
win32
win_amd64

Files in release

aiokafka-0.12.0-cp310-cp310-macosx_10_9_x86_64.whl (366.6KiB)
aiokafka-0.12.0-cp310-cp310-macosx_11_0_arm64.whl (363.8KiB)
aiokafka-0.12.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.0MiB)
aiokafka-0.12.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.0MiB)
aiokafka-0.12.0-cp310-cp310-win32.whl (341.5KiB)
aiokafka-0.12.0-cp310-cp310-win_amd64.whl (360.0KiB)
aiokafka-0.12.0-cp311-cp311-macosx_10_9_x86_64.whl (366.9KiB)
aiokafka-0.12.0-cp311-cp311-macosx_11_0_arm64.whl (363.6KiB)
aiokafka-0.12.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.1MiB)
aiokafka-0.12.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1MiB)
aiokafka-0.12.0-cp311-cp311-win32.whl (340.4KiB)
aiokafka-0.12.0-cp311-cp311-win_amd64.whl (359.7KiB)
aiokafka-0.12.0-cp312-cp312-macosx_10_13_x86_64.whl (366.2KiB)
aiokafka-0.12.0-cp312-cp312-macosx_11_0_arm64.whl (361.9KiB)
aiokafka-0.12.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.1MiB)
aiokafka-0.12.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2MiB)
aiokafka-0.12.0-cp312-cp312-win32.whl (339.6KiB)
aiokafka-0.12.0-cp312-cp312-win_amd64.whl (357.5KiB)
aiokafka-0.12.0-cp313-cp313-macosx_10_13_x86_64.whl (362.3KiB)
aiokafka-0.12.0-cp313-cp313-macosx_11_0_arm64.whl (358.0KiB)
aiokafka-0.12.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.1MiB)
aiokafka-0.12.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1MiB)
aiokafka-0.12.0-cp313-cp313-win32.whl (337.8KiB)
aiokafka-0.12.0-cp313-cp313-win_amd64.whl (354.7KiB)
aiokafka-0.12.0-cp39-cp39-macosx_10_9_x86_64.whl (368.0KiB)
aiokafka-0.12.0-cp39-cp39-macosx_11_0_arm64.whl (365.1KiB)
aiokafka-0.12.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.0MiB)
aiokafka-0.12.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1MiB)
aiokafka-0.12.0-cp39-cp39-win32.whl (342.5KiB)
aiokafka-0.12.0-cp39-cp39-win_amd64.whl (361.3KiB)
aiokafka-0.12.0.tar.gz (551.7KiB)
Extras:
Dependencies:
async-timeout
packaging
typing-extensions (>=4.10.0)