1. Introduction
Kafka is a popular distributed streaming platform that allows you to publish and subscribe to streams of records, store them in a fault-tolerant way, and process them as they occur. Kafka is widely used for various applications, such as data pipelines, event-driven architectures, microservices, and real-time analytics.
However, Kafka is not designed to be accessed directly from the web, as it uses a custom binary protocol over TCP. This means that you need to use a Kafka client library in your application to communicate with Kafka, which may not be available or convenient for some languages or platforms.
That’s where Kafka REST Proxy comes in. Kafka REST Proxy is a service that allows you to access Kafka via HTTP, using a RESTful API. With Kafka REST Proxy, you can use any language or tool that supports HTTP to interact with Kafka, without having to install or maintain a Kafka client library.
In this tutorial, you will learn how to use Kafka REST Proxy with Python to perform various operations on Kafka, such as producing and consuming messages, managing topics, and querying metadata. You will also learn how to use different endpoints and methods of the Kafka REST Proxy API, and how to handle the responses and errors.
By the end of this tutorial, you will be able to:
- Install and run Kafka REST Proxy on your local machine
- Use Python to interact with Kafka REST Proxy using the requests library
- Produce messages to Kafka topics using the /topics/{topic_name} endpoint
- Consume messages from Kafka topics using the /consumers/{consumer_group}/instances/{consumer_instance} endpoint
- Manage Kafka topics using the /topics and /topics/{topic_name} endpoints
- Query Kafka metadata using the /brokers and /topics/{topic_name}/partitions endpoints
Before you start, you will need the following prerequisites:
- A basic understanding of Kafka and its concepts
- A working installation of Kafka on your local machine
- A working installation of Python 3 on your local machine
- The requests library for Python, which you can install using pip install requests
Are you ready to learn how to use Kafka REST Proxy with Python? Let’s get started!
2. What is Kafka REST Proxy?
Kafka REST Proxy is a service that provides a RESTful interface to Kafka, allowing you to access Kafka via HTTP. It is part of the Confluent Platform, a distribution of Kafka that adds additional features and tools.
Kafka REST Proxy acts as a bridge between Kafka and the web, exposing various endpoints and methods that correspond to different operations on Kafka, such as:
- Producing messages to Kafka topics using the POST method
- Consuming messages from Kafka topics using the GET method
- Managing Kafka topics using the GET, POST, and DELETE methods
- Querying Kafka metadata using the GET method
Kafka REST Proxy supports different embedded data formats for the messages, such as JSON, Avro, and binary. The requests and responses themselves are always JSON; you specify the embedded format of the messages using the Content-Type and Accept headers in your HTTP requests, for example application/vnd.kafka.json.v2+json for JSON messages.
For Avro messages, Kafka REST Proxy also handles the serialization and deserialization using Schema Registry, a service that stores and validates the schemas of the messages. Schema Registry ensures that the messages are compatible and consistent across different producers and consumers.
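As a quick illustration of the header scheme, the embedded format is part of the media type string itself (application/vnd.kafka.&lt;format&gt;.v2+json). The helper function below is made up purely for illustration:

```python
# The v2 media types embed the message format in the middle segment:
# application/vnd.kafka.<format>.v2+json
def content_type(embedded_format):
    """Build the v2 media type for an embedded format such as json, binary, or avro."""
    return f"application/vnd.kafka.{embedded_format}.v2+json"

print(content_type("json"))    # application/vnd.kafka.json.v2+json
print(content_type("avro"))    # application/vnd.kafka.avro.v2+json
```

Metadata-only requests (such as listing topics) use the plain application/vnd.kafka.v2+json type with no embedded format.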
By using Kafka REST Proxy, you can benefit from the following advantages:
- You can use any language or tool that supports HTTP to interact with Kafka, without having to install or maintain a Kafka client library.
- You can leverage the existing features and tools of the web, such as authentication, caching, load balancing, and monitoring.
- You can simplify the integration of Kafka with other web services and applications, such as web browsers, mobile devices, and cloud platforms.
However, Kafka REST Proxy also has some limitations and trade-offs, such as:
- You may experience higher latency and lower throughput than using a native Kafka client, due to the additional network and processing overhead of HTTP and REST.
- You may lose some of the features and fine-grained control of a native Kafka client, such as custom partition assignment, batching configuration, and transactions.
- You may encounter some compatibility and performance issues with different data formats and content types, especially with binary and Avro data.
Therefore, you should carefully evaluate your use case and requirements before deciding to use Kafka REST Proxy. In general, Kafka REST Proxy is more suitable for scenarios where you need to access Kafka from the web, or where you need to integrate Kafka with other web services and applications.
Now that you have a basic understanding of what Kafka REST Proxy is and what it can do, let’s see how to install and run it on your local machine.
3. How to Install and Run Kafka REST Proxy
In this section, you will learn how to install and run Kafka REST Proxy on your local machine. You will need to have Kafka and ZooKeeper running before you can start Kafka REST Proxy. If you don’t have them installed, you can follow the official Kafka quickstart guide to download and run them.
To install Kafka REST Proxy, you can use the Confluent Platform ZIP or TAR archive, which contains Kafka REST Proxy and other Confluent Platform components. You can download the latest version of the archive from the Confluent website. Alternatively, you can use the Confluent Platform Debian or RPM packages, which allow you to install Kafka REST Proxy and other Confluent Platform components using a package manager.
After you have downloaded and extracted the archive, or installed the packages, you can find the Kafka REST Proxy executable file in the bin directory. You can also find the Kafka REST Proxy configuration file in the etc/kafka-rest directory. The configuration file contains various settings that control the behavior of Kafka REST Proxy, such as the port, the host, the data format, and the schema registry URL. You can modify the configuration file according to your needs, or use the default settings for this tutorial.
To run Kafka REST Proxy, you can use the following command in your terminal:
bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties
This command will start Kafka REST Proxy with the configuration file specified. You should see some log messages indicating that Kafka REST Proxy is running and listening on a port. By default, the port is 8082, but you can change it in the configuration file.
Now that you have Kafka REST Proxy running, you can use your web browser or a tool like Postman to send HTTP requests to Kafka REST Proxy and receive responses. You can also use Python to interact with Kafka REST Proxy programmatically, which is what you will learn in the next section.
4. How to Use Python to Interact with Kafka REST Proxy
In this section, you will learn how to use Python to interact with Kafka REST Proxy programmatically. You will use the requests library, which is a simple and elegant way to make HTTP requests in Python. You will also use the json library, which is a built-in module that allows you to encode and decode JSON data in Python.
To use Python to interact with Kafka REST Proxy, you need to follow these steps:
- Import the requests and json libraries
- Define the base URL of Kafka REST Proxy, which is the host and port where Kafka REST Proxy is running. By default, the base URL is http://localhost:8082, but you can change it if you have a different configuration.
- Define the headers for your HTTP requests, which specify the data format and the content type of your requests and responses. For this tutorial, you will use JSON as the data format and the content type, so you will set the Content-Type and Accept headers to application/vnd.kafka.json.v2+json.
- Use the requests library to send HTTP requests to the different endpoints and methods of Kafka REST Proxy, passing the base URL, the headers, and the data as parameters.
- Use the json library to parse the responses from Kafka REST Proxy, which are also in JSON format. You can access the status code, the headers, and the content of each response through the attributes of the response object.
- Handle any errors or exceptions that may occur during the HTTP requests or the JSON parsing, using the built-in features of Python.
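The error-handling step above can be sketched with a small wrapper around requests; the helper name safe_get is made up for this tutorial, not part of any library:

```python
import requests

def safe_get(url, headers=None, timeout=5):
    """Send a GET request and return the parsed JSON body, or None on any failure."""
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()  # raise an HTTPError for 4xx/5xx status codes
        return response.json()
    except requests.exceptions.ConnectionError:
        print("Could not connect; is Kafka REST Proxy running?")
    except requests.exceptions.HTTPError as error:
        print(f"HTTP error: {error}")
    except ValueError:
        print("The response body was not valid JSON")
    return None

# A request to a port where nothing is listening returns None instead of crashing
print(safe_get("http://localhost:9"))
```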
To illustrate these steps, you will use Python to perform various operations on Kafka, such as producing and consuming messages, managing topics, and querying metadata. You will also see the code snippets and the output for each operation.
Let’s start by importing the requests and json libraries, defining the base URL and the headers, and testing the connection to Kafka REST Proxy.
# Import the requests and json libraries
import requests
import json
# Define the base URL of Kafka REST Proxy
base_url = "http://localhost:8082"
# Define the headers for the HTTP requests
headers = {
    "Content-Type": "application/vnd.kafka.json.v2+json",
    "Accept": "application/vnd.kafka.json.v2+json"
}
# Test the connection to Kafka REST Proxy by sending a GET request to the root endpoint
response = requests.get(base_url, headers=headers)
# Print the status code, the headers, and the content of the response
print(f"Status code: {response.status_code}")
print(f"Headers: {response.headers}")
print(f"Content: {response.content}")
# Parse the content of the response as JSON
content = json.loads(response.content)
# Print the parsed content
print(f"Parsed content: {content}")
The output of this code snippet should look something like this:
Status code: 200
Headers: {'Date': 'Wed, 17 Nov 2021 14:16:23 GMT', 'Content-Type': 'application/vnd.kafka.v2+json', 'Vary': 'Accept-Encoding, User-Agent', 'Content-Length': '110', 'Server': 'Jetty(9.4.43.v20210629)'}
Content: b'{"version":"6.2.1-ccs","commit":"f8f0a0f","kafka_cluster_id":"Yw7x7rQcT1y0m3Zx0X1JGg"}'
Parsed content: {'version': '6.2.1-ccs', 'commit': 'f8f0a0f', 'kafka_cluster_id': 'Yw7x7rQcT1y0m3Zx0X1JGg'}
This output shows that the connection to Kafka REST Proxy is successful, and that the response contains some information about the version, the commit, and the Kafka cluster ID of Kafka REST Proxy.
Now you are ready to use Python to interact with Kafka REST Proxy for various operations. In the next subsection, you will learn how to produce messages to Kafka topics using Python and Kafka REST Proxy.
4.1. Producing Messages to Kafka Topics
In this subsection, you will learn how to use Python and Kafka REST Proxy to produce messages to Kafka topics. You will use the /topics/{topic_name} endpoint of Kafka REST Proxy, which allows you to send messages to a specific topic using the POST method.
To produce messages to Kafka topics using Python and Kafka REST Proxy, you need to follow these steps:
- Create a Kafka topic that you want to produce messages to, or use an existing topic. You can use the kafka-topics.sh script to create topics from the command line, or use the /topics endpoint of Kafka REST Proxy, which you will learn in a later subsection.
- Define the data for your messages, which is a JSON object that contains an array of records. Each record is a JSON object that contains a key and a value, which are the key and the value of the message. Optionally, you can also specify a partition for each record; if you don’t, Kafka REST Proxy will assign one automatically.
- Use the requests library to send a POST request to the /topics/{topic_name} endpoint of Kafka REST Proxy, passing the base URL, the headers, and the data as parameters. Replace {topic_name} with the name of the topic that you want to produce messages to.
- Use the json library to parse the response from Kafka REST Proxy, which is a JSON object that contains an array of offsets. Each offset is a JSON object that contains the partition, the offset, the error code, and the error message of the record that was produced. You can use these offsets to track the status and the location of your messages in Kafka.
- Handle any errors or exceptions that may occur during the POST request or the JSON parsing, using the built-in features of Python.
To illustrate these steps, you will use Python and Kafka REST Proxy to produce some messages to a Kafka topic named test. You will also see the code snippets and the output for each step.
Let’s start by creating the Kafka topic test, using the kafka-topics.sh script from the command line. You can use the following command in your terminal:
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
This command will create a Kafka topic named test, with the default number of partitions and replication factor, connecting to the Kafka broker at localhost:9092. You should see a message indicating that the topic was created successfully.
Next, define the data for your messages, which is a JSON object that contains an array of records. For this example, you will use three records, each with a key and a value. You can use the following code snippet in your Python script:
# Define the data for the messages
data = {
    "records": [
        {"key": "key1", "value": "value1"},
        {"key": "key2", "value": "value2"},
        {"key": "key3", "value": "value3"}
    ]
}
Then, use the requests library to send a POST request to the /topics/test endpoint of Kafka REST Proxy, passing the base URL, the headers, and the data as parameters. You can use the following code snippet in your Python script:
# Send a POST request to the /topics/test endpoint of Kafka REST Proxy
response = requests.post(f"{base_url}/topics/test", headers=headers, data=json.dumps(data))
# Print the status code, the headers, and the content of the response
print(f"Status code: {response.status_code}")
print(f"Headers: {response.headers}")
print(f"Content: {response.content}")
The output of this code snippet should look something like this:
Status code: 200
Headers: {'Date': 'Wed, 17 Nov 2021 14:36:45 GMT', 'Content-Type': 'application/vnd.kafka.v2+json', 'Vary': 'Accept-Encoding, User-Agent', 'Content-Length': '174', 'Server': 'Jetty(9.4.43.v20210629)'}
Content: b'{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null},{"partition":0,"offset":1,"error_code":null,"error":null},{"partition":0,"offset":2,"error_code":null,"error":null}]}'
This output shows that the POST request is successful, and that the response contains an array of offsets for the messages that were produced. You can see that the messages were assigned to partition 0, and that the offsets are 0, 1, and 2, respectively. You can also see that there are no error codes or errors for the messages.
Finally, use the json library to parse the response from Kafka REST Proxy, which is a JSON object that contains an array of offsets. You can use the following code snippet in your Python script:
# Parse the content of the response as JSON
content = json.loads(response.content)
# Print the parsed content
print(f"Parsed content: {content}")
The output of this code snippet should look something like this:
Parsed content: {'offsets': [{'partition': 0, 'offset': 0, 'error_code': None, 'error': None}, {'partition': 0, 'offset': 1, 'error_code': None, 'error': None}, {'partition': 0, 'offset': 2, 'error_code': None, 'error': None}]}
This output shows the same information as the previous output, but in a more readable format. You can use this information to track the status and the location of your messages in Kafka.
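As a variation on the payload defined earlier, a record can also pin itself to a specific partition by adding a partition field. A minimal sketch (the keys key4 and key5 are made-up examples):

```python
import json

# Records with a "partition" field are sent to that partition; records
# without one are assigned a partition by Kafka REST Proxy.
data = {
    "records": [
        {"key": "key4", "value": "value4", "partition": 0},  # pinned to partition 0
        {"key": "key5", "value": "value5"}                   # partition assigned automatically
    ]
}

# The payload is serialized exactly like before when POSTing to /topics/{topic_name}
body = json.dumps(data)
print(body)
```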
Congratulations, you have successfully produced messages to Kafka topics using Python and Kafka REST Proxy. In the next subsection, you will learn how to consume messages from Kafka topics using Python and Kafka REST Proxy.
4.2. Consuming Messages from Kafka Topics
In the previous section, you learned how to produce messages to Kafka topics using the /topics/{topic_name} endpoint of the Kafka REST Proxy API. In this section, you will learn how to consume messages from Kafka topics using the /consumers/{consumer_group}/instances/{consumer_instance} endpoint.
To consume messages from Kafka topics using Kafka REST Proxy, you need to follow these steps:
- Create a consumer instance using the POST method and specify the consumer group, the data format, and the auto-offset-reset policy.
- Subscribe to one or more topics using the POST method and specify the topic names.
- Poll for messages using the GET method and specify the maximum number of messages to return.
- Commit the offsets using the POST method and specify the offset values.
- Delete the consumer instance using the DELETE method and specify the consumer group and the consumer instance.
Let’s see how to perform each of these steps using Python and the requests library.
Creating a consumer instance
To create a consumer instance, you need to send a POST request to the /consumers/{consumer_group} endpoint, where {consumer_group} is the name of the consumer group that you want to join. You also need to provide a JSON payload that contains the following fields:
- name: The name of the consumer instance. This must be unique within the consumer group.
- format: The data format of the messages. This can be json, binary, or avro.
- auto.offset.reset: The policy to use when there is no initial offset or the offset is out of range. This can be earliest, latest, or none.
For example, to create a consumer instance named my-consumer that belongs to the consumer group my-group, uses the JSON data format, and starts from the earliest offset, you can use the following code:
import requests
base_url = "http://localhost:8082" # The base URL of the Kafka REST Proxy
consumer_group = "my-group" # The name of the consumer group
consumer_name = "my-consumer" # The name of the consumer instance
data_format = "json" # The data format of the messages
offset_policy = "earliest" # The auto-offset-reset policy
# The endpoint to create a consumer instance
consumer_url = f"{base_url}/consumers/{consumer_group}"
# The JSON payload to create a consumer instance
consumer_data = {
    "name": consumer_name,
    "format": data_format,
    "auto.offset.reset": offset_policy
}
# The headers to create a consumer instance
consumer_headers = {
    "Content-Type": "application/vnd.kafka.v2+json"
}
# Send a POST request to create a consumer instance
consumer_response = requests.post(consumer_url, json=consumer_data, headers=consumer_headers)
# Check the status code of the response
if consumer_response.status_code == 200:
    # Get the JSON data from the response
    consumer_json = consumer_response.json()
    # Get the base URI of the consumer instance
    consumer_instance_url = consumer_json["base_uri"]
    # Print the base URI of the consumer instance
    print(f"Consumer instance created: {consumer_instance_url}")
else:
    # Print the status code and the error message
    print(f"Error: {consumer_response.status_code}, {consumer_response.text}")
If the request is successful, you will get a JSON response that contains the base_uri field, which is the base URI of the consumer instance. You will use this URI to perform the other operations on the consumer instance, such as subscribing, polling, committing, and deleting.
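For example, rather than rebuilding the instance URL by hand in later steps, you can derive the subscription, records, and offsets endpoints from the returned base_uri. A sketch using a hard-coded example payload in place of a live response:

```python
# Example response payload from creating a consumer instance (hard-coded here
# in place of a live response from Kafka REST Proxy)
consumer_json = {
    "instance_id": "my-consumer",
    "base_uri": "http://localhost:8082/consumers/my-group/instances/my-consumer"
}

# Derive the endpoints for the follow-up operations from base_uri
base_uri = consumer_json["base_uri"]
subscribe_url = f"{base_uri}/subscription"  # POST to subscribe to topics
records_url = f"{base_uri}/records"         # GET to poll for messages
offsets_url = f"{base_uri}/offsets"         # POST to commit offsets

print(subscribe_url)
```

Using base_uri keeps follow-up requests going to the proxy instance that owns the consumer, which matters when the proxy runs behind a load balancer.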
Subscribing to topics
To subscribe to one or more topics, you need to send a POST request to the /consumers/{consumer_group}/instances/{consumer_instance}/subscription endpoint, where {consumer_group} is the name of the consumer group and {consumer_instance} is the name of the consumer instance. You also need to provide a JSON payload that contains the following field:
- topics: A list of topic names that you want to subscribe to.
For example, to subscribe to the topics my-topic-1 and my-topic-2, you can use the following code:
import requests
base_url = "http://localhost:8082" # The base URL of the Kafka REST Proxy
consumer_group = "my-group" # The name of the consumer group
consumer_name = "my-consumer" # The name of the consumer instance
topics = ["my-topic-1", "my-topic-2"] # The list of topic names to subscribe to
# The endpoint to subscribe to topics
subscribe_url = f"{base_url}/consumers/{consumer_group}/instances/{consumer_name}/subscription"
# The JSON payload to subscribe to topics
subscribe_data = {
    "topics": topics
}
# The headers to subscribe to topics
subscribe_headers = {
    "Content-Type": "application/vnd.kafka.v2+json"
}
# Send a POST request to subscribe to topics
subscribe_response = requests.post(subscribe_url, json=subscribe_data, headers=subscribe_headers)
# Check the status code of the response
if subscribe_response.status_code == 204:
    # Print a success message
    print(f"Subscribed to topics: {topics}")
else:
    # Print the status code and the error message
    print(f"Error: {subscribe_response.status_code}, {subscribe_response.text}")
If the request is successful, you will get a status code of 204, which means that the subscription was successful. You can also subscribe to a single topic by providing a list with one element, or subscribe to all topics matching a regular expression by sending a topic_pattern field instead of the topics list.
Polling for messages
To poll for messages from the subscribed topics, you need to send a GET request to the /consumers/{consumer_group}/instances/{consumer_instance}/records endpoint, where {consumer_group} is the name of the consumer group and {consumer_instance} is the name of the consumer instance. You also need to provide a query parameter that specifies the following field:
- max_bytes: The maximum number of bytes to return in the response.
For example, to poll for messages with a maximum of 10 MB of data, you can use the following code:
import requests
base_url = "http://localhost:8082" # The base URL of the Kafka REST Proxy
consumer_group = "my-group" # The name of the consumer group
consumer_name = "my-consumer" # The name of the consumer instance
max_bytes = 10 * 1024 * 1024 # The maximum number of bytes to return (10 MB)
# The endpoint to poll for messages
poll_url = f"{base_url}/consumers/{consumer_group}/instances/{consumer_name}/records"
# The query parameter to poll for messages
poll_params = {
    "max_bytes": max_bytes
}
# The headers to poll for messages
poll_headers = {
    "Accept": "application/vnd.kafka.json.v2+json"
}
# Send a GET request to poll for messages
poll_response = requests.get(poll_url, params=poll_params, headers=poll_headers)
# Check the status code of the response
if poll_response.status_code == 200:
    # Get the JSON data from the response
    poll_json = poll_response.json()
    # Print the number of messages returned
    print(f"Number of messages: {len(poll_json)}")
    # Loop through the messages
    for message in poll_json:
        # Get the topic name, partition number, offset value, key, and value of the message
        topic = message["topic"]
        partition = message["partition"]
        offset = message["offset"]
        key = message["key"]
        value = message["value"]
        # Print the message details
        print(f"Topic: {topic}, Partition: {partition}, Offset: {offset}, Key: {key}, Value: {value}")
else:
    # Print the status code and the error message
    print(f"Error: {poll_response.status_code}, {poll_response.text}")
If the request is successful, you will get a JSON response that contains a list of messages, each with the following fields:
- topic: The name of the topic that the message belongs to.
- partition: The number of the partition that the message belongs to.
- offset: The offset value of the message.
- key: The key of the message.
- value: The value of the message.
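The last two steps from the list at the start of this section, committing offsets and deleting the consumer instance, can be sketched as follows, reusing the consumer group and instance names from the earlier examples. An empty POST body commits the offsets of the records returned by the most recent poll:

```python
import requests

base_url = "http://localhost:8082"  # The base URL of the Kafka REST Proxy
consumer_group = "my-group"         # The name of the consumer group
consumer_name = "my-consumer"       # The name of the consumer instance
instance_url = f"{base_url}/consumers/{consumer_group}/instances/{consumer_name}"
headers = {"Content-Type": "application/vnd.kafka.v2+json"}

try:
    # Commit the offsets of the records returned by the last poll
    commit_response = requests.post(f"{instance_url}/offsets", headers=headers)
    print(f"Commit status: {commit_response.status_code}")

    # Delete the consumer instance so the proxy can release its resources
    delete_response = requests.delete(instance_url, headers=headers)
    print(f"Delete status: {delete_response.status_code}")
except requests.exceptions.ConnectionError:
    print("Kafka REST Proxy is not reachable")
```

Deleting the consumer instance when you are done is important, because the proxy otherwise keeps it alive until its idle timeout expires.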
4.3. Managing Kafka Topics
In the previous sections, you learned how to produce and consume messages to and from Kafka topics using the Kafka REST Proxy API. In this section, you will learn how to manage Kafka topics using the same API. You will learn how to create, delete, list, and describe Kafka topics using different endpoints and methods.
To manage Kafka topics using Kafka REST Proxy, you need to use the following endpoints and methods:
- To create a topic, use the POST method on the /topics endpoint and specify the topic name, the number of partitions, and the replication factor.
- To delete a topic, use the DELETE method on the /topics/{topic_name} endpoint and specify the topic name.
- To list all topics, use the GET method on the /topics endpoint and get a list of topic names.
- To describe a topic, use the GET method on the /topics/{topic_name} endpoint and specify the topic name. You will get information about the topic, such as the number of partitions, the replication factor, and the partition details.
Let’s see how to perform each of these operations using Python and the requests library.
Creating a topic
To create a topic, you need to send a POST request to the /topics endpoint and provide a JSON payload that contains the following fields:
- name: The name of the topic that you want to create. This must be unique and valid.
- partitions: The number of partitions that you want to create for the topic. This must be a positive integer.
- replication_factor: The replication factor that you want to use for the topic. This must be a positive integer and less than or equal to the number of brokers in the cluster.
For example, to create a topic named my-new-topic with 3 partitions and a replication factor of 2, you can use the following code:
import requests
base_url = "http://localhost:8082" # The base URL of the Kafka REST Proxy
topic_name = "my-new-topic" # The name of the topic to create
partitions = 3 # The number of partitions to create
replication_factor = 2 # The replication factor to use
# The endpoint to create a topic
create_url = f"{base_url}/topics"
# The JSON payload to create a topic
create_data = {
    "name": topic_name,
    "partitions": partitions,
    "replication_factor": replication_factor
}
# The headers to create a topic
create_headers = {
    "Content-Type": "application/vnd.kafka.v2+json"
}
# Send a POST request to create a topic
create_response = requests.post(create_url, json=create_data, headers=create_headers)
# Check the status code of the response
if create_response.status_code == 204:
    # Print a success message
    print(f"Topic created: {topic_name}")
else:
    # Print the status code and the error message
    print(f"Error: {create_response.status_code}, {create_response.text}")
If the request is successful, you will get a status code of 204, which means that the topic was created.
Deleting a topic
To delete a topic, you need to send a DELETE request to the /topics/{topic_name} endpoint, where {topic_name} is the name of the topic that you want to delete.
For example, to delete the topic named my-new-topic that you created in the previous step, you can use the following code:
import requests
base_url = "http://localhost:8082" # The base URL of the Kafka REST Proxy
topic_name = "my-new-topic" # The name of the topic to delete
# The endpoint to delete a topic
delete_url = f"{base_url}/topics/{topic_name}"
# Send a DELETE request to delete a topic
delete_response = requests.delete(delete_url)
# Check the status code of the response
if delete_response.status_code == 204:
    # Print a success message
    print(f"Topic deleted: {topic_name}")
else:
    # Print the status code and the error message
    print(f"Error: {delete_response.status_code}, {delete_response.text}")
If the request is successful, you will get a status code of 204, which means that the topic was deleted. You can also delete multiple topics by sending multiple DELETE requests to the corresponding endpoints.
Listing all topics
To list all topics, you need to send a GET request to the /topics endpoint. You will get a JSON response that contains a list of topic names.
For example, to list all topics, you can use the following code:
import requests
base_url = "http://localhost:8082" # The base URL of the Kafka REST Proxy
# The endpoint to list all topics
list_url = f"{base_url}/topics"
# The headers to list all topics
list_headers = {
    "Accept": "application/vnd.kafka.v2+json"
}
# Send a GET request to list all topics
list_response = requests.get(list_url, headers=list_headers)
# Check the status code of the response
if list_response.status_code == 200:
    # Get the JSON data from the response
    list_json = list_response.json()
    # Print the number of topics returned
    print(f"Number of topics: {len(list_json)}")
    # Loop through the topic names
    for topic in list_json:
        # Print the topic name
        print(f"Topic: {topic}")
else:
    # Print the status code and the error message
    print(f"Error: {list_response.status_code}, {list_response.text}")
If the request is successful, you will get a JSON response that contains a list of topic names.
Describing a topic
To describe a topic, you need to send a GET request to the /topics/{topic_name} endpoint, where {topic_name} is the name of the topic that you want to describe. You will get a JSON response that contains information about the topic, such as the number of partitions, the replication factor, and the partition details.
For example, to describe the topic named my-topic-1, you can use the following code:
import requests
base_url = "http://localhost:8082" # The base URL of the Kafka REST Proxy
topic_name = "my-topic-1" # The name of the topic to describe
# The endpoint to describe a topic
describe_url = f"{base_url}/topics/{topic_name}"
# The headers to describe a topic
describe_headers = {
    "Accept": "application/vnd.kafka.v2+json"
}
# Send a GET request to describe a topic
describe_response = requests.get(describe_url, headers=describe_headers)
# Check the status code of the response
if describe_response.status_code == 200:
    # Get the JSON data from the response
    describe_json = describe_response.json()
    # Get the number of partitions, the replication factor, and the partition details of the topic
    partitions = describe_json["partitions"]
    replication_factor = describe_json["replication_factor"]
    partition_details = describe_json["partition_details"]
    # Print the topic information
    print(f"Topic: {topic_name}")
    print(f"Number of partitions: {partitions}")
    print(f"Replication factor: {replication_factor}")
    # Loop through the partition details
    for partition in partition_details:
        # Get the partition number, the leader broker, the replicas, and the in-sync replicas
        partition_number = partition["partition"]
        leader = partition["leader"]
        replicas = partition["replicas"]
        in_sync_replicas = partition["in_sync_replicas"]
        # Print the partition details
        print(f"Partition: {partition_number}")
        print(f"Leader: {leader}")
        print(f"Replicas: {replicas}")
        print(f"In-sync replicas: {in_sync_replicas}")
else:
    # Print the status code and the error message
    print(f"Error: {describe_response.status_code}, {describe_response.text}")
If the request is successful, you will get a JSON response that contains information about the topic, such as the number of partitions, the replication factor, and the partition details. You can also describe multiple topics by sending multiple GET requests to the corresponding endpoints.
Now that you have learned how to manage Kafka topics using Python and Kafka REST Proxy, let’s move on to querying Kafka metadata in the next subsection.
4.4. Querying Kafka Metadata
In this section, you will learn how to use the Kafka REST Proxy API to query the metadata of Kafka, such as the list of brokers, the list of topics, and the details of partitions. You will use the /brokers, /topics, and /topics/{topic_name}/partitions endpoints, which support the GET method.
Querying the metadata of Kafka can be useful for various purposes, such as monitoring the status and health of Kafka, debugging issues, and optimizing performance. For example, you can use the metadata to check the availability of the brokers and the distribution and replication of the topics and partitions.
To query the metadata of Kafka, you will need to do the following steps:
- Import the requests library and define the base URL of the Kafka REST Proxy
- Send a GET request to the /brokers endpoint to get the list of brokers
- Send a GET request to the /topics endpoint to get the list of topics
- Send a GET request to the /topics/{topic_name}/partitions endpoint to get the details of partitions for a specific topic
- Parse and print the response data in JSON format
Let’s see how to do each step in Python.
1. Import the requests library and define the base URL of the Kafka REST Proxy
The first step is to import the requests library, which you will use to send HTTP requests to the Kafka REST Proxy. You also need to define the base URL of the Kafka REST Proxy, which is the URL of the server where the Kafka REST Proxy is running, followed by the port number. Note that the API version is not part of the URL; it is conveyed through the Content-Type and Accept headers of each request. In this tutorial, we assume that the Kafka REST Proxy is running on the local machine, on port 8082, and using version 2 of the API.
```python
import requests

base_url = "http://localhost:8082"
```
2. Send a GET request to the /brokers endpoint to get the list of brokers
The next step is to send a GET request to the /brokers endpoint, which returns the list of brokers in the Kafka cluster. The brokers are identified by their IDs, which are integers. You can use the requests.get() function to send the GET request, and pass the URL of the endpoint as the first argument. You also need to specify the content type and the accept headers as “application/vnd.kafka.v2+json”, which indicates that you are using the JSON format for the request and the response.
```python
brokers_url = base_url + "/brokers"
headers = {
    "Content-Type": "application/vnd.kafka.v2+json",
    "Accept": "application/vnd.kafka.v2+json"
}
response = requests.get(brokers_url, headers=headers)
```
3. Send a GET request to the /topics endpoint to get the list of topics
The next step is to send a GET request to the /topics endpoint, which returns the list of topics in the Kafka cluster. The topics are identified by their names, which are strings. You can use the same requests.get() function to send the GET request, and pass the URL of the endpoint as the first argument. You also need to use the same headers as before.
```python
topics_url = base_url + "/topics"
response = requests.get(topics_url, headers=headers)
```
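The /topics endpoint returns a plain JSON array of topic names, which may include internal topics such as __consumer_offsets. A small hedged sketch that filters those out (relying on the convention that internal topic names start with an underscore):

```python
def user_topics(topic_names):
    """Filter out internal topics, which by convention start with an underscore."""
    return [name for name in topic_names if not name.startswith("_")]

# A hypothetical /topics response
sample_topics = ["__consumer_offsets", "test", "my-topic-1"]
print(user_topics(sample_topics))  # → ['test', 'my-topic-1']
```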
4. Send a GET request to the /topics/{topic_name}/partitions endpoint to get the details of partitions for a specific topic
The next step is to send a GET request to the /topics/{topic_name}/partitions endpoint, which returns the details of partitions for a specific topic. The partitions are identified by their IDs, which are integers. The details include the leader, the replicas, and the in-sync replicas of each partition. You can use the same requests.get() function to send the GET request, and pass the URL of the endpoint as the first argument. You also need to use the same headers as before. You need to replace the {topic_name} placeholder with the actual name of the topic that you want to query. In this tutorial, we assume that there is a topic named “test” in the Kafka cluster.
```python
topic_name = "test"
partitions_url = base_url + "/topics/" + topic_name + "/partitions"
response = requests.get(partitions_url, headers=headers)
```
5. Parse and print the response data in JSON format
The final step is to parse and print the response data in JSON format. You can use the response.json() method to parse the response data as a Python dictionary. You can then use the print() function to print the data, or use the json.dumps() function to format the data as a JSON string. You can also check the status code of the response, which indicates whether the request was successful or not. A status code of 200 means that the request was successful, while a status code of 4xx or 5xx means that there was an error.
```python
import json

data = response.json()
print(json.dumps(data, indent=4))

status_code = response.status_code
print("Status code:", status_code)
```
If everything goes well, you should see something like this in the output:
```
[
    {
        "partition": 0,
        "leader": 1,
        "replicas": [
            {
                "broker": 1,
                "leader": true,
                "in_sync": true
            }
        ]
    },
    {
        "partition": 1,
        "leader": 1,
        "replicas": [
            {
                "broker": 1,
                "leader": true,
                "in_sync": true
            }
        ]
    }
]
Status code: 200
```
This means that the topic “test” has two partitions, and both of them have the broker 1 as the leader and the only replica.
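Once parsed, the partitions payload can also be checked programmatically, for example to flag partitions with replicas that have fallen out of sync. The helper below is a sketch operating on a hypothetical payload in the shape shown above:

```python
def out_of_sync_partitions(partitions):
    """Return the IDs of partitions with at least one out-of-sync replica."""
    return [
        p["partition"]
        for p in partitions
        if any(not replica["in_sync"] for replica in p["replicas"])
    ]

# A hypothetical payload where partition 1 has a lagging follower on broker 2
sample_partitions = [
    {"partition": 0, "leader": 1,
     "replicas": [{"broker": 1, "leader": True, "in_sync": True}]},
    {"partition": 1, "leader": 1,
     "replicas": [
         {"broker": 1, "leader": True, "in_sync": True},
         {"broker": 2, "leader": False, "in_sync": False},
     ]},
]
print(out_of_sync_partitions(sample_partitions))  # → [1]
```

A check like this is a simple building block for monitoring the health of a cluster through the REST Proxy.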
Congratulations! You have successfully queried the metadata of Kafka using the Kafka REST Proxy API. You have learned how to use the /brokers, /topics, and /topics/{topic_name}/partitions endpoints, and how to handle the responses and errors. You can use the same approach to query other endpoints and methods of the Kafka REST Proxy API, such as the /consumers endpoints. You can find the full documentation of the Kafka REST Proxy API on the Confluent website.
The next and final section wraps up the tutorial and points you to some useful resources and tips for further learning.
5. Conclusion
You have reached the end of this tutorial on how to use Kafka REST Proxy with Python to access Kafka via HTTP. You have learned how to:
- Install and run Kafka REST Proxy on your local machine
- Use Python to interact with Kafka REST Proxy using the requests library
- Produce messages to Kafka topics using the /topics/{topic_name} endpoint
- Consume messages from Kafka topics using the /consumers/{consumer_group}/instances/{consumer_instance} endpoint
- Manage Kafka topics using the /topics and /topics/{topic_name} endpoints
- Query Kafka metadata using the /brokers and /partitions endpoints
By using Kafka REST Proxy, you can access Kafka over HTTP through a RESTful API. This is useful when you need to reach Kafka from the web, or when you need to integrate Kafka with other web services and applications, and it lets you leverage existing web infrastructure such as authentication, caching, load balancing, and monitoring.
However, you have also learned that Kafka REST Proxy has some limitations and trade-offs, such as higher latency, lower throughput, and reduced functionality compared to using a native Kafka client. Therefore, you should carefully evaluate your use case and requirements before deciding to use Kafka REST Proxy.
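One common way to reduce the per-request HTTP overhead behind that throughput trade-off is to batch several records into a single produce request, since the v2 produce endpoint accepts a `records` array in the request body. A minimal sketch of building such payloads (the batch size of 100 is an arbitrary choice for illustration):

```python
def build_produce_batches(values, batch_size=100):
    """Group values into v2 produce payloads of at most batch_size records each.

    Each returned dict is a request body for POST /topics/{topic_name}.
    """
    batches = []
    for i in range(0, len(values), batch_size):
        chunk = values[i:i + batch_size]
        batches.append({"records": [{"value": value} for value in chunk]})
    return batches

# 250 values become three POST bodies of 100, 100, and 50 records
batches = build_produce_batches(list(range(250)))
print([len(b["records"]) for b in batches])  # → [100, 100, 50]
```

Sending one request per batch instead of one per record amortizes the HTTP round trip, at the cost of slightly higher latency for the records that wait for a batch to fill.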
We hope that this tutorial has been helpful and informative for you. If you want to learn more about Kafka REST Proxy, you can check out the following resources:
- The official documentation of the Kafka REST Proxy and the Kafka REST Proxy API
- The official documentation of the Schema Registry, which is used by Kafka REST Proxy to handle the schemas of the messages
- The official documentation of the requests library for Python, which is used to send HTTP requests to Kafka REST Proxy
- A blog post by Confluent on Kafka REST Proxy for MapR Streams, which explains the benefits and challenges of using Kafka REST Proxy
- A blog post by DataFlair on Kafka REST Proxy, which provides a detailed overview and examples of using Kafka REST Proxy
Thank you for reading this tutorial. We hope that you have enjoyed it and learned something new. If you have any questions or feedback, please feel free to leave a comment below. Happy coding!