How to set up your Kafka and Zookeeper pytest fixtures

Delving deep into the weird world of Kafka

Good morning / good afternoon / good evening, fellow hackers. Today we'll learn how to use a "cascading fixture" pattern to test your Kafka consumer / producer code.

You will programmatically create a Docker network, a ZooKeeper container and a Kafka container, all of which are automatically torn down at the end of your tests, leaving your environment nice and smooth like a baby's bottom.

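You will need: Python, pytest, Docker (the engine plus the docker Python package) and confluent-kafka, the client library whose AdminClient and NewTopic are used below.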

First, let's put our Docker client in a session-scoped fixture, since we'll be using it multiple times.

import docker
import pytest
from docker import DockerClient

@pytest.fixture(scope="session")
def docker_client() -> DockerClient:
    return docker.from_env()

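Then let's create a network that our two containers can use to talk to each other. The resource_postfix argument is presumably another fixture that is not shown in this post; any session-scoped fixture that returns a string should do.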

from docker.models.networks import Network

@pytest.fixture(scope="session")
def network(docker_client: DockerClient, resource_postfix: str) -> Network:
    _network = docker_client.networks.create(name=f"network-{resource_postfix}")
    yield _network
    # Runs once the session is over: remove the network again.
    _network.remove()

Next, let's kick things off by launching our ZooKeeper container. Note that if you don't already have the confluentinc/cp-zookeeper:6.2.0 image, it will be downloaded as part of the fixture, which delays your first test. If you don't want that, pull the image beforehand with docker pull confluentinc/cp-zookeeper:6.2.0

from docker.models.containers import Container

@pytest.fixture(scope="session")
def zookeeper(docker_client: DockerClient, network: Network) -> Container:
    zookeeper_container = docker_client.containers.run(
        image="confluentinc/cp-zookeeper:6.2.0",
        ports={"2181/tcp": "2181/tcp"},
        network=network.name,
        name="zookeeper",
        hostname="zookeeper",
        environment={"ZOOKEEPER_CLIENT_PORT": 2181},
        detach=True,
    )
    yield zookeeper_container
    zookeeper_container.remove(force=True)

Kafka comes next. Same thing again: if you don't want to wait for the download, run docker pull confluentinc/cp-server:6.2.0 first.

@pytest.fixture(scope="session")
def broker(docker_client: DockerClient, network: Network, zookeeper: Container) -> Container:
    # TODO: find and remove unnecessary environment variables
    broker_container = docker_client.containers.run(
        image="confluentinc/cp-server:6.2.0",
        ports={
            "9092/tcp": "9092/tcp",
            "29092/tcp": "29092/tcp",
        },
        network=network.name,
        name="broker",
        hostname="broker",
        environment={
            "KAFKA_BROKER_ID": 1,
            "KAFKA_ZOOKEEPER_CONNECT": f"{zookeeper.name}:2181",
            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT",
            "KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092",
            "KAFKA_METRIC_REPORTERS": "io.confluent.metrics.reporter.ConfluentMetricsReporter",
            "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": 1,
            "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS": 0,
            "KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR": 1,
            "KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR": 1,
            "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": 1,
            "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": 1,
            "KAFKA_JMX_PORT": 9101,
            "KAFKA_JMX_HOSTNAME": "localhost",
            "CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS": "broker:29092",
            "CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS": 1,
            "CONFLUENT_METRICS_ENABLE": 'true',
        },
        detach=True,
    )
    yield broker_container
    broker_container.remove(force=True)

A few things to note about our three fixtures so far (network, zookeeper and broker):

  1. They are all session scoped. You only want one instance per session, so that you don't have to wait for both containers to launch for, e.g., every message you send to the Kafka broker.
  2. They all receive docker_client: DockerClient, the two containers also receive network: Network, and broker additionally depends on zookeeper. Each fixture builds on the ones before it, hence the name "cascading fixture" pattern.
  3. The yield keyword hands the network or container object to whatever requested it. The code after the yield runs at the end of the session and tears the resource down.
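
The setup-then-teardown behaviour from point 3 can be seen without Docker at all. Below is a minimal sketch that uses contextlib.contextmanager from the standard library as a stand-in for pytest's fixture machinery. The resource helper and the events list are made up for illustration. Resources are released in the reverse order of their creation, which is roughly how pytest unwinds fixtures that depend on one another.

```python
from contextlib import contextmanager
from typing import Iterator, List

events: List[str] = []  # hypothetical log of what happened, for illustration only


@contextmanager
def resource(name: str) -> Iterator[str]:
    events.append(f"create {name}")      # setup: everything before the yield
    try:
        yield name                       # the value handed to the caller
    finally:
        events.append(f"remove {name}")  # teardown: everything after the yield


def run_session() -> List[str]:
    events.clear()
    # Mimics the cascade: the broker needs zookeeper, which needs the network.
    with resource("network"), resource("zookeeper"), resource("broker"):
        events.append("run tests")
    return list(events)


print(run_session())
```

The broker is removed first and the network last, so nothing is torn down while something else still depends on it.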

Now's the time for the Big Lebowski of today's blog: the mighty Kafka admin client itself.

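import time

from confluent_kafka.admin import AdminClient

@pytest.fixture(scope="session")
def kafka_admin_client(broker: Container) -> AdminClient:
    # A running container does not mean Kafka is ready, so poll the logs for
    # the startup marker, pausing between checks and giving up after 120 s.
    deadline = time.monotonic() + 120
    has_started = False
    while not has_started:
        if time.monotonic() > deadline:
            raise TimeoutError("Kafka broker did not start within 120 seconds")
        # Scan the whole log: the marker may no longer be the last line.
        has_started = b"INFO Kafka startTimeMs" in broker.logs()
        if not has_started:
            time.sleep(0.5)

    # localhost:9092 matches the PLAINTEXT_HOST advertised listener.
    return AdminClient(conf={"bootstrap.servers": "localhost:9092"})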

There is a cheeky while not has_started: loopy thingy there. It is necessary because the broker: Container fixture is handed over as soon as the Docker container is alive, which doesn't mean that Kafka itself has actually started. So we resort to checking the broker's logs until we find the startup marker startTimeMs, and only then do we return the client.
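
If you find yourself waiting on more than one container, the same idea can be pulled out into a small helper. This is only a sketch: wait_for_log_marker and its parameters are hypothetical names, and it assumes you hand it a callable that returns the container's logs as bytes, which is what Container.logs does when called without arguments.

```python
import time
from typing import Callable


def wait_for_log_marker(
    read_logs: Callable[[], bytes],
    marker: bytes,
    timeout_s: float = 120.0,
    interval_s: float = 0.5,
) -> None:
    """Block until `marker` appears in the output of `read_logs`, or time out."""
    deadline = time.monotonic() + timeout_s
    while marker not in read_logs():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{marker!r} not seen within {timeout_s} seconds")
        time.sleep(interval_s)  # pause so we don't hammer the Docker daemon


# In the fixture this might be called as:
#     wait_for_log_marker(broker.logs, b"Kafka startTimeMs")
```

The timeout is the important bit: if the broker never comes up, the test session fails with a clear error instead of hanging forever.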

Assuming that you need a test topic to publish to, the fixture below creates one and returns it as soon as its creation is confirmed. That way you avoid pushing a message to a topic that doesn't exist yet (creation is normally very fast, but better safe than sorry).

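import time

from confluent_kafka.admin import AdminClient, NewTopic

@pytest.fixture(scope="session")
def new_topic(kafka_admin_client: AdminClient, topic_name: str) -> NewTopic:
    new_topic = NewTopic(
        topic=topic_name,
        num_partitions=1,
        replication_factor=1,
    )
    # create_topics() is asynchronous, so poll the cluster metadata until
    # the topic shows up, pausing between checks and giving up after 30 s.
    kafka_admin_client.create_topics(new_topics=[new_topic])
    deadline = time.monotonic() + 30
    while new_topic.topic not in kafka_admin_client.list_topics(timeout=5).topics:
        if time.monotonic() > deadline:
            raise TimeoutError(f"Topic {new_topic.topic} was not created in time")
        time.sleep(0.1)
    yield new_topic
    # Runs once the session is over: delete the test topic again.
    kafka_admin_client.delete_topics(topics=[new_topic.topic])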

If your eagle eyes haven't failed you, there is a weird topic_name: str in the function's signature. What is it? What is it, my precious?

Task: Create a fixture called topic_name with a session scope that outputs a string.
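
If you get stuck, here is one possible way to approach it. This is a sketch rather than the version from the repo: unique_name is a hypothetical helper that appends a short random suffix so that repeated runs don't clash, and the same helper could just as well back the resource_postfix fixture that the network fixture asks for.

```python
import uuid


def unique_name(prefix: str) -> str:
    """Return `prefix` followed by a dash and 8 random hex characters."""
    return f"{prefix}-{uuid.uuid4().hex[:8]}"


# In conftest.py the helper could be wrapped like this (assumes pytest is imported):
#
#     @pytest.fixture(scope="session")
#     def topic_name() -> str:
#         return unique_name("test-topic")

print(unique_name("test-topic"))
```

A hard-coded string would satisfy the task too. The random suffix only helps when a previous run left a topic behind.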

Now you (almost) have a full setup and you can test publishing and consuming!
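
To give a feel for the consuming side of such a test, here is a hedged sketch of a helper that polls until one message arrives. It assumes a consumer shaped like confluent-kafka's Consumer, whose poll() returns None on timeout or a message object with error() and value() methods. consume_one is a hypothetical name, not something from the repo.

```python
import time
from typing import Any


def consume_one(consumer: Any, timeout_s: float = 10.0) -> bytes:
    """Poll `consumer` until a message arrives and return its payload."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        message = consumer.poll(1.0)  # None means nothing arrived within 1 second
        if message is None:
            continue
        if message.error():
            raise RuntimeError(f"Consumer error: {message.error()}")
        return message.value()
    raise TimeoutError(f"No message received within {timeout_s} seconds")
```

In a test you would produce a message to new_topic.topic with a Producer, flush it, subscribe a Consumer configured with auto.offset.reset set to earliest, and then assert on what consume_one(consumer) returns.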

Visit my GitHub repo, which contains a component and an integration test example (ok ok ok, it also has the topic_name fixture if you're that lazy).

See you soon!

七転び八起き (fall down seven times, get up eight)