How to Produce and Consume Messages with Kafka and Python

This blog teaches you how to produce and consume messages with Kafka and Python. You will learn how to use Kafka’s producer and consumer APIs, as well as different configuration options and serialization formats.

1. Introduction

In this blog, you will learn how to produce and consume messages with Kafka and Python. Kafka is a distributed streaming platform that allows you to publish and subscribe to streams of messages, process them in real time, and store them in a scalable and fault-tolerant way. Python is a popular and versatile programming language that has a rich set of libraries and tools for working with Kafka.

By the end of this blog, you will be able to:

  • Create a Kafka producer and consumer using Python
  • Send and receive messages to and from a Kafka topic
  • Configure the producer and consumer with various options
  • Serialize and deserialize messages with JSON and Avro formats

To follow along with this blog, you will need:

  • A basic understanding of Kafka and Python
  • A Kafka cluster running on your local machine or in the cloud
  • Python 3.6 or higher installed on your machine
  • The kafka-python library installed on your machine

Ready to get started? Let’s dive into Kafka and Python!

2. What is Kafka and Why Use It?

As noted in the introduction, Kafka is a distributed streaming platform. It is designed for high-throughput, low-latency, reliable data delivery in applications that need to process large amounts of data from various sources.

But what is a stream of messages, and why do you need it? A stream of messages is a continuous flow of data records that are produced by one or more sources and consumed by one or more destinations. A message is a unit of data that contains some information, such as a sensor reading, a log entry, a user action, or a transaction. A stream of messages can be used to represent events that happen in the real world, such as user clicks, orders, payments, or sensor readings.

Streams of messages are useful for building applications that need to react to events in real time, such as analytics, monitoring, alerting, or recommendation systems. For example, you can use streams of messages to:

  • Analyze user behavior and preferences on a website or a mobile app
  • Monitor the performance and health of a system or a service
  • Alert users or operators when something goes wrong or needs attention
  • Recommend products or services based on user interests or actions

However, working with streams of messages can be challenging, especially when you have to deal with issues such as scalability, reliability, availability, and consistency. How do you ensure that your application can handle a large volume of messages without losing or duplicating any of them? How do you ensure that your application can continue to operate even when some of the sources or destinations fail or become unavailable? How do you ensure that your application can process the messages in the same order as they were produced, and that the results are consistent across different consumers?

This is where Kafka comes in. Kafka provides a platform that simplifies the management of streams of messages, and offers the following features:

  • High-throughput: Kafka can handle millions of messages per second, with minimal overhead and resource consumption.
  • Low-latency: Kafka can deliver messages to the consumers within milliseconds, enabling near-real-time processing and response.
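  • Reliable: Kafka persists messages on disk and, with acknowledgments and retries enabled, provides at-least-once delivery: messages are not lost, but they may occasionally be duplicated after a failure. Exactly-once semantics are available, but they require idempotent producers and transactions rather than the default settings.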
  • Scalable: Kafka can scale horizontally, by adding more brokers (servers) to the cluster, to increase the capacity and performance of the system.
  • Fault-tolerant: Kafka replicates the messages across multiple brokers, and automatically handles broker failures and recovery, ensuring high availability and durability of the data.
  • Consistent: Kafka preserves the order of the messages within a partition (a subset of the stream), and ensures that the consumers see the same order as the producers.
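
The ordering guarantee in the last bullet applies per partition, not per topic. A toy in-memory model may make that concrete. This is only a sketch: ToyTopic and its append() method are hypothetical names, and the CRC32-based partition choice is a stand-in for the hash a real Kafka client uses.

```python
import zlib
from collections import defaultdict


class ToyTopic:
    """In-memory stand-in for a topic: each partition is an append-only list."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self.partitions = defaultdict(list)

    def append(self, key, value):
        # Hash the key to pick a partition (assumption: CRC32 here,
        # not the hash function a real Kafka client uses).
        partition = zlib.crc32(key) % self.num_partitions
        self.partitions[partition].append((key, value))
        # The offset is the record's position within its partition.
        return partition, len(self.partitions[partition]) - 1


topic = ToyTopic(num_partitions=3)
for i in range(4):
    topic.append(b"user-1", f"click {i}".encode())
    topic.append(b"user-2", f"order {i}".encode())

# All records for one key land in one partition, in the order they were sent.
user1_partition = zlib.crc32(b"user-1") % 3
user1_values = [v for k, v in topic.partitions[user1_partition] if k == b"user-1"]
print(user1_values)
```

Records with different keys may end up in different partitions, and Kafka makes no promise about the relative order of records across partitions.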

As you can see, Kafka is a powerful and versatile platform that can help you build applications that need to work with streams of messages. In the next section, you will learn how to set up Kafka and Python on your machine, and how to use the kafka-python library to interact with Kafka.

3. Setting Up Kafka and Python

Before you can start producing and consuming messages with Kafka and Python, you need to set up Kafka and Python on your machine. In this section, you will learn how to install and run Kafka locally, and how to install and use the kafka-python library to interact with Kafka.

Installing and Running Kafka Locally

Kafka requires Java to run, so you need to have Java installed on your machine. You can check if you have Java installed by running the following command in your terminal:

java -version

If you see the version of Java printed out, you are good to go. If not, you need to install a Java Development Kit (JDK) before continuing.

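Next, you need to download Kafka from the Apache Kafka downloads page. Choose a stable release and download the binary archive (not the source archive), for example Kafka 2.8.0 for Scala 2.13. The commands in this section assume a release that still runs with ZooKeeper, such as 2.8.0. Newer releases that run in KRaft mode are started differently.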

After downloading Kafka, you need to extract the file to a folder of your choice. For example, you can extract it to ~/kafka. Then, you need to open two terminal windows and navigate to the Kafka folder in both of them. In one terminal, you need to start the ZooKeeper server, which is a service that Kafka uses to manage the cluster. You can start the ZooKeeper server by running the following command:

bin/zookeeper-server-start.sh config/zookeeper.properties

In the other terminal, you need to start the Kafka server, which is the main component of Kafka that handles the messages. You can start the Kafka server by running the following command:

bin/kafka-server-start.sh config/server.properties

Now you have Kafka running on your machine. You can verify that by creating a topic and listing the topics. A topic is a category or a name that you assign to a stream of messages. You can create a topic by running the following command in a new terminal window:

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

This command creates a topic named test with the default settings. You can list the topics by running the following command:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

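You should see the test topic in the output. When you no longer need a topic, you can delete it with the following command. If you run it now, recreate the test topic afterwards, because the Python examples below use it: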

bin/kafka-topics.sh --delete --topic test --bootstrap-server localhost:9092

Congratulations, you have successfully installed and run Kafka locally. You can stop the Kafka and ZooKeeper servers by pressing Ctrl+C in the respective terminals.

Installing and Using the kafka-python Library

To interact with Kafka using Python, you need to install the kafka-python library, which is a Python client for Kafka. You can install the library using pip by running the following command in your terminal:

pip install kafka-python

Alternatively, you can install the library from source by cloning its GitHub repository and installing it from the checkout.

After installing the library, you can use it to create a Kafka producer and consumer in Python. A producer is an application that sends messages to a Kafka topic. A consumer is an application that receives messages from a Kafka topic. You can import the library and create a producer and consumer by running the following code in a Python shell or a script:

from kafka import KafkaProducer, KafkaConsumer

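producer = KafkaProducer(bootstrap_servers='localhost:9092')
consumer = KafkaConsumer(
    'test',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',  # Start from the beginning of the topic
)

This code creates a producer and a consumer that connect to the Kafka server running on localhost:9092. The consumer subscribes to the test topic, which you created earlier. Setting auto_offset_reset to 'earliest' makes it read from the beginning of the topic; with the default, 'latest', it would skip any message sent before its first poll and could wait forever. You can send and receive messages using the producer and consumer by running the following code: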

# Send a message to the test topic
producer.send('test', b'Hello, Kafka!')
# send() is asynchronous; flush() blocks until the message has actually been sent
producer.flush()

# Receive a message from the test topic
message = next(consumer)
print(message.value)

This code sends a message with the value b'Hello, Kafka!' to the test topic using the producer and waits until it has been delivered. Then, it receives a message from the test topic using the consumer and prints its value. On a freshly created topic, you should see b'Hello, Kafka!' printed out.

Well done, you have successfully installed and used the kafka-python library to interact with Kafka. You can find more information and examples on how to use the library in the documentation.

In the next section, you will learn how to produce messages with Kafka and Python, and how to use different configuration options and serialization formats.

4. Producing Messages with Kafka and Python

In this section, you will learn how to produce messages with Kafka and Python, and how to use different configuration options and serialization formats. Producing messages with Kafka and Python involves creating a Kafka producer, sending messages to a Kafka topic, configuring the producer with various options, and serializing messages with JSON and Avro formats.

4.1. Creating a Kafka Producer

A Kafka producer is an application that sends messages to a Kafka topic. To create a Kafka producer in Python, you need to import the KafkaProducer class from the kafka-python library, and instantiate it with the bootstrap_servers parameter, which specifies the address of the Kafka server. For example, you can create a producer that connects to the Kafka server running on localhost:9092 by running the following code:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

This code creates a producer object that you can use to send messages to any topic. You can also pass other parameters to the KafkaProducer class, such as client_id, acks, retries, compression_type, and value_serializer. You can find the full list of parameters and their descriptions in the documentation.

In the next subsection, you will learn how to send messages to a Kafka topic using the producer object.

4.2. Sending Messages to a Kafka Topic

Now that you have created a Kafka producer, you can use it to send messages to a Kafka topic. A topic is a logical name that identifies a stream of messages. You can create topics in advance, or let Kafka create them automatically when you send messages to them (this works only if the broker setting auto.create.topics.enable is on, which is the default). Each topic can have multiple partitions, which are the units of parallelism and replication in Kafka.

To send a message to a Kafka topic, you work with the following parameters:

  • topic: The name of the topic that you want to send the message to.
  • value: The content of the message that you want to send. Unless you configure a value_serializer, this must be bytes, so strings and other objects have to be encoded first.
  • key: An optional parameter that can be used to assign the message to a specific partition within the topic. Messages with the same key are always sent to the same partition (as long as the number of partitions does not change), which preserves their relative order. Like the value, the key must be bytes unless you configure a key_serializer. If you don't specify a key, the partitioner chooses a partition for you; kafka-python's default partitioner picks one of the available partitions at random.
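
Because the producer expects bytes unless serializers are configured, a small helper can encode keys and values before they are passed to send(). The following is a minimal sketch; encode_record is a hypothetical helper name, not part of kafka-python.

```python
def encode_record(key, value, encoding="utf-8"):
    """Return (key_bytes, value_bytes) ready to pass to a producer's send()."""

    def to_bytes(item):
        # None (no key) and bytes pass through unchanged.
        if item is None or isinstance(item, bytes):
            return item
        if isinstance(item, str):
            return item.encode(encoding)
        raise TypeError(f"cannot encode {type(item).__name__}; serialize it first")

    return to_bytes(key), to_bytes(value)


key, value = encode_record("greeting", "Hello, Kafka!")
print(key, value)

# With a running broker (not executed here):
# producer.send(topic='test', key=key, value=value)
```

For anything more complex than strings, a value_serializer on the producer is usually the cleaner choice.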

To send a message to a Kafka topic, you can use the send() method of the producer object. This method returns a Future object, which represents the result of the asynchronous operation. You can use the get() method of the Future object to wait for the operation to complete, and get the RecordMetadata object, which contains information about the message, such as the topic, partition, and offset.

For example, the following code snippet shows how to send a message with the value “Hello, Kafka!” and the key “greeting” to the topic “test”:

from kafka import KafkaProducer
from kafka.errors import KafkaError

# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Send a message to the topic "test"
future = producer.send(topic='test', value=b'Hello, Kafka!', key=b'greeting')

# Wait for the operation to complete and get the metadata
try:
    record_metadata = future.get(timeout=10)
    print(f"Message sent to topic {record_metadata.topic}, partition {record_metadata.partition}, offset {record_metadata.offset}")
except KafkaError as e:
    print(f"Failed to send message: {e}")
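
Calling get() blocks until the broker responds. If you would rather not block, the future returned by send() also accepts callbacks through add_callback() and add_errback(). The sketch below assumes a producer object like the one above; send_with_callbacks, the delivered and failed lists, and FakeMetadata are hypothetical names used only for illustration, and the callback is exercised here without a broker.

```python
from collections import namedtuple

delivered = []
failed = []


def on_send_success(record_metadata):
    """Record where the message ended up once the broker acknowledges it."""
    delivered.append(
        (record_metadata.topic, record_metadata.partition, record_metadata.offset)
    )


def on_send_error(exc):
    """Record the error if the send ultimately fails."""
    failed.append(repr(exc))


def send_with_callbacks(producer, topic, value):
    """Send without blocking; `producer` is assumed to be a KafkaProducer."""
    future = producer.send(topic, value)
    future.add_callback(on_send_success)
    future.add_errback(on_send_error)
    return future


# Exercise the success callback with a stand-in for RecordMetadata.
FakeMetadata = namedtuple("FakeMetadata", "topic partition offset")
on_send_success(FakeMetadata("test", 0, 42))
print(delivered)
```

The callbacks run on the producer's background thread, so they should be short and must not block.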

You can also send multiple messages to the same or different topics in a batch, by using the send() method multiple times, and then calling the flush() method to ensure that all the messages are delivered. For example, the following code snippet shows how to send three messages to the topic “test”:

from kafka import KafkaProducer
from kafka.errors import KafkaError

# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Send three messages to the topic "test"
producer.send(topic='test', value=b'Message 1')
producer.send(topic='test', value=b'Message 2')
producer.send(topic='test', value=b'Message 3')

# Flush the producer to ensure that all the messages are delivered
producer.flush()

As you can see, sending messages to a Kafka topic with Python is quite simple and straightforward. However, there are many configuration options that you can use to customize the behavior of the producer, such as the compression, batching, retry, and acknowledgment policies. In the next section, you will learn how to configure the Kafka producer with various options.

4.3. Configuring the Kafka Producer

The Kafka producer has many configuration options that you can use to customize its behavior and performance. You pass these options as keyword arguments when you create the producer object. kafka-python does not read a configuration file itself, but you can keep the options in a file, load them into a dictionary, and unpack that dictionary into the constructor. Some of the most important and commonly used options are:

  • compression_type: This option specifies the compression algorithm that the producer will use to compress the messages before sending them to the broker. This can reduce the network bandwidth and improve the throughput, but it also adds some CPU overhead. The possible values are None (no compression), 'gzip', 'snappy', 'lz4', or 'zstd'. The default value is None.
  • batch_size: This option specifies the maximum size of a batch of messages that the producer will send in a single request. A larger batch size can improve the efficiency and throughput, but it also increases the latency and memory usage. The value is in bytes, and the default value is 16384 (16 KB).
  • linger_ms: This option specifies the maximum time that the producer will wait before sending a batch of messages. This can help to increase the batch size and reduce the number of requests, but it also increases the latency. The value is in milliseconds, and the default value is 0 (no delay).
  • acks: This option specifies the level of acknowledgment that the producer will expect from the broker after sending a batch of messages. The possible values are 0 (no acknowledgment), 1 (acknowledgment from the leader broker only), or 'all' (acknowledgment from all the in-sync replicas). A higher level of acknowledgment can improve the reliability and durability, but it also increases the latency and network traffic. The default value is 1.
  • retries: This option specifies the number of times that the producer will retry to send a batch of messages if the broker returns an error or does not acknowledge the request. A higher number of retries can improve the reliability and fault-tolerance, but it also increases the latency and network traffic. In kafka-python 2.0.x the default value is 0 (no retries); defaults can differ between releases, so check the documentation for the version you have installed.
  • max_in_flight_requests_per_connection: This option specifies the maximum number of unacknowledged requests that the producer will send on a single connection to the broker. A higher number of requests can improve the throughput and utilization, but it also increases the risk of reordering and duplication of messages in case of failures or retries. The default value is 5.

For example, the following code snippet shows how to create a Kafka producer with some custom configuration options:

from kafka import KafkaProducer

# Create a Kafka producer with custom configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    compression_type='gzip',
    batch_size=32768,
    linger_ms=100,
    acks='all',
    retries=3,
    max_in_flight_requests_per_connection=1
)

You can find the full list of configuration options and their descriptions in the kafka-python documentation. You can also use the config attribute of the producer object to access the current configuration as a dictionary.
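
If you prefer to keep producer options outside your code, one possible approach is to store them as JSON and unpack the loaded dictionary into the constructor. This is a sketch under assumptions: load_producer_config, the file name producer.json, and the ALLOWED_KEYS set are hypothetical, and the set covers only the options discussed in this section.

```python
import json
import os
import tempfile

# Options this sketch accepts (an illustrative subset, not the full list).
ALLOWED_KEYS = {
    "bootstrap_servers", "compression_type", "batch_size",
    "linger_ms", "acks", "retries", "max_in_flight_requests_per_connection",
}


def load_producer_config(path):
    """Load producer options from a JSON file and reject unknown keys."""
    with open(path, encoding="utf-8") as handle:
        config = json.load(handle)
    unknown = set(config) - ALLOWED_KEYS
    if unknown:
        raise ValueError(f"unsupported producer options: {sorted(unknown)}")
    return config


# Write a sample file to a temporary directory and load it back.
sample = {"bootstrap_servers": ["localhost:9092"], "acks": "all", "linger_ms": 100}
with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = os.path.join(tmp_dir, "producer.json")
    with open(config_path, "w", encoding="utf-8") as handle:
        json.dump(sample, handle)
    config = load_producer_config(config_path)

print(config)

# With a running broker (not executed here):
# producer = KafkaProducer(**config)
```

Rejecting unknown keys early gives a clearer error than a failure deep inside the client, at the cost of having to extend the set when you start using a new option.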

As you can see, configuring the Kafka producer with various options can help you to optimize its performance and reliability for your specific use case. However, you should also be aware of the trade-offs and implications of each option, and test them in your environment before applying them. In the next section, you will learn how to serialize messages with JSON and Avro formats.

4.4. Serializing Messages with JSON and Avro

When you send messages to a Kafka topic, you need to serialize them, which means converting them from their original format (such as a string, a byte array, or an object) to a binary format that can be transferred over the network. Similarly, when you receive messages from a Kafka topic, you need to deserialize them, which means converting them back to their original format.

There are many serialization formats that you can use, such as JSON, Avro, Protobuf, or Thrift. Each format has its own advantages and disadvantages, such as the size, speed, schema support, and compatibility. In this section, you will learn how to serialize messages with JSON and Avro formats, which are two of the most popular and widely used formats in the Kafka ecosystem.

JSON (JavaScript Object Notation) is a human-readable and lightweight format that represents data as a collection of name-value pairs. JSON is easy to use and understand, and it supports basic data types, such as strings, numbers, booleans, arrays, and objects. However, JSON also has some drawbacks, such as the lack of schema enforcement, the verbosity, and the overhead of parsing and encoding.

To serialize messages with JSON format, you can use the json module from the Python standard library, or any other third-party library that supports JSON, such as simplejson or ujson. You can use the dumps() function to convert a Python object to a JSON string, and then encode it to a byte array. To reverse the process, decode the byte array to a JSON string and then use the loads() function to convert it to a Python object.

For example, the following code snippet shows how to serialize and deserialize a message with JSON format:

import json

# Create a Python object
message = {
    'name': 'Alice',
    'age': 25,
    'gender': 'female'
}

# Serialize the object to a JSON string
json_string = json.dumps(message)

# Encode the string to a byte array
json_bytes = json_string.encode('utf-8')

# Decode the byte array to a JSON string
json_string = json_bytes.decode('utf-8')

# Deserialize the string to a Python object
message = json.loads(json_string)
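
In practice, these two steps are usually wrapped in functions and handed to the producer and consumer through the value_serializer and value_deserializer parameters. A minimal sketch follows; serialize_json and deserialize_json are hypothetical helper names.

```python
import json


def serialize_json(obj):
    """Turn a Python object into compact UTF-8 encoded JSON bytes."""
    return json.dumps(obj, separators=(",", ":"), sort_keys=True).encode("utf-8")


def deserialize_json(raw):
    """Turn UTF-8 encoded JSON bytes back into a Python object."""
    return json.loads(raw.decode("utf-8"))


payload = serialize_json({"name": "Alice", "age": 25})
print(payload)                    # b'{"age":25,"name":"Alice"}'
print(deserialize_json(payload))  # {'age': 25, 'name': 'Alice'}

# With a running broker (not executed here):
# producer = KafkaProducer(bootstrap_servers='localhost:9092',
#                          value_serializer=serialize_json)
# consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092',
#                          value_deserializer=deserialize_json)
```

With the serializer configured, you can pass dictionaries directly to send() instead of encoding them by hand.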

Avro (Apache Avro) is a binary and compact format that represents data as a schema and a payload. Avro supports complex data types, such as records, enums, unions, arrays, and maps, and it also supports schema evolution and compatibility. Avro has some advantages over JSON, such as the smaller size, the faster speed, and the schema validation. However, Avro also has some drawbacks, such as the difficulty of reading and debugging, and the dependency on the schema.

To serialize messages with Avro format, you can use the avro package, which is the official Apache Avro implementation for Python (installable with pip install avro). You can use the avro.schema.parse() function to define the schema of your data, and the DatumWriter and DatumReader classes from avro.io to write and read data according to the schema. You can also use the BinaryEncoder and BinaryDecoder classes to encode and decode data to and from a binary format, with an io.BytesIO buffer from the standard library holding the bytes.

For example, the following code snippet shows how to serialize and deserialize a message with Avro format:

import io  # Standard-library module that provides the in-memory BytesIO buffer

import avro.io
import avro.schema

# Define the schema of the data
message_schema = avro.schema.parse("""
{
    "type": "record",
    "name": "Message",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "gender", "type": "string"}
    ]
}
""")

# Create a Python object
message = {
    'name': 'Alice',
    'age': 25,
    'gender': 'female'
}

# Serialize the object to a binary format
writer = avro.io.DatumWriter(message_schema)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
writer.write(message, encoder)
avro_bytes = bytes_writer.getvalue()

# Deserialize the binary format to a Python object
reader = avro.io.DatumReader(message_schema)
bytes_reader = io.BytesIO(avro_bytes)
decoder = avro.io.BinaryDecoder(bytes_reader)
message = reader.read(decoder)

As you can see, serializing messages with JSON and Avro formats can help you to transfer data efficiently and reliably with Kafka and Python. However, you should also consider the trade-offs and requirements of each format, and choose the one that suits your use case best. In the next section, you will learn how to consume messages with Kafka and Python.

5. Consuming Messages with Kafka and Python

In the previous section, you learned how to produce messages with Kafka and Python. In this section, you will learn how to consume messages with Kafka and Python. Consuming messages means receiving and processing the messages that are published to a Kafka topic by one or more producers. You will use the kafka-python library to create a Kafka consumer and subscribe to a topic. You will also learn how to configure the consumer with various options, and how to deserialize the messages with JSON and Avro formats.

The steps to consume messages with Kafka and Python are:

  1. Create a Kafka consumer object
  2. Subscribe to a Kafka topic
  3. Loop over the messages in the topic
  4. Process each message according to your application logic
  5. Commit the offsets of the messages
  6. Close the consumer object
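
Steps 3 to 6 can be sketched as a single function. This is an illustration under assumptions: consume is a hypothetical helper, the consumer is expected to behave like a KafkaConsumer created with a group_id and enable_auto_commit=False, and FakeConsumer exists only so the loop can run without a broker.

```python
def consume(consumer, handle, max_messages=None):
    """Run steps 3 to 6 on an already-subscribed consumer.

    `consumer` must be iterable and provide commit() and close().
    """
    processed = 0
    try:
        for message in consumer:   # Step 3: loop over the messages
            handle(message)        # Step 4: application logic
            consumer.commit()      # Step 5: commit only after processing succeeds
            processed += 1
            if max_messages is not None and processed >= max_messages:
                break
    finally:
        consumer.close()           # Step 6: always release the consumer
    return processed


class FakeConsumer:
    """Minimal stand-in for a consumer, used only to exercise the loop."""

    def __init__(self, values):
        self._values = list(values)
        self.commits = 0
        self.closed = False

    def __iter__(self):
        return iter(self._values)

    def commit(self):
        self.commits += 1

    def close(self):
        self.closed = True


seen = []
fake = FakeConsumer([b"a", b"b", b"c"])
count = consume(fake, seen.append)
print(count, seen)
```

Committing after every message keeps the example simple but adds a round trip per message; committing every N messages or relying on automatic commits are common alternatives.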

Let’s go through each step in detail.

5.1. Creating a Kafka Consumer

The first step to consume messages with Kafka and Python is to create a Kafka consumer object. A Kafka consumer object is an instance of the KafkaConsumer class from the kafka-python library. It represents a client that connects to a Kafka cluster and subscribes to one or more topics.

To create a Kafka consumer object, you need to import the KafkaConsumer class and pass some parameters to its constructor. The most important parameter is bootstrap_servers, which specifies the list of brokers that the consumer will connect to. You can also pass other parameters to customize the behavior of the consumer, such as group_id, auto_offset_reset, enable_auto_commit, and value_deserializer. We will explain these parameters in Section 5.3.

Here is an example of how to create a Kafka consumer object in Python:

# Import the KafkaConsumer class
from kafka import KafkaConsumer

# Create a Kafka consumer object
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'], # List of brokers
    group_id='my-group', # Consumer group id
    auto_offset_reset='earliest', # Start from the beginning of the topic
    enable_auto_commit=True, # Commit the offsets automatically
    value_deserializer=lambda x: x.decode('utf-8') # Decode the messages as utf-8 strings
)

As you can see, creating a Kafka consumer object is quite simple and straightforward. You just need to provide the necessary parameters and assign the object to a variable. In this case, we named the variable consumer, but you can use any name you like.

Now that you have created a Kafka consumer object, you can use it to subscribe to a Kafka topic and receive messages from it. We will show you how to do that in the next section.

5.2. Receiving Messages from a Kafka Topic

Once you have created a Kafka consumer object, you can use it to subscribe to a Kafka topic and receive messages from it. A Kafka topic is a logical name that identifies a stream of messages. You can create topics using the Kafka command-line tools or the kafka-python library. Each topic can have one or more partitions, which are physical units that store the messages. Each partition can have one or more replicas, which are copies of the partition kept on different brokers for fault tolerance.

To subscribe to a Kafka topic, you need to use the subscribe method of the Kafka consumer object. This method takes a list of topic names as an argument, and the consumer is then assigned one or more partitions of those topics. Alternatively, you can pass a pattern to subscribe to all topics whose names match a regular expression. For example, you can subscribe to all topics that start with 'test' by using the pattern 'test.*'. Subscriptions are not incremental: each call to subscribe replaces the previous subscription.

Here is an example of how to subscribe to a Kafka topic in Python:

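# The three calls below are alternatives: each subscribe() call
# replaces the previous subscription rather than adding to it.

# Subscribe to a single topic
consumer.subscribe(['my-topic'])

# Subscribe to multiple topics
consumer.subscribe(['my-topic-1', 'my-topic-2', 'my-topic-3'])

# Subscribe to topics that match a pattern
consumer.subscribe(pattern='test.*')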
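
Before subscribing with a pattern, it can help to check which topic names it would select. The sketch below assumes the pattern is applied from the start of the topic name, as Python's re.match does; matching_topics and the topic names are hypothetical.

```python
import re


def matching_topics(pattern, topic_names):
    """Return the topic names that the regular expression selects."""
    compiled = re.compile(pattern)
    return [name for name in topic_names if compiled.match(name)]


topics = ["test", "test-orders", "latest", "my-topic"]
print(matching_topics("test.*", topics))   # ['test', 'test-orders']
print(matching_topics("test-.*", topics))  # ['test-orders']
```

Note that 'latest' is not selected even though it contains 'test', because the match has to begin at the first character of the name.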

After subscribing to a topic, you can start receiving messages from it by using a for loop. The Kafka consumer object is an iterator that returns a ConsumerRecord object for each message. A ConsumerRecord object contains the following attributes:

  • topic: the name of the topic
  • partition: the id of the partition
  • offset: the offset of the message within the partition
  • timestamp: the timestamp of the message
  • key: the key of the message (optional)
  • value: the value of the message

Here is an example of how to receive messages from a Kafka topic in Python:

# Loop over the messages in the topic
for message in consumer:
    # Print the message attributes
    print(f"Topic: {message.topic}")
    print(f"Partition: {message.partition}")
    print(f"Offset: {message.offset}")
    print(f"Timestamp: {message.timestamp}")
    print(f"Key: {message.key}")
    print(f"Value: {message.value}")
    print()

As you can see, receiving messages from a Kafka topic is straightforward: you loop over the consumer and access the message attributes. In the next section, you will learn how to configure the consumer.

5.3. Configuring the Kafka Consumer

In the previous section, you learned how to create a Kafka consumer object and subscribe to a Kafka topic. In this section, you will learn how to configure the Kafka consumer with various options that affect its behavior and performance. You will also learn how to use the value_deserializer parameter to specify how to decode the messages from binary format to Python objects.

The Kafka consumer object accepts many parameters that can be passed to its constructor. Some of the most important parameters are:

  • group_id: This parameter specifies the consumer group id that the consumer belongs to. A consumer group is a set of consumers that cooperate to consume messages from a topic. Each consumer in a group is assigned a subset of partitions of the topic, and only consumes messages from those partitions. This way, the consumers can balance the load and avoid duplicate consumption. If you don't specify a group_id, the consumer will act as a standalone consumer that consumes all the messages from the topic, and offset commits are disabled.
  • auto_offset_reset: This parameter specifies what to do when the consumer has no valid offset for a partition, or when the offset is out of range. This can happen when the consumer is new and has not committed any offset, or when the messages have been deleted or compacted by the broker. The possible values for this parameter are 'earliest', 'latest', or 'none'. If you set it to 'earliest', the consumer will start from the beginning of the topic. If you set it to 'latest', the consumer will start from the end of the topic. If you set it to 'none', the consumer will raise an exception if there is no valid offset.
  • enable_auto_commit: This parameter specifies whether to enable the automatic commit of the offsets. The offset is a number that indicates the position of the message within the partition. By committing the offset, the consumer tells the broker which messages it has already consumed, and which messages it needs to consume next. If you set this parameter to True, the consumer will commit the offsets periodically, based on the auto_commit_interval_ms parameter. If you set it to False, you will have to manually commit the offsets using the commit method of the consumer object.
  • value_deserializer: This parameter specifies a function that takes a binary value as an argument and returns a Python object. This function is used to decode the messages from the binary format in which they are stored on the broker to the Python format that you can use in your application. You can use any function that can perform this conversion, such as lambda x: x.decode('utf-8') for utf-8 strings, or json.loads for JSON objects. You can also use the key_deserializer parameter to specify a function for decoding the keys of the messages, if they are present.

Here is an example of how to configure the Kafka consumer with some of these parameters:

# Import the KafkaConsumer class and the json module
from kafka import KafkaConsumer
import json

# Create a Kafka consumer object with some configuration options
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'], # List of brokers
    group_id='my-group', # Consumer group id
    auto_offset_reset='earliest', # Start from the beginning of the topic
    enable_auto_commit=False, # Disable the automatic commit of the offsets
    value_deserializer=lambda x: json.loads(x) # Decode the messages as JSON objects
)
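
To see how auto_offset_reset interacts with committed offsets, it may help to model the decision for a single partition. The function below is a simplified sketch of the rule described above, not kafka-python's actual implementation; resolve_start_offset and its arguments are hypothetical.

```python
def resolve_start_offset(committed, earliest, latest, auto_offset_reset):
    """Return the offset a consumer would start reading from in one partition.

    committed: last committed offset for the group, or None if there is none.
    earliest:  first offset still available on the broker.
    latest:    offset that the next produced message will receive.
    """
    # A valid committed offset always wins; the reset policy is only a fallback.
    if committed is not None and earliest <= committed <= latest:
        return committed
    if auto_offset_reset == "earliest":
        return earliest
    if auto_offset_reset == "latest":
        return latest
    raise LookupError("no valid offset and no usable reset policy")


print(resolve_start_offset(None, 0, 120, "earliest"))  # 0
print(resolve_start_offset(None, 0, 120, "latest"))    # 120
print(resolve_start_offset(57, 0, 120, "latest"))      # 57
print(resolve_start_offset(5, 40, 120, "earliest"))    # 40
```

The last call shows the out-of-range case: the committed offset 5 points at messages the broker has already deleted, so the policy decides where to resume.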

As you can see, configuring the Kafka consumer is flexible: you pass the parameters that you want to change to the constructor of the consumer object. In the next section, you will learn how to deserialize messages with JSON and Avro formats.

5.4. Deserializing Messages with JSON and Avro

In Section 4.4, you learned how to serialize messages with JSON and Avro formats before sending them to a Kafka topic. In this section, you will learn how to deserialize messages with JSON and Avro formats after receiving them from a Kafka topic.

Deserializing messages means converting them from a binary or text format to a Python object that you can manipulate and process. It is necessary if you want to work with the key and value of a message as structured data rather than as raw bytes. Metadata such as the topic, partition, offset, and timestamp is available without it.

To deserialize messages with JSON and Avro formats, you will need to use the same libraries and schemas that you used for serialization. You will also need to specify the deserializer function for the key and value of the messages when creating the Kafka consumer.

Let’s see how to deserialize messages with JSON and Avro formats using the kafka-python library.

Deserializing Messages with JSON

To deserialize messages with JSON format, you will need to use the json library that comes with Python. You will also need to use the json.loads() function to convert a JSON string to a Python object.

When creating the Kafka consumer, you will need to pass the json.loads function as the value for the value_deserializer parameter. This will tell the consumer to use the json.loads function to deserialize the value of the messages. You can also pass the json.loads function as the value for the key_deserializer parameter if you want to deserialize the key of the messages as well.

Here is an example of how to create a Kafka consumer that deserializes messages with JSON format:

# Import the json library
import json

# Import the KafkaConsumer class from kafka-python
from kafka import KafkaConsumer

# Create a Kafka consumer that connects to a bootstrap server
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
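    # Specify the deserializer functions for the key and value of the messages.
    # Messages without a key arrive with key=None, which json.loads rejects,
    # so the key deserializer passes None through unchanged.
    key_deserializer=lambda k: json.loads(k) if k is not None else None,
    value_deserializer=json.loads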
)

# Subscribe to a Kafka topic
consumer.subscribe(['json_topic'])

# Loop over the messages in the topic
for message in consumer:
    # Print the key and value of the message as Python objects
    print(message.key, message.value)

The output of the code above might look something like this:

{'name': 'Alice'} {'age': 25, 'city': 'New York'}
{'name': 'Bob'} {'age': 30, 'city': 'London'}
{'name': 'Charlie'} {'age': 35, 'city': 'Paris'}

As you can see, the key and value of the messages are deserialized as Python dictionaries, which you can access and manipulate as you wish.
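In practice, a topic may contain messages with no key, tombstone messages with no value, or payloads that are not valid JSON, and a bare json.loads raises an exception on all of these. The following is a minimal sketch of a more defensive deserializer. The name safe_json_deserializer and the choice to hand back the raw bytes for malformed input are assumptions made for this illustration, not something kafka-python prescribes.

```python
import json


def safe_json_deserializer(data):
    """Deserialize JSON bytes, tolerating missing and malformed payloads."""
    # Messages without a key (or tombstones without a value) arrive as None
    if data is None:
        return None
    try:
        return json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Return the raw bytes unchanged so the caller can log or inspect them
        return data


print(safe_json_deserializer(b'{"name": "Alice"}'))
print(safe_json_deserializer(None))
print(safe_json_deserializer(b"not json"))
```

You could then pass this function as both key_deserializer and value_deserializer when creating the KafkaConsumer. Whether malformed messages should be skipped, logged, or raised as errors depends on your application.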

Deserializing Messages with Avro

To deserialize messages with Avro format, you will need to use the avro library that you installed with pip. You will also need the avro.io, avro.schema, and io modules to create BinaryDecoder and DatumReader objects. The BinaryDecoder reads the raw bytes, and the DatumReader uses the Avro schema to turn those bytes into a Python object.

When creating the Kafka consumer, you will need to pass a custom deserializer function as the value for the value_deserializer parameter. This function will take the binary data of the message value as input, and return a Python object as output. You can also pass a custom deserializer function as the value for the key_deserializer parameter if you want to deserialize the key of the messages as well.

Here is an example of how to create a Kafka consumer that deserializes messages with Avro format:

# Import the io module from the standard library
import io

# Import the avro submodules explicitly; "import avro" alone does not load them
import avro.io
import avro.schema

# Import the KafkaConsumer class from kafka-python
from kafka import KafkaConsumer

# Parse the Avro schema once, instead of on every message
# (older avro-python3 releases spell this function avro.schema.Parse)
with open("value_schema.avsc", "r") as schema_file:
    value_schema = avro.schema.parse(schema_file.read())

# Create a DatumReader object with the Avro schema
reader = avro.io.DatumReader(value_schema)

# Define a custom deserializer function for the message value
def value_deserializer(data):
    # Create a BytesIO object from the binary data
    bytes_reader = io.BytesIO(data)
    # Create a BinaryDecoder object from the BytesIO object
    decoder = avro.io.BinaryDecoder(bytes_reader)
    # Decode the binary data according to the schema and return a Python object
    return reader.read(decoder)

# Create a Kafka consumer that connects to a bootstrap server
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    # Specify the deserializer function for the value of the messages
    value_deserializer=value_deserializer
)

# Subscribe to a Kafka topic
consumer.subscribe(['avro_topic'])

# Loop over the messages in the topic
for message in consumer:
    # Print the value of the message as a Python object
    print(message.value)

The output of the code above might look something like this:

{'name': 'Alice', 'age': 25, 'city': 'New York'}
{'name': 'Bob', 'age': 30, 'city': 'London'}
{'name': 'Charlie', 'age': 35, 'city': 'Paris'}

As you can see, the values of the messages are deserialized as Python dictionaries, which you can access and manipulate as you wish.
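Unlike JSON, Avro binary data carries no field names or types, which is why the consumer needs the same schema the producer used. The sketch below illustrates this by decoding a record by hand with only the standard library. It is an illustration of the Avro binary encoding, not how the avro library is implemented, and it assumes a record schema with the fields name (string), age (int), and city (string) in that order; the actual contents of value_schema.avsc are not shown in this blog. The helper names read_long, read_string, and decode_person are hypothetical.

```python
import io


def read_long(stream):
    """Read one Avro int/long: a variable-length, zig-zag encoded integer."""
    shift = 0
    result = 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise EOFError("unexpected end of Avro data")
        b = byte[0]
        # The low 7 bits carry data; the high bit says whether more bytes follow
        result |= (b & 0x7F) << shift
        if not (b & 0x80):
            break
        shift += 7
    # Undo the zig-zag encoding to recover the signed value
    return (result >> 1) ^ -(result & 1)


def read_string(stream):
    """Read one Avro string: a length (as a long) followed by UTF-8 bytes."""
    length = read_long(stream)
    return stream.read(length).decode("utf-8")


def decode_person(data):
    """Decode a record, reading the fields in the order the schema declares them."""
    stream = io.BytesIO(data)
    return {
        "name": read_string(stream),
        "age": read_long(stream),
        "city": read_string(stream),
    }


# Hand-built payload: "Alice" (length 5), 25, "New York" (length 8)
payload = b"\x0aAlice\x32\x10New York"
print(decode_person(payload))
```

Notice that the payload contains only the values. If the consumer read the fields in a different order or with different types, it would produce garbage or fail, which is the job the DatumReader and the schema file take care of for you.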

In this section, you learned how to deserialize messages with JSON and Avro formats using the kafka-python library. The next and final section summarizes what you have covered in this blog.

6. Conclusion

Congratulations! You have reached the end of this blog on how to produce and consume messages with Kafka and Python. You have learned how to use Kafka’s producer and consumer APIs to send and receive messages with Python. You have also learned how to use different configuration options and serialization formats to customize your messages and optimize your performance.

Here are some key points that you have learned from this blog:

  • Kafka is a distributed streaming platform that enables you to publish and subscribe to streams of messages, process them in real time, and store them in a scalable and fault-tolerant way.
  • Kafka’s producer and consumer APIs allow you to create producers and consumers that can send and receive messages to and from a Kafka topic.
  • Kafka’s configuration options allow you to adjust the behavior and performance of your producers and consumers, such as the batch size, compression type, acks, retries, and timeouts.
  • The serializer and deserializer parameters of kafka-python let you plug in functions that convert Python objects to bytes and back, using formats such as JSON and Avro.

By following this blog, you have gained a solid foundation on how to work with Kafka and Python. You can now use these skills to build your own applications that need to handle streams of messages in a fast, reliable, and scalable way.

If you want to learn more about Kafka and Python, the official Apache Kafka documentation and the kafka-python documentation are good places to continue.

Thank you for reading this blog. I hope you enjoyed it and learned something new. If you have any questions or feedback, please feel free to leave a comment below. Happy coding!
