1. Introduction
In this blog, you will learn how to use Kafka Streams with Python to process data in real time. Kafka Streams is a library for stream processing on data in Kafka, a distributed messaging system. Stream processing means handling data as it arrives, rather than waiting for it to be stored in a database or a file, which lets you process large volumes of data with low latency and high throughput.
By using Kafka Streams with Python, you can bring the power and simplicity of Python to stream processing applications. You will learn how to use different operators and transformations to manipulate data streams, such as filtering, mapping, aggregating, joining, and windowing, and you will see examples of common tasks such as counting words, detecting anomalies, and enriching data.
To follow this blog, you will need some basic knowledge of Kafka and Python. You will also need to install and set up Kafka Streams with Python on your machine. If you are not familiar with these topics, don’t worry, we will cover them in the next sections.
Are you ready to learn how to use Kafka Streams with Python to process data in real-time? Let’s get started!
2. What is Kafka Streams and Why Use It?
Kafka Streams is a stream processing library that allows you to perform real-time data processing on data in Kafka. But what does that mean and why is it useful?
Let’s start with some definitions. Kafka is a distributed messaging system that enables you to publish and subscribe to streams of data. A stream is a continuous flow of data records that are ordered by time. A data record is a key-value pair that contains some information. For example, a stream of sensor data might contain records with keys like “temperature” or “humidity” and values like “25°C” or “60%”.
Stream processing is a way of processing data as it arrives, rather than waiting for it to be stored in a database or a file. This enables you to handle large volumes of data with low latency and high throughput. Stream processing can be used for various purposes, such as:
- Analytics: You can analyze data streams to extract insights, such as trends, patterns, or anomalies.
- Transformation: You can transform data streams to change their format, structure, or content.
- Enrichment: You can enrich data streams by adding more information from other sources, such as databases or APIs.
- Aggregation: You can aggregate data streams to compute summary statistics, such as counts, averages, or sums.
- Joining: You can join data streams to combine data from different sources, such as sensors or users.
- Filtering: You can filter data streams to remove unwanted or irrelevant data.
Kafka Streams is a library that allows you to perform stream processing on data in Kafka. It provides a simple and flexible API that lets you write stream processing applications in Java or Scala. You can also use Python, thanks to the kafka-streams-python library, which is a wrapper around the Java API.
Some of the benefits of using Kafka Streams are:
- Scalability: You can scale your stream processing applications by adding more instances or partitions, without changing the code.
- Fault-tolerance: You can handle failures and recover from them automatically, thanks to Kafka’s replication and offset tracking features.
- Statefulness: You can maintain state in your stream processing applications, such as local variables, caches, or tables, and persist them in Kafka or RocksDB.
- Interoperability: You can integrate your stream processing applications with other Kafka clients or applications, such as producers, consumers, or connectors.
- Testability: You can test your stream processing applications using the built-in testing utilities or the interactive query feature.
So, if you want to process data in real-time, Kafka Streams is a great choice. It allows you to write stream processing applications in Python, using the power and simplicity of the language. It also allows you to use different operators and transformations to manipulate data streams, which we will cover in the next section.
But before we dive into the details of how to use Kafka Streams with Python, we need to install and set up the library on our machine. That’s what we will do in the following section.
3. How to Install and Set Up Kafka Streams with Python
In this section, you will learn how to install and set up Kafka Streams with Python on your machine. You will need the following prerequisites:
- A working installation of Python 3 and pip, the Python package manager.
- A working installation of Kafka and ZooKeeper, the distributed coordination service.
- A basic understanding of how to use Kafka as a producer and consumer of data streams.
If you don’t have these prerequisites, you can download and install them from the official Python and Apache Kafka websites. You can also refer to the Kafka quickstart guide for a brief introduction to Kafka and ZooKeeper.
Once you have the prerequisites, you can proceed with the following steps:
- Install the kafka-streams-python library using pip. This library is a wrapper around the Kafka Streams Java API that allows you to use Python to write stream processing applications. To install it, run the following command in your terminal:
pip install kafka-streams-python
- Start ZooKeeper and Kafka on your machine. You can use the default configuration files that come with the Kafka installation. To start ZooKeeper, run the following command in your terminal:
bin/zookeeper-server-start.sh config/zookeeper.properties
- To start Kafka, run the following command in another terminal:
bin/kafka-server-start.sh config/server.properties
- Create two topics in Kafka, one for the input data stream and one for the output data stream. You can use any names you like, but for this tutorial, we will use “input-topic” and “output-topic”. To create the topics, run the following commands in another terminal:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic input-topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic output-topic
- Verify that the topics are created successfully by listing the topics in Kafka. To do that, run the following command in the same terminal:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
You should see the names of the topics you created, along with some other default topics.
- Write a simple stream processing application in Python that reads data from the input topic, applies a transformation, and writes the result to the output topic. For this tutorial, we will use a simple transformation that converts the data to uppercase. To write the application, create a new file called “app.py” and paste the following code:
from kafka_streams import KafkaStreams, StreamsBuilder
# Define the input and output topics
input_topic = "input-topic"
output_topic = "output-topic"
# Create a StreamsBuilder object
builder = StreamsBuilder()
# Create a stream from the input topic
stream = builder.stream(input_topic)
# Apply a transformation to the stream
# In this case, we convert the data to uppercase
transformed_stream = stream.map(lambda key, value: (key, value.upper()))
# Write the transformed stream to the output topic
transformed_stream.to(output_topic)
# Create a KafkaStreams object
streams = KafkaStreams(builder, application_id="my-app")
# Start the stream processing application
streams.start()
# Wait for the application to terminate
streams.join()
- Run the stream processing application by running the following command in your terminal:
python app.py
You should see some logs indicating that the application is running and connected to Kafka.
- Test the stream processing application by producing some data to the input topic and consuming the data from the output topic. To do that, you can use the Kafka console producer and consumer tools that come with the Kafka installation. To produce some data, run the following command in another terminal:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input-topic
Then, type some messages and press enter. For example, you can type:
hello
world
kafka
streams
python
- To consume the data from the output topic, run the following command in another terminal:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output-topic --from-beginning
You should see the messages you typed in uppercase, such as:
HELLO
WORLD
KAFKA
STREAMS
PYTHON
Congratulations! You have successfully installed and set up Kafka Streams with Python and written a simple stream processing application. You can stop the application by pressing Ctrl+C in the terminal where you ran the app.py file.
In the next section, you will learn how to create and consume data streams with Kafka Streams in more detail.
4. How to Create and Consume Data Streams with Kafka Streams
In the previous section, you learned how to install and set up Kafka Streams with Python and write a simple stream processing application. In this section, you will learn how to create and consume data streams with Kafka Streams in more detail.
As you may recall, a data stream is a continuous flow of data records that are ordered by time. A data record is a key-value pair that contains some information. For example, a stream of sensor data might contain records with keys like “temperature” or “humidity” and values like “25°C” or “60%”.
To create a data stream with Kafka Streams, you need to use a StreamsBuilder object. A StreamsBuilder object is a factory that allows you to create various types of streams from different sources, such as topics, tables, or global tables. A topic is a named category of data records that are published and subscribed by Kafka clients. A table is a view of a stream that represents the latest value for each key. A global table is a table that is fully replicated on each Kafka Streams instance.
To create a stream from a topic, you can use the stream method of the StreamsBuilder object. The stream method takes the name of the topic as an argument and returns a KStream object. A KStream object is an abstraction of a stream that allows you to perform various operations on the data records, such as filtering, mapping, aggregating, joining, or windowing. You can also create a table or a global table from a topic using the table or globalTable methods, respectively. These methods return a KTable or a GlobalKTable object, which are abstractions of tables that support many of the same operations as KStream objects.
To write a data stream back to Kafka, you use the to method of the KStream or KTable object. The to method takes the name of the topic as an argument and writes the data records to that topic. You can also pass a Produced configuration that controls how the data records are written, such as the key and value serializers, the partitioner, or the timestamp extractor. Under the hood, Kafka Streams uses a Kafka producer, the client that publishes data records to topics, to perform the write.
Here is an example of how to create and consume a data stream with Kafka Streams in Python. In this example, we create a stream from the input topic, apply a transformation that converts the data to uppercase, and write the result to the output topic.
from kafka_streams import KafkaStreams, StreamsBuilder
# Define the input and output topics
input_topic = "input-topic"
output_topic = "output-topic"
# Create a StreamsBuilder object
builder = StreamsBuilder()
# Create a stream from the input topic
stream = builder.stream(input_topic)
# Apply a transformation to the stream
# In this case, we convert the data to uppercase
transformed_stream = stream.map(lambda key, value: (key, value.upper()))
# Write the transformed stream to the output topic
transformed_stream.to(output_topic)
# Create a KafkaStreams object
streams = KafkaStreams(builder, application_id="my-app")
# Start the stream processing application
streams.start()
# Wait for the application to terminate
streams.join()
As you can see, creating and consuming data streams with Kafka Streams is quite simple and intuitive. You just need to use the StreamsBuilder object to create streams from different sources, apply various operations on the streams, and write the results to different topics. You can also use the Kafka console producer and consumer tools to test your stream processing applications, as we did in the previous section.
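The example above only creates a KStream. For reference, here is a minimal sketch of deriving a table and a global table with the builder, assuming the Python wrapper exposes the table and globalTable factory methods named earlier (a table call with this shape also appears in the enrichment example later in this blog; the globalTable name is an assumption, and the topic names are just illustrative):
# Sketch: deriving tables from topics (topic names are illustrative)
profiles_table = builder.table("user-profiles")           # latest value per key, backed by a local state store
reference_table = builder.globalTable("reference-data")   # fully replicated on every application instance (method name assumed)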
In the next section, you will learn how to apply different operators and transformations on data streams with Kafka Streams.
5. How to Apply Operators and Transformations on Data Streams
In the previous section, you learned how to create and consume data streams with Kafka Streams and Python. In this section, you will learn how to apply different operators and transformations on data streams with Kafka Streams.
Operators and transformations are functions that allow you to manipulate data streams in various ways, such as filtering, mapping, aggregating, joining, or windowing. They can be applied to KStream, KTable, or GlobalKTable objects, which are abstractions of streams and tables that we introduced in the previous section.
There are two types of operators and transformations: stateless and stateful. Stateless operators and transformations do not require any state to be maintained or stored. They only operate on the current data record and produce a new data record. For example, filtering and mapping are stateless operations. Stateful operators and transformations require some state to be maintained or stored. They operate on the current and previous data records and produce a new data record. For example, aggregating and joining are stateful operations.
Some of the common operators and transformations that you can use with Kafka Streams are:
- Filtering: Filtering is a stateless operation that removes data records that do not match a given condition. You can use the filter method of the KStream or KTable object to apply a filtering function that returns a boolean value. For example, you can filter out data records that have a null value or a negative value.
- Mapping: Mapping is a stateless operation that allows you to change the key and/or value of a data record. You can use the map method of the KStream object or the mapValues method of the KTable or GlobalKTable object to apply a mapping function that returns a new key and/or value. For example, you can map the value of a data record to uppercase or lowercase.
- Aggregating: Aggregating is a stateful operation that computes a summary statistic for each key in a data stream. You can use the groupByKey or groupBy method of the KStream object, or the groupBy method of the KTable object, to group the data records by key. Then, you can use the aggregate, count, or reduce methods to apply an aggregating function that returns a new value. For example, you can aggregate the values of a data stream to calculate the sum, average, or maximum.
- Joining: Joining is a stateful operation that allows you to combine data records from two or more data streams based on a common key. You can use the join, leftJoin, or outerJoin methods of the KStream or KTable object to join two data streams. You can also use the join or leftJoin methods of the GlobalKTable object to join a global table with a data stream. You need to provide a joining function that returns a new value. For example, you can join two data streams to enrich the data with additional information.
- Windowing: Windowing is a stateful operation that allows you to divide a data stream into smaller segments based on time or count. You can use the windowedBy method of the KStream object to apply a windowing function that returns a Windowed object. A Windowed object is an abstraction of a window that has a start and end time. You can use different types of windows, such as tumbling, hopping, sliding, or session windows. For example, you can window a data stream to analyze the data in a specific time interval.
- Time-Based Operations: Time-based operations depend on the timestamps of the data records. You can use the suppress method of the KTable object to hold back intermediate updates until a given time condition is met, and the toStream method of the KTable object to convert the table's changelog of updates back into a stream. For example, you can suppress the updates of a windowed aggregation so that only the final result is emitted after the window closes.
These are some of the operators and transformations that you can use with Kafka Streams to manipulate data streams. You can also use other operators and transformations, such as branching, merging, flattening, or selecting. You can find more information about them in the Kafka Streams DSL API documentation.
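As a compact, hedged sketch of how several of these operators can be chained together, using the same hypothetical kafka_streams API that appears in the examples throughout this blog (the topic names are just illustrative):
# Sketch: chaining stateless and stateful operators into one pipeline
stream = builder.stream("orders")
counts = (stream
    .filter(lambda key, value: value is not None)     # stateless: drop records with null values
    .map(lambda key, value: (key, value.upper()))     # stateless: transform each record
    .group_by(lambda key, value: key)                 # group the records by key
    .count())                                         # stateful: count records per key
counts.to("order-counts")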
In the next section, you will see some examples of how to use Kafka Streams with Python to perform common tasks, such as counting words, detecting anomalies, and enriching data.
5.1. Filtering and Mapping
In this section, you will learn how to apply two basic operators on data streams: filtering and mapping. Operators are functions that take one or more data streams as input and produce one or more data streams as output. They allow you to manipulate data streams in various ways, such as transforming, aggregating, joining, or windowing.
Filtering and mapping are two common operators that you can use to change the content or structure of data streams. Filtering allows you to remove data records that do not satisfy a certain condition, while mapping allows you to modify data records by applying a function to them. For example, you can use filtering to remove records that have a null value, and you can use mapping to convert records from one data type to another.
To use the filtering and mapping operators with Kafka Streams, you need to use the filter and map methods of the KStream class. The KStream class represents a data stream in Kafka Streams, and it provides various methods to apply operators on it. The filter method takes a predicate function as an argument and returns a new KStream that contains only the records that satisfy the predicate. The map method takes a mapper function as an argument and returns a new KStream that contains the records transformed by the mapper function.
Let’s see an example of how to use filtering and mapping operators with Kafka Streams and Python. Suppose you have a data stream that contains records of user actions on a website, such as clicks, views, or purchases. Each record has a key that is the user ID, and a value that is a JSON object with the action type and the timestamp. For example, a record might look like this:
('user1', '{"action": "click", "timestamp": "2021-01-01T12:34:56Z"}')
Now, suppose you want to filter out the records that have a null value, and map the records to a new format that has the user ID and the action type as a tuple. For example, you want to transform the record above to this:
('user1', 'click')
To do this, you can use the following code:
from kafka_streams import KafkaStreams
from kafka_streams.streams import KStream
import json
# Create a KafkaStreams instance
ks = KafkaStreams()
# Create a KStream from a Kafka topic
user_actions = ks.stream("user-actions")
# Define a predicate function to filter out null values
def not_null(record):
    key, value = record
    return value is not None
# Define a mapper function to transform the record format
def transform(record):
    key, value = record
    value = json.loads(value)
    return (key, value["action"])
# Apply the filter and map operators on the KStream
filtered_mapped = user_actions.filter(not_null).map(transform)
# Write the result to another Kafka topic
filtered_mapped.to("filtered-mapped")
# Start the KafkaStreams instance
ks.start()
As you can see, the code is simple and intuitive. You can use the filter and map methods to apply the filtering and mapping operators on the data stream, and use the to method to write the result to another Kafka topic. You can also chain multiple operators together, as shown in the example.
Filtering and mapping are just two of the many operators that you can use with Kafka Streams and Python. In the next sections, you will learn how to use other operators, such as aggregating, reducing, joining, and windowing, to perform more complex stream processing tasks.
5.2. Aggregating and Reducing
In this section, you will learn how to apply two more operators on data streams: aggregating and reducing. Aggregating and reducing are two operators that allow you to compute summary statistics on data streams, such as counts, sums, averages, or minima. For example, you can use aggregating and reducing to calculate the total number of clicks, the average temperature, or the minimum price.
To use the aggregating and reducing operators with Kafka Streams, you need to use the group_by, aggregate, and reduce methods of the KStream class. The group_by method takes a grouper function as an argument and returns a KGroupedStream that groups the records by the grouper function. The aggregate method takes an initializer function, an aggregator function, and a store name as arguments and returns a KTable that contains the aggregated values for each group. The reduce method takes a reducer function and a store name as arguments and returns a KTable that contains the reduced values for each group.
Let’s see an example of how to use aggregating and reducing operators with Kafka Streams and Python. Suppose you have a data stream that contains records of product purchases on an online store. Each record has a key that is the product ID, and a value that is the price. For example, a record might look like this:
('p1', '10.99')
Now, suppose you want to aggregate the records by the product ID, and calculate the total revenue and the average price for each product. For example, you want to transform the records to this:
('p1', {'total': '21.98', 'average': '10.99'})
To do this, you can use the following code:
from kafka_streams import KafkaStreams
from kafka_streams.streams import KStream
# Create a KafkaStreams instance
ks = KafkaStreams()
# Create a KStream from a Kafka topic
product_purchases = ks.stream("product-purchases")
# Define a grouper function to group by the product ID
def group_by_product(record):
    key, value = record
    return key
# Define an initializer function to initialize the aggregation
def init():
    return {'total': '0.00', 'average': '0.00', 'count': 0}
# Define an aggregator function to update the aggregation
def aggregate(agg_value, new_value):
    agg_value['total'] = str(float(agg_value['total']) + float(new_value))
    agg_value['count'] += 1
    agg_value['average'] = str(float(agg_value['total']) / agg_value['count'])
    return agg_value
# Apply the group_by and aggregate operators on the KStream
aggregated = product_purchases.group_by(group_by_product).aggregate(init, aggregate, "aggregated-store")
# Write the result to another Kafka topic
aggregated.to("aggregated")
# Start the KafkaStreams instance
ks.start()
As you can see, the code is simple and intuitive. You can use the group_by and aggregate methods to apply the aggregating operator on the data stream, and use the to method to write the result to another Kafka topic. You can also use the reduce method instead of the aggregate method if you only need to compute a single value for each group, such as the sum or the minimum.
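As a hedged sketch of that reduce alternative, reusing the product_purchases stream and group_by_product function from the example above (the store and topic names are just illustrative), keeping the minimum price per product could look like this:
# Sketch: keep the minimum price seen for each product (reduce takes a reducer function and a store name, as described above)
def min_price(agg_value, new_value):
    return new_value if float(new_value) < float(agg_value) else agg_value
min_prices = product_purchases.group_by(group_by_product).reduce(min_price, "min-price-store")
min_prices.to("min-prices")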
Aggregating and reducing are just two of the many operators that you can use with Kafka Streams and Python. In the next sections, you will learn how to use other operators, such as joining, merging, and windowing, to perform more complex stream processing tasks.
5.3. Joining and Merging
In this section, you will learn how to apply two more operators on data streams: joining and merging. Joining and merging are two operators that allow you to combine data streams from different sources, such as sensors, users, or databases. For example, you can use joining and merging to enrich data streams with additional information, correlate data streams from different events, or compare data streams from different time periods.
To use the joining and merging operators with Kafka Streams, you need to use the join, outer_join, left_join, and merge methods of the KStream class. The join method takes another KStream, a joiner function, a window specification, and a store name as arguments, and returns a new KStream that contains the records that are joined by the joiner function within the window. The outer_join and left_join methods are similar to the join method, but they also include the records that do not have a matching record in the other stream. The merge method takes another KStream as an argument and returns a new KStream that contains the records from both streams.
Let’s see an example of how to use joining and merging operators with Kafka Streams and Python. Suppose you have two data streams that contain records of user actions and user profiles on a social media platform. The user actions stream has a key that is the user ID, and a value that is a JSON object with the action type and the timestamp. The user profiles stream has a key that is the user ID, and a value that is a JSON object with the user name and the location. For example, the records might look like this:
# User actions stream
('user1', '{"action": "post", "timestamp": "2021-01-01T12:34:56Z"}')
('user2', '{"action": "like", "timestamp": "2021-01-01T12:35:12Z"}')
# User profiles stream
('user1', '{"name": "Alice", "location": "New York"}')
('user2', '{"name": "Bob", "location": "London"}')
Now, suppose you want to join the two streams by the user ID, and create a new stream that contains the user name, the action type, and the timestamp. For example, you want to transform the records to this:
('user1', '{"name": "Alice", "action": "post", "timestamp": "2021-01-01T12:34:56Z"}')
('user2', '{"name": "Bob", "action": "like", "timestamp": "2021-01-01T12:35:12Z"}')
To do this, you can use the following code:
from kafka_streams import KafkaStreams
from kafka_streams.streams import KStream
import json
# Create a KafkaStreams instance
ks = KafkaStreams()
# Create two KStreams from two Kafka topics
user_actions = ks.stream("user-actions")
user_profiles = ks.stream("user-profiles")
# Define a joiner function to combine the records
def joiner(action_value, profile_value):
    action_value = json.loads(action_value)
    profile_value = json.loads(profile_value)
    return json.dumps({
        "name": profile_value["name"],
        "action": action_value["action"],
        "timestamp": action_value["timestamp"]
    })
# Define a window specification to join the records within 10 seconds
window = ks.time_windows(10)
# Apply the join operator on the two KStreams
joined = user_actions.join(user_profiles, joiner, window, "joined-store")
# Write the result to another Kafka topic
joined.to("joined")
# Start the KafkaStreams instance
ks.start()
As you can see, the code is simple and intuitive. You can use the join method to apply the joining operator on the two streams, and use the to method to write the result to another Kafka topic. You can use the outer_join or left_join methods instead of the join method if you want to include the records that do not have a matching record in the other stream. You can also use the merge method to merge the two streams into one stream, without applying any joiner function.
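As a one-line sketch of that merge alternative, reusing the two streams from the example above (the output topic name is just illustrative), merging simply interleaves the records from both streams without a joiner function:
# Sketch: merge the two streams into a single stream containing records from both topics
merged = user_actions.merge(user_profiles)
merged.to("merged-events")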
Joining and merging are just two of the many operators that you can use with Kafka Streams and Python. In the next sections, you will learn how to use other operators, such as windowing and time-based operations, to perform more complex stream processing tasks.
5.4. Windowing and Time-Based Operations
Another important aspect of stream processing is windowing and time-based operations. Windowing is a way of dividing a stream into smaller chunks based on time or other criteria. Time-based operations are ways of manipulating or querying data streams based on time or event order.
Why do we need windowing and time-based operations? Because streams are unbounded and continuous, we cannot process them as a whole. We need to break them down into manageable units that can be processed independently and incrementally. Windowing and time-based operations allow us to do that, and also enable us to perform complex analytics and aggregations on data streams.
Kafka Streams provides several types of windows and time-based operations that you can use in your stream processing applications. Some of the most common ones are:
- Tumbling windows: These are fixed-size, non-overlapping windows that divide a stream into equal intervals. For example, you can use a tumbling window of one minute to count the number of events that occur in each minute.
- Hopping windows: These are fixed-size, overlapping windows that divide a stream into equal intervals with a certain amount of overlap. For example, you can use a hopping window of one minute with a hop of 10 seconds to count the number of events that occur in each minute, but also update the count every 10 seconds.
- Sliding windows: These are variable-size, overlapping windows that divide a stream based on the difference between the timestamps of the records. For example, you can use a sliding window of one minute to count the number of events that occur within one minute of each other.
- Session windows: These are variable-size, non-overlapping windows that group records into periods of activity separated by gaps of inactivity. For example, with a session window that uses a 10-minute inactivity gap, all events from a user belong to the same session until more than 10 minutes pass without any event, at which point a new session starts.
- Windowed joins: These are ways of joining two streams based on the windows that they belong to. For example, you can use a windowed join to join two streams of user clicks and purchases based on the time window that they occur in.
- Time-based aggregations: These are ways of aggregating data streams based on the windows that they belong to. For example, you can use a time-based aggregation to compute the average temperature of a stream of sensor data based on the tumbling window of one hour that they belong to.
- Time-based queries: These are ways of querying data streams based on the windows that they belong to or the timestamps of the records. For example, you can use a time-based query to retrieve the latest value of a stream of stock prices based on the timestamp of the record.
To use windowing and time-based operations in Kafka Streams with Python, you need to use the windowed_by method on a KStream or a KTable object. This method takes a Window object as an argument, which defines the type and size of the window. You can use the TimeWindows, HoppingWindows, SlidingWindows, or SessionWindows classes to create different types of windows. You can then use the join, aggregate, or query methods on the windowed stream or table to perform different types of operations.
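Before walking through the example, here is a hedged sketch of how the window types named above might be constructed. The class names come from the text; the builder-style method names (advance_by, with_inactivity_gap) are assumptions about how the wrapper could mirror the Java API, and all sizes are in milliseconds:
# Hypothetical sketch: constructing different window types (method names assumed, sizes in milliseconds)
tumbling = TimeWindows.of(60000)                          # 1-minute tumbling windows
hopping = HoppingWindows.of(60000).advance_by(10000)      # 1-minute windows advancing every 10 seconds (method name assumed)
sessions = SessionWindows.with_inactivity_gap(600000)     # sessions separated by 10 minutes of inactivity (method name assumed)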
Let’s see an example of how to use windowing and time-based operations in Kafka Streams with Python. Suppose we have two streams of data: one for user clicks and one for user purchases. We want to join these two streams based on the time window that they occur in, and compute the conversion rate for each window. The conversion rate is the ratio of the number of purchases to the number of clicks. We will use a tumbling window of one hour to divide the streams into equal intervals.
First, we need to import the necessary modules and create a StreamsBuilder object.
from kafka_streams import StreamsBuilder, KStream, KTable, TimeWindows
from kafka_streams.serdes import StringSerde, IntegerSerde, FloatSerde
builder = StreamsBuilder()
Next, we need to create two KStream objects from the topics that contain the user clicks and purchases data. We assume that the data records have the user ID as the key and the timestamp as the value.
clicks_stream = builder.stream("user-clicks", key_serde=StringSerde(), value_serde=IntegerSerde())
purchases_stream = builder.stream("user-purchases", key_serde=StringSerde(), value_serde=IntegerSerde())
Then, we need to create a TimeWindows object that defines the tumbling window of one hour. We also need to specify the grace period, which is the amount of time that late-arriving records are still allowed into the window. We set the grace period to zero, meaning that no late records are accepted.
window = TimeWindows.of(3600000).grace(0)
Next, we need to use the windowed_by method on both streams to create windowed streams, and the count method on the windowed streams to count the number of records in each window.
clicks_windowed = clicks_stream.windowed_by(window).count()
purchases_windowed = purchases_stream.windowed_by(window).count()
Then, we need to use the join method on the windowed streams to join them based on the window that they belong to. We also need to provide a function that computes the conversion rate for each window. The function takes the counts of clicks and purchases as arguments and returns the ratio of purchases to clicks.
def conversion_rate(clicks, purchases):
    if clicks == 0:
        return 0.0
    else:
        return float(purchases) / float(clicks)
conversion_rate_stream = clicks_windowed.join(purchases_windowed, conversion_rate, value_serde=FloatSerde())
Finally, we need to write the conversion rate stream to a new topic. We also need to start the stream processing application and wait for it to finish.
conversion_rate_stream.to("conversion-rate", value_serde=FloatSerde())
app = builder.build()
app.start()
app.join()
That’s it! We have successfully used windowing and time-based operations in Kafka Streams with Python to join two streams of data and compute the conversion rate for each window. You can use the same approach to perform other types of operations on data streams, such as aggregations, queries, or transformations.
In the next section, we will see some examples of how to use Kafka Streams with Python to perform common tasks, such as counting words, detecting anomalies, and enriching data.
6. Examples of Using Kafka Streams with Python
In this section, we will see some examples of how to use Kafka Streams with Python to perform common tasks, such as counting words, detecting anomalies, and enriching data. These examples will demonstrate how to use the different operators and transformations that we learned in the previous section, as well as some additional features of Kafka Streams.
For each example, we will assume that we have a stream of data from a specific topic, and we want to write the result of our stream processing application to a new topic. We will also assume that we have already imported the necessary modules and created a StreamsBuilder object, as we did in the previous sections.
6.1. Word Count
One of the simplest and most classic examples of stream processing is word count. The goal is to count the number of occurrences of each word in a stream of text. This can be useful for analyzing the frequency and distribution of words in a corpus of documents, such as tweets, news articles, or books.
To perform word count in Kafka Streams with Python, we need to do the following steps:
- Create a KStream object from the topic that contains the text data. We assume that the data records have the document ID as the key and the text as the value.
- Use the flat_map method on the stream to split the text into words and emit each word as a new record, with the word as the key and 1 as the value.
- Use the group_by method on the stream to group the records by the word key.
- Use the count method on the grouped stream to count the number of records for each word.
- Write the resulting KTable object to a new topic.
Here is the code for the word count example:
# Create a KStream object from the topic "text-data"
text_stream = builder.stream("text-data", key_serde=StringSerde(), value_serde=StringSerde())
# Split the text into words and emit each word as a new record
word_stream = text_stream.flat_map(lambda key, value: [(word, 1) for word in value.split()])
# Group the records by the word key
word_grouped = word_stream.group_by(lambda key, value: key, key_serde=StringSerde(), value_serde=IntegerSerde())
# Count the number of records for each word
word_count = word_grouped.count()
# Write the resulting KTable object to the topic "word-count"
word_count.to("word-count", key_serde=StringSerde(), value_serde=IntegerSerde())
That’s it! We have successfully performed word count in Kafka Streams with Python. You can run this code and see the output in the topic “word-count”. You can also modify the code to perform other types of text analysis, such as sentiment analysis, topic modeling, or keyword extraction.
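To inspect the output, you can reuse the console consumer from section 3. Printing the key matters here because the word is the key; and since the counts above were written with an integer serde, you may also need to point the consumer at a matching deserializer (adjust this to whatever serde your counts are actually written with):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic word-count --from-beginning --property print.key=true --value-deserializer org.apache.kafka.common.serialization.IntegerDeserializer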
6.2. Anomaly Detection
Another common example of stream processing is anomaly detection. The goal is to identify and flag unusual or suspicious events in a stream of data. This can be useful for monitoring and alerting purposes, such as detecting fraud, cyberattacks, or system failures.
To perform anomaly detection in Kafka Streams with Python, we need to do the following steps:
- Create a KStream object from the topic that contains the event data. We assume that the data records have the event ID as the key and the event details as the value.
- Use the map_values method on the stream to apply a function that scores each event based on some criteria. The function should return a score between 0 and 1, where 0 means normal and 1 means anomalous.
- Use the filter method on the stream to filter out the events that have a score below a certain threshold. The threshold should be a parameter that can be adjusted based on the desired sensitivity.
- Write the resulting KStream object to a new topic.
Here is the code for the anomaly detection example:
# Create a KStream object from the topic "event-data"
event_stream = builder.stream("event-data", key_serde=StringSerde(), value_serde=StringSerde())
# Define a function that scores each event based on some criteria
def score_event(event):
    # This is a dummy function that returns a random score between 0 and 1
    # You can replace this with your own logic to score the events
    import random
    return random.random()
# Apply the function to each event and emit the event ID and the score as a new record
scored_stream = event_stream.map_values(lambda value: score_event(value))
# Define a threshold for filtering out normal events
threshold = 0.9
# Filter out the events that have a score below the threshold
anomaly_stream = scored_stream.filter(lambda key, value: value >= threshold)
# Write the resulting KStream object to the topic "anomaly-data"
anomaly_stream.to("anomaly-data", key_serde=StringSerde(), value_serde=FloatSerde())
That’s it! We have successfully performed anomaly detection in Kafka Streams with Python. You can run this code and see the output in the topic “anomaly-data”. You can also modify the code to perform other types of event analysis, such as clustering, classification, or regression.
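The score_event function above returns random scores just to exercise the pipeline. As a hedged sketch of a more meaningful score, written in plain Python and assuming each event value can be reduced to a single numeric reading, you could keep a running mean and variance with Welford's algorithm and score each reading by how far it deviates from the mean:
class ZScoreScorer:
    """Running z-score based anomaly score (plain-Python sketch, not part of any Kafka library)."""
    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0
    def score(self, value):
        # Update the running mean and variance with Welford's algorithm
        self.n += 1
        delta = value - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (value - self.mean)
        if self.n < 2:
            return 0.0
        std = (self.m2 / (self.n - 1)) ** 0.5
        if std == 0:
            return 0.0
        # Map |z| onto a 0..1 score: 0 near the mean, close to 1 beyond three standard deviations
        z = abs(value - self.mean) / std
        return min(z / 3.0, 1.0)
A scorer instance could then stand in for score_event, for example scored_stream = event_stream.map_values(lambda value: scorer.score(float(value))), assuming the event values are numeric strings.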
6.3. Data Enrichment
Another common example of stream processing is data enrichment. The goal is to add more information to a stream of data from other sources, such as databases or APIs. This can be useful for enhancing the quality and value of the data, as well as providing more context and insights.
To perform data enrichment in Kafka Streams with Python, we need to do the following steps:
- Create a KStream object from the topic that contains the data that we want to enrich. We assume that the data records have the data ID as the key and the data value as the value.
- Create a KTable object from the topic that contains the data that we want to use for enrichment. We assume that the data records have the data ID as the key and the enrichment value as the value.
- Use the join method on the stream and the table to join them based on the data ID. We also need to provide a function that combines the data value and the enrichment value into a new value.
- Write the resulting KStream object to a new topic.
Here is the code for the data enrichment example:
# Create a KStream object from the topic "data-to-enrich"
data_stream = builder.stream("data-to-enrich", key_serde=StringSerde(), value_serde=StringSerde())
# Create a KTable object from the topic "enrichment-data"
enrichment_table = builder.table("enrichment-data", key_serde=StringSerde(), value_serde=StringSerde())
# Define a function that combines the data value and the enrichment value into a new value
def combine_data(data_value, enrichment_value):
    # This is a dummy function that concatenates the two values with a comma
    # You can replace this with your own logic to combine the values
    return data_value + "," + enrichment_value
# Join the stream and the table based on the data ID and apply the function
enriched_stream = data_stream.join(enrichment_table, combine_data, value_serde=StringSerde())
# Write the resulting KStream object to the topic "enriched-data"
enriched_stream.to("enriched-data", key_serde=StringSerde(), value_serde=StringSerde())
That’s it! We have successfully performed data enrichment in Kafka Streams with Python. You can run this code and see the output in the topic “enriched-data”. You can also modify the code to perform other types of data manipulation, such as transformation, filtering, or aggregation.
6.1. Word Count
In this section, we will see how to use Kafka Streams with Python to implement a simple word count application. The word count application is a classic example of stream processing, where we want to count the frequency of words in a stream of text data. For example, we might want to count the words in a stream of tweets, blog posts, or news articles.
To implement the word count application, we will need to perform the following steps:
- Create a Kafka producer that generates text data and sends it to a Kafka topic.
- Create a Kafka Streams application that reads the text data from the Kafka topic, splits it into words, and counts the occurrences of each word.
- Create a Kafka consumer that reads the word counts from the Kafka Streams application and prints them to the console.
Let’s see how to do each of these steps in Python.
Creating a Kafka Producer
To create a Kafka producer, we will use the kafka-python library, which is a Python client for Kafka. We will also use the Faker library, which is a Python library that generates fake data, such as names, addresses, or sentences. We will use Faker to generate some random sentences that we will send to the Kafka topic.
First, we need to install the kafka-python and Faker libraries using pip:
pip install kafka-python
pip install Faker
Next, we need to import the KafkaProducer class from the kafka-python library and the Faker class from the Faker library:
from kafka import KafkaProducer
from faker import Faker
Then, we need to create an instance of the KafkaProducer class, passing the bootstrap_servers parameter, which is a list of Kafka brokers that the producer can connect to. We also need to specify the value_serializer parameter, which is a function that converts the values to bytes before sending them to Kafka. We will use the encode method of the str class to convert the strings to bytes:
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: x.encode('utf-8'))
Next, we need to create an instance of the Faker class, passing the locale parameter, which is the language and country code of the fake data. We will use ‘en_US’ for English and United States:
faker = Faker('en_US')
Finally, we need to write a loop that generates a random sentence using the sentence method of the faker instance, and sends it to the Kafka topic using the send method of the producer instance. We will use ‘text’ as the name of the Kafka topic, and we will print the sentence to the console for debugging purposes. We will also add a sleep time of one second between each iteration to simulate a continuous stream of data:
import time
while True:
    sentence = faker.sentence()
    producer.send('text', value=sentence)
    print(sentence)
    time.sleep(1)
This completes the code for the Kafka producer. We can run it in a terminal window using the python command:
python producer.py
We should see something like this in the terminal:
Company quickly respond according.
Himself particularly instead.
Himself early enough.
...
This means that the producer is generating and sending random sentences to the Kafka topic ‘text’.
Creating a Kafka Streams Application
To create a Kafka Streams application, we will use the kafka-streams-python library, which is a wrapper around the Kafka Streams Java API. We will also use the re library, which is a Python library that provides regular expression operations. We will use re to split the sentences into words and remove any punctuation marks.
First, we need to install the kafka-streams-python library using pip:
pip install kafka-streams-python
Next, we need to import the KafkaStreams class from the kafka-streams-python library and the re library:
from kafka_streams import KafkaStreams
import re
Then, we need to define a function that takes a sentence as an input and returns a list of words as an output. We will use the sub method of the re library to replace any non-alphanumeric characters with spaces, and then use the split method of the str class to split the sentence by spaces. We will also use the lower method of the str class to convert the words to lowercase:
def split_sentence(sentence):
    words = re.sub('[^a-zA-Z0-9]', ' ', sentence).split()
    words = [word.lower() for word in words]
    return words
Next, we need to create an instance of the KafkaStreams class, passing a list of Kafka brokers that the application can connect to, and a unique application id:
streams = KafkaStreams(['localhost:9092'], 'word-count-app')
Then, we need to create a stream object that reads the data from the Kafka topic ‘text’, using the stream method of the streams instance. We also need to specify the key_deserializer and value_deserializer parameters, which are functions that convert the keys and values from bytes to strings. We will use the decode method of the bytes class to do that:
stream = streams.stream('text', key_deserializer=lambda x: x.decode('utf-8'), value_deserializer=lambda x: x.decode('utf-8'))
Next, we need to apply the split_sentence function to each value in the stream, using the map_values method of the stream object. This will return a new stream object with the same keys but with values as lists of words:
stream = stream.map_values(split_sentence)
Then, we need to flatten the lists of words into individual words, using the flat_map method of the stream object. This will return a new stream object with keys as words and values as None:
stream = stream.flat_map(lambda k, v: [(word, None) for word in v])
Next, we need to count the occurrences of each word in the stream, using the count_by_key method of the stream object. This will return a new stream object with keys as words and values as counts:
stream = stream.count_by_key()
Finally, we need to write the word counts to another Kafka topic, using the to method of the stream object. We will use ‘word-count’ as the name of the Kafka topic, and we will also specify the key_serializer and value_serializer parameters, which are functions that convert the keys and values to bytes before sending them to Kafka. We will use the encode method of the str class to do that:
stream.to('word-count', key_serializer=lambda x: x.encode('utf-8'), value_serializer=lambda x: str(x).encode('utf-8'))
This completes the code for the Kafka Streams application. We can run it in another terminal window using the python command:
python streams.py
We should see something like this in the terminal:
[2021-10-18 16:05:06,789] INFO Kafka Streams version: 2.8.0 (org.apache.kafka.streams.KafkaStreams)
[2021-10-18 16:05:06,789] INFO Kafka Streams commit ID: ebb1d6e8f0a5f0c4 (org.apache.kafka.streams.KafkaStreams)
[2021-10-18 16:05:06,790] INFO Creating KafkaStreams object (org.apache.kafka.streams.KafkaStreams)
[2021-10-18 16:05:06,791] INFO stream-client [word-count-app-1b7f0f9f-0b0f-4c0f-8f1a-9f0c3c5c8f0c] Kafka Streams version: 2.8.0 (org.apache.kafka.streams.KafkaStreams)
...
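The third step listed at the start of this section, the consumer that prints the word counts, is not shown above. Here is a minimal sketch using the kafka-python KafkaConsumer (the file name consumer.py is just a suggestion); it decodes the keys and values as UTF-8 strings, matching how the streams application serialized them:
from kafka import KafkaConsumer
# Read the word counts written by the streams application to the 'word-count' topic
consumer = KafkaConsumer(
    'word-count',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    key_deserializer=lambda x: x.decode('utf-8') if x else None,
    value_deserializer=lambda x: x.decode('utf-8'),
)
for message in consumer:
    print(f"{message.key}: {message.value}")
We can run this in a third terminal window with python consumer.py and watch the counts grow as the producer keeps sending sentences.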
6.2. Anomaly Detection
In this section, we will see how to use Kafka Streams with Python to implement a simple anomaly detection application. Anomaly detection is a technique of identifying unusual or unexpected patterns or events in data streams, such as spikes, drops, or outliers. For example, we might want to detect anomalies in a stream of sensor data, such as temperature, pressure, or humidity.
To implement the anomaly detection application, we will need to perform the following steps:
- Create a Kafka producer that generates sensor data and sends it to a Kafka topic.
- Create a Kafka Streams application that reads the sensor data from the Kafka topic, computes the moving average and standard deviation of the data, and flags any data points that deviate from the normal range.
- Create a Kafka consumer that reads the anomaly detection results from the Kafka Streams application and prints them to the console.
Let’s see how to do each of these steps in Python.
Creating a Kafka Producer
To create a Kafka producer, we will use the same libraries and code as in the previous section, except that we will generate some random sensor data instead of random sentences. We will use the NumPy library, which is a Python library that provides scientific computing functions, such as generating random numbers. We will use NumPy to generate some random numbers that follow a normal distribution, with a mean of 50 and a standard deviation of 10. These numbers will represent the sensor readings, such as temperature in degrees Celsius.
First, we need to install the NumPy library using pip:
pip install numpy
Next, we need to import the KafkaProducer class from the kafka-python library and the numpy library:
from kafka import KafkaProducer
import numpy as np
Then, we need to create an instance of the KafkaProducer class, passing the same parameters as before:
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: x.encode('utf-8'))
Finally, we need to write a loop that generates a random number using the normal method of the np.random module, and sends it to the Kafka topic using the send method of the producer instance. We will use ‘sensor’ as the name of the Kafka topic, and we will print the number to the console for debugging purposes. We will also add a sleep time of one second between each iteration to simulate a continuous stream of data:
import time
while True:
    reading = np.random.normal(50, 10)
    producer.send('sensor', value=str(reading))
    print(reading)
    time.sleep(1)
This completes the code for the Kafka producer. We can run it in a terminal window using the python command:
python producer.py
We should see something like this in the terminal:
51.23456789
48.76543210
49.87654321
...
This means that the producer is generating and sending random sensor readings to the Kafka topic ‘sensor’.
Creating a Kafka Streams Application
To create a Kafka Streams application, we will use the same library and code as in the previous section, except that we will apply some different operators and transformations to the data stream. We will use the statistics library, which is a Python library that provides statistical functions, such as calculating the mean and standard deviation of a data set. We will use statistics to compute the moving average and standard deviation of the sensor readings, and flag any readings that are more than two standard deviations away from the average as anomalies.
First, we need to import the KafkaStreams class from the kafka-streams-python library and the statistics library:
from kafka_streams import KafkaStreams
import statistics
Then, we need to create an instance of the KafkaStreams class, passing the same parameters as before:
streams = KafkaStreams(['localhost:9092'], 'anomaly-detection-app')
Then, we need to create a stream object that reads the data from the Kafka topic ‘sensor’, using the stream method of the streams instance. We also need to specify the same deserializer parameters as before:
stream = streams.stream('sensor', key_deserializer=lambda x: x.decode('utf-8'), value_deserializer=lambda x: x.decode('utf-8'))
Next, we need to convert the values from strings to floats, using the map_values method of the stream object. This will return a new stream object with the same keys but with values as floats:
stream = stream.map_values(float)
Then, we need to compute the moving average and standard deviation of the values in the stream, using the aggregate method of the stream object. The aggregate method takes three parameters: an initializer function, an aggregator function, and a serializer function. The initializer function returns the initial value of the aggregation, which in our case is an empty list. The aggregator function takes the current value of the aggregation and the new value from the stream, and returns the updated value of the aggregation, which in our case is a list with the new value appended. The serializer function converts the value of the aggregation to bytes before sending it to Kafka, which in our case is the encode method of the str class. The aggregate method returns a new stream object with keys as None and values as lists of floats:
stream = stream.aggregate(lambda: [], lambda agg, val: agg + [val], lambda x: str(x).encode('utf-8'))
Next, we need to apply a windowing operation to the stream, using the window_by method of the stream object. A window is a way of grouping data by time intervals, such as seconds, minutes, or hours, and a windowing operation applies a function to each window of data, such as counting, summing, or averaging. The window_by method takes two parameters: the size of the window in milliseconds and the advance of the window in milliseconds. The size is how much time each window covers, and the advance is how far the window moves forward each time. We will use 10000 milliseconds for both parameters, which gives us ten-second windows that advance every ten seconds; because the advance equals the size, the windows do not overlap, so this behaves like a tumbling window. The window_by method returns a new stream object with keys as window objects and values as lists of floats:
stream = stream.window_by(10000, 10000)
Then, we need to calculate the mean and standard deviation of each window of data, using the map_values method of the stream object. We will use the mean and stdev functions of the statistics library to do that (note that stdev requires at least two values, so the very first window for a key may need special handling). We will also check whether the last value in the window is more than two standard deviations away from the mean, and if so, we will mark it as an anomaly. We will return a tuple of the mean, standard deviation, last value, and anomaly flag as the new value of the stream. This will return a new stream object with keys as window objects and values as tuples of floats and booleans:
stream = stream.map_values(lambda x: (statistics.mean(x), statistics.stdev(x), x[-1], abs(x[-1] - statistics.mean(x)) > 2 * statistics.stdev(x)))
Finally, we need to write the anomaly detection results to another Kafka topic, using the to method of the stream object. We will use ‘anomaly’ as the name of the Kafka topic, and we will also specify the same serializer parameters as before:
stream.to('anomaly', key_serializer=lambda x: x.encode('utf-8'), value_serializer=lambda x: str(x).encode('utf-8'))
This completes the code for the Kafka Streams application. We can run it in another terminal window using the python command:
python streams.py
We should see something like this in the terminal:
[2021-10-18 16:15:06,789] INFO Kafka Streams version: 2.8.0 (org.apache.kafka.streams.KafkaStreams)
...
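The consumer step listed at the start of this section is also not shown above. Here is a minimal sketch using kafka-python that reads the 'anomaly' topic and prints only the flagged readings; it parses the tuple that the streams application wrote with str(...) using ast.literal_eval:
import ast
from kafka import KafkaConsumer
# Read the anomaly detection results from the 'anomaly' topic
consumer = KafkaConsumer(
    'anomaly',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: x.decode('utf-8'),
)
for message in consumer:
    # Values have the form str((mean, stdev, last_value, is_anomaly))
    mean, stdev, last_value, is_anomaly = ast.literal_eval(message.value)
    if is_anomaly:
        print(f"Anomaly detected: {last_value:.2f} (mean={mean:.2f}, stdev={stdev:.2f})")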
6.3. Data Enrichment
In this section, you will learn how to use Kafka Streams with Python to enrich data streams by adding more information from other sources, such as databases or APIs. Data enrichment can be useful for enhancing the quality and value of your data, as well as providing more context and insights.
For example, suppose you have a stream of user events, such as clicks, views, or purchases, on an e-commerce website. You might want to enrich this stream by adding information about the user’s profile, such as their name, age, gender, location, or preferences. This can help you to better understand your customers and provide them with more personalized recommendations or offers.
To enrich data streams with Kafka Streams, you can use the KStream-KTable join operator, which allows you to join a stream with a table. A table is a view of a stream that represents the latest value for each key. For example, a table of user profiles can be derived from a stream of user updates, where each update contains the user’s ID and some attributes. The table will store the most recent attributes for each user ID.
The KStream-KTable join operator works as follows:
- For each record in the stream, it extracts the key and queries the table for the corresponding value.
- If the key is found in the table, it joins the stream record with the table value and emits the result.
- If the key is not found in the table, it discards the stream record and does not emit anything.
The KStream-KTable join operator is non-windowed, which means it does not depend on the time of the records. It always joins the stream record with the latest value in the table, regardless of when they were produced. This ensures that the join results are always up-to-date and consistent.
To use the KStream-KTable join operator with Python, you need to do the following steps:
- Create a stream from a Kafka topic that contains the data you want to enrich, such as user events.
- Create a table from another Kafka topic that contains the data you want to use for enrichment, such as user profiles.
- Define a join function that takes two parameters: the stream record and the table value, and returns the enriched record.
- Call the join method on the stream object, passing the table object and the join function as arguments.
- Write the resulting stream to another Kafka topic, or consume it directly.
Let’s see an example of how to use the KStream-KTable join operator with Python to enrich a stream of user events with user profiles.
# Import the kafka-streams-python library
from kafka_streams import KafkaStreams, KStream, KTable
import json
# Define the Kafka topics for the stream and the table
USER_EVENTS_TOPIC = "user-events"
USER_PROFILES_TOPIC = "user-profiles"
ENRICHED_EVENTS_TOPIC = "enriched-events"
# Create a KafkaStreams object
ks = KafkaStreams()
# Create a stream from the user events topic
user_events_stream = ks.stream(USER_EVENTS_TOPIC)
# Create a table from the user profiles topic
user_profiles_table = ks.table(USER_PROFILES_TOPIC)
# Define a join function that takes a user event and a user profile and returns an enriched event
def join_user_event_with_user_profile(user_event, user_profile):
    # Parse the user event and the user profile as JSON objects
    user_event = json.loads(user_event)
    user_profile = json.loads(user_profile)
    # Add the user profile attributes to the user event
    user_event["user_name"] = user_profile["name"]
    user_event["user_age"] = user_profile["age"]
    user_event["user_gender"] = user_profile["gender"]
    user_event["user_location"] = user_profile["location"]
    user_event["user_preferences"] = user_profile["preferences"]
    # Return the enriched event as a JSON string
    return json.dumps(user_event)
# Join the stream with the table using the join function
enriched_events_stream = user_events_stream.join(user_profiles_table, join_user_event_with_user_profile)
# Write the resulting stream to the enriched events topic
enriched_events_stream.to(ENRICHED_EVENTS_TOPIC)
# Start the KafkaStreams object
ks.start()
Now, you have a stream of enriched user events that contains more information about the users, such as their name, age, gender, location, and preferences. You can use this stream for further analysis or processing, such as generating reports, dashboards, or recommendations.
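To try this end to end, you could publish a profile record and then an event record for the same user with kafka-python. This is just a sketch: the topic names are the ones defined above, and the field names simply match what the join function expects.
import json
from kafka import KafkaProducer
# Serialize keys as plain strings and values as JSON
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
# Publish a profile first so the table has a value for the key
producer.send('user-profiles', key='user1', value={
    "name": "Alice", "age": 30, "gender": "F",
    "location": "New York", "preferences": ["books", "music"],
})
# Then publish an event for the same user
producer.send('user-events', key='user1', value={
    "action": "click", "timestamp": "2021-01-01T12:34:56Z",
})
producer.flush()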
In this section, you learned how to use Kafka Streams with Python to enrich data streams by joining them with tables. In the next section, we will wrap up with a short conclusion and some resources for further learning.
7. Conclusion
In this blog, you have learned how to use Kafka Streams with Python to process data in real-time. You have seen how to use different operators and transformations to manipulate data streams, such as filtering, mapping, aggregating, joining, and windowing. You have also seen some examples of how to use Kafka Streams with Python to perform common tasks, such as counting words, detecting anomalies, and enriching data.
Kafka Streams is a powerful and flexible library that allows you to write stream processing applications in Python, using the power and simplicity of the language. It also provides scalability, fault-tolerance, statefulness, interoperability, and testability features that make it easy and reliable to work with data streams.
If you want to learn more about Kafka Streams and Python, you can check out the following resources:
- The official documentation of Kafka Streams, which contains a comprehensive guide, a tutorial, and an API reference.
- The GitHub repository of the kafka-streams-python library, which contains the source code, installation instructions, and examples.
- The blog post by Confluent, which introduces Kafka Streams and its features, benefits, and use cases.
- The online course by Udemy, which teaches you how to use Kafka Streams with Java and Scala, and covers the core concepts, architecture, and API of the library.
We hope you enjoyed this blog and learned something new and useful. If you have any questions, feedback, or suggestions, please leave a comment below. Thank you for reading and happy streaming!