How to Use Kafka Schema Registry with Python to Manage Schemas

This blog teaches you how to use Kafka Schema Registry with Python to manage schemas for your data. You will learn how to use different schema formats, such as Avro, JSON, or Protobuf, and how to perform schema evolution and compatibility checks.

1. Introduction

In this tutorial, you will learn how to use Kafka Schema Registry with Python to manage schemas for your data. You will also learn how to use different schema formats, such as Avro, JSON, or Protobuf, and how to perform schema evolution and compatibility checks.

But first, what is Kafka Schema Registry and why do you need it?

Kafka Schema Registry is a service that allows you to store and retrieve schemas for your data. Schemas are definitions of the structure and types of your data, such as fields, values, and metadata. Schemas help you to validate, serialize, and deserialize your data, as well as to ensure compatibility between different versions of your data.

By using Kafka Schema Registry, you can avoid hard-coding your schemas in your code, which can lead to errors and inconsistencies. You can also centralize your schemas in a single place, which makes it easier to manage and update them. You can also leverage the features of Kafka Schema Registry, such as schema validation, schema evolution, and schema compatibility, to ensure the quality and consistency of your data.

In this tutorial, you will use Python as the programming language to interact with Kafka Schema Registry. Python is a popular and versatile language that has many libraries and frameworks for working with data. You will use the confluent-kafka-python library, which is a wrapper around the librdkafka library, to produce and consume data with Kafka. You will also use the python-schema-registry-client library (imported in code as schema_registry), a Python client for Kafka Schema Registry, to register, retrieve, and delete schemas.

By the end of this tutorial, you will be able to:

  • Install and run Kafka Schema Registry
  • Use Kafka Schema Registry with Python to register, retrieve, and delete schemas
  • Use different schema formats, such as Avro, JSON, or Protobuf, to produce and consume data with Kafka
  • Perform schema evolution and compatibility checks with Kafka Schema Registry

Ready to get started? Let’s go!

2. What is Kafka Schema Registry?

Kafka Schema Registry is a service that allows you to store and retrieve schemas for your data. Schemas are definitions of the structure and types of your data, such as fields, values, and metadata. Schemas help you to validate, serialize, and deserialize your data, as well as to ensure compatibility between different versions of your data.

Why do you need schemas for your data? Imagine you have a producer that sends data to a Kafka topic, and a consumer that reads data from the same topic. How do you make sure that the producer and the consumer agree on the format and meaning of the data? How do you handle changes in the data over time? How do you avoid data corruption or loss due to mismatched schemas?

One way to solve these problems is to use schemas. Schemas allow you to define the structure and types of your data, and to enforce rules and constraints on your data. For example, you can use schemas to specify that a field must be a string, a number, or a boolean, and that it must have a certain length, range, or pattern. You can also use schemas to specify default values, optional fields, or custom validations.
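For instance, here is a small illustrative Avro record schema (not one used later in this tutorial) in which the nickname field is optional and falls back to a default value when it is missing:

{
  "type": "record",
  "name": "UserProfile",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "nickname", "type": ["null", "string"], "default": null}
  ]
}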

By using schemas, you can ensure that your data is consistent, valid, and compatible across different applications and systems. You can also reduce the risk of data corruption or loss due to schema mismatch, as schemas allow you to detect and handle schema changes gracefully.

However, using schemas also introduces some challenges. How do you store and manage your schemas? How do you share your schemas across different applications and systems? How do you handle schema evolution and compatibility? How do you avoid schema duplication or inconsistency?

This is where Kafka Schema Registry comes in. Kafka Schema Registry is a service that allows you to store and retrieve schemas for your data. You can use Kafka Schema Registry to register, update, delete, and fetch schemas for your data. You can also use Kafka Schema Registry to perform schema validation, schema evolution, and schema compatibility checks.

Kafka Schema Registry supports different schema formats, such as Avro, JSON, or Protobuf. Avro is a binary format that is compact and efficient, and supports schema evolution and compatibility. JSON is a human-readable text format that is widely used, but it does not carry a schema by itself, so evolution and compatibility rules have to come from an external definition such as JSON Schema. Protobuf is a binary format that, like Avro, is compact and efficient, but it requires an explicit schema definition that is compiled into code.

In this tutorial, you will learn how to use Kafka Schema Registry with Python to manage schemas for your data. You will also learn how to use different schema formats, such as Avro, JSON, or Protobuf, and how to perform schema evolution and compatibility checks.

3. How to Install and Run Kafka Schema Registry

Before you can use Kafka Schema Registry with Python, you need to install and run Kafka Schema Registry on your machine. In this section, you will learn how to do that.

Kafka Schema Registry is a component of Confluent Platform, which is a distribution of Apache Kafka that provides additional features and tools. You can download Confluent Platform from the Confluent website. You can choose either the free community edition or the enterprise edition, depending on your needs and preferences.

After you download Confluent Platform, you need to unzip the file and navigate to the directory where it is located. For example, if you download Confluent Platform 6.2.0 for Linux, you can unzip the file and go to the directory with the following commands:

tar xzf confluent-6.2.0.tar.gz
cd confluent-6.2.0

Next, you need to start the Confluent services, including Kafka Schema Registry. You can do that with the following command:

./bin/confluent local services start

This command will start the following services: ZooKeeper, Kafka, Schema Registry, Kafka Connect, ksqlDB, Control Center, and REST Proxy. You can check the status of the services with the following command:

./bin/confluent local services status

You should see something like this:

control-center is [UP]
ksql-server is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]
connect is [UP]

If you see any service that is not up, you can try to restart it with the following command, replacing <service> with the name of the service:

./bin/confluent local services restart <service>

For example, if you want to restart Schema Registry, you can use this command:

./bin/confluent local services restart schema-registry

By default, Kafka Schema Registry runs on port 8081. You can verify that it is running by sending a GET request to the following URL:

http://localhost:8081/subjects

You should get an empty JSON array as a response, indicating that there are no schemas registered yet. You can use a tool like curl or Postman to send the request.
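For example, a quick check with curl looks like this (the expected response is shown as a comment):

curl http://localhost:8081/subjects
# Expected response: []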

Congratulations, you have successfully installed and run Kafka Schema Registry on your machine. You are now ready to use it with Python.

4. How to Use Kafka Schema Registry with Python

In this section, you will learn how to use Kafka Schema Registry with Python to register, retrieve, and delete schemas for your data. You will also learn how to use different schema formats, such as Avro, JSON, or Protobuf, to produce and consume data with Kafka.

To use Kafka Schema Registry with Python, you need to install two libraries: confluent-kafka (the package name of the confluent-kafka-python library) and python-schema-registry-client (imported in code as schema_registry). The former is a wrapper around the librdkafka library, which provides low-level access to Kafka. The latter is a Python client for Kafka Schema Registry, which provides high-level methods to interact with schemas.

You can install both libraries using pip, which is a package manager for Python. You can use the following commands to install them:

pip install confluent-kafka
pip install python-schema-registry-client

Alternatively, you can use conda with the conda-forge channel, where the Kafka client is published under the name python-confluent-kafka; the Schema Registry client is easiest to install with pip inside the conda environment:

conda install -c conda-forge python-confluent-kafka
pip install python-schema-registry-client

After you install the libraries, you need to import them in your Python code. You can use the following statements to import them:

from confluent_kafka import Producer, Consumer, avro
from schema_registry.client import SchemaRegistryClient, schema

The Producer and Consumer classes are used to produce and consume data with Kafka. The avro module is used to work with Avro schemas and data. The SchemaRegistryClient class is used to create a client object that can communicate with Kafka Schema Registry. The schema module is used to create and manipulate schema objects for different formats.

Now that you have imported the libraries, you can start using Kafka Schema Registry with Python. In the next subsections, you will learn how to use different schema formats, such as Avro, JSON, or Protobuf, to produce and consume data with Kafka.

4.1. How to Produce and Consume Data with Avro Schema

Avro is a binary format that is compact and efficient, and supports schema evolution and compatibility. In this subsection, you will learn how to produce and consume data with Avro schema using Kafka Schema Registry and Python.

To produce data with Avro schema, you need to do the following steps:

  1. Create an Avro schema object that defines the structure and types of your data.
  2. Register the Avro schema with Kafka Schema Registry and get the schema ID.
  3. Serialize your data using the Avro schema and the schema ID.
  4. Send the serialized data to a Kafka topic.

To create an Avro schema object, you can use the schema.AvroSchema class from the schema-registry library. You pass a string that contains the JSON representation of your schema to the constructor of the class. For example, if you want to create a schema for a user record that has three fields (name, age, and email), you can use the following code. The schema string is kept in a separate variable so it can be reused when configuring the producer later on:

from schema_registry.client import schema

user_schema_str = '''
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "email", "type": "string"}
    ]
}
'''

avro_schema = schema.AvroSchema(user_schema_str)

To register the Avro schema with Kafka Schema Registry, you need to create a SchemaRegistryClient object that can communicate with the Schema Registry service. You can pass the URL of the Schema Registry service to the constructor of the class. For example, if the Schema Registry service is running on localhost:8081, you can use the following code:

from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url='http://localhost:8081')

Then, you can use the register method of the client object to register the Avro schema and get the schema ID. You need to pass the name of the subject and the schema object to the method. The subject is a logical name that associates the schema with a Kafka topic. For example, if you want to register the Avro schema for a topic named “users”, you can use the following code:

subject = 'users-value'
schema_id = client.register(subject, avro_schema)

The register method will return the schema ID, which is a unique identifier for the schema. Registration is idempotent: if you call register again with the same subject and schema, the registry does not create a new version but simply returns the ID of the schema that is already registered, so you can use the same call to look the ID up later:

schema_id = client.register(subject, avro_schema)

To serialize your data, you can use the avro.AvroProducer class from the confluent-kafka-python library. The AvroProducer talks to Schema Registry for you: when it serializes a message, it registers the schema (or looks up its existing ID) under the "<topic>-value" subject and embeds the schema ID in the message, so you do not need to pass the ID yourself. You pass a dictionary of configuration parameters, including the Schema Registry URL, to the constructor, along with a default value schema parsed from the same schema string. For example, you can use the following code:

from confluent_kafka import avro

producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}

# Parse the schema string for the producer and set it as the default value schema
value_schema = avro.loads(user_schema_str)
producer = avro.AvroProducer(producer_config, default_value_schema=value_schema)

Then, you can use the produce method of the producer object to send data to a Kafka topic. You pass the name of the topic and the value of the message; the default value schema set above is used to serialize it. For example, if you want to send a user record with the name "Alice", the age 25, and the email "alice@example.com" to the topic "users", you can use the following code:

value = {
    'name': 'Alice',
    'age': 25,
    'email': 'alice@example.com'
}

producer.produce(topic='users', value=value)
producer.flush()

The flush method of the producer object will ensure that all the messages are delivered to the Kafka broker.
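If you want per-message confirmation rather than just a blocking flush, you can also pass a delivery callback when producing. This is an optional sketch; the delivery_report function name is an illustrative choice:

def delivery_report(err, msg):
    # Called once per message to report delivery success or failure
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

producer.produce(topic='users', value=value, on_delivery=delivery_report)
producer.flush()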

To consume data with Avro schema, you need to follow these steps:

  1. Create a SchemaRegistryClient object that can communicate with the Schema Registry service.
  2. Optionally, fetch the latest registered schema for the subject you want to consume from.
  3. Create an avro.AvroConsumer object, which deserializes each message using the schema ID embedded in it.
  4. Subscribe to the Kafka topic and poll for messages.

To create a SchemaRegistryClient object, you can use the same code as before:

from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url='http://localhost:8081')

Strictly speaking, you do not have to fetch the schema yourself to consume: each message carries a schema ID, and the AvroConsumer uses it to look up the writer's schema in the registry. It is still useful to fetch the latest registered schema for a subject, for example to inspect it. The schema-registry client exposes this through its get_schema method, which takes the same subject name that you used to register the schema. For example, if you want to consume from the topic "users", you can use the following code:

subject = 'users-value'
latest = client.get_schema(subject)

schema_id = latest.schema_id
avro_schema = latest.schema
version = latest.version

The get_schema method returns the latest registered version as an object that carries the schema ID, the schema itself, and the version number.

To create an avro.AvroConsumer object, you pass a configuration dictionary that includes the bootstrap servers, a consumer group id, and the Schema Registry URL. The consumer fetches the writer's schema from the registry using the ID embedded in each message and deserializes the value for you:

from confluent_kafka import avro

consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'user-consumer',
    # Start from the beginning of the topic if this group has no committed offsets
    'auto.offset.reset': 'earliest',
    'schema.registry.url': 'http://localhost:8081'
}

consumer = avro.AvroConsumer(consumer_config)

Then, you can use the subscribe and poll methods of the consumer object to subscribe to the Kafka topic and poll for messages. You need to pass a list of topic names to the subscribe method, and a timeout value to the poll method. For example, if you want to consume from the topic “users”, you can use the following code:

consumer.subscribe(['users'])

while True:
    message = consumer.poll(1.0)
    if message is None:
        continue
    if message.error():
        print(f"Error: {message.error()}")
        break
    else:
        print(f"Received message: {message.value()}")

The poll method will return a Message object; calling its value() method gives you the deserialized data as a Python dictionary.

Congratulations, you have learned how to produce and consume data with Avro schema using Kafka Schema Registry and Python. In the next subsection, you will learn how to use JSON schema to produce and consume data with Kafka.

4.2. How to Produce and Consume Data with JSON Schema

In this section, you will learn how to produce and consume data with JSON Schema using Kafka Schema Registry and Python. JSON is a human-readable text format that is widely used, but on its own it does not carry a schema, so evolution and compatibility rules have to come from an external definition. You will use the JSON Schema standard to define and validate your data, and the JSON Schema serializer and deserializer from confluent-kafka-python to serialize and deserialize your data with Kafka.

To produce and consume data with JSON schema, you will need to do the following steps:

  1. Create a JSON schema for your data and register it with Kafka Schema Registry
  2. Create a producer that sends data with JSON schema to a Kafka topic
  3. Create a consumer that reads data with JSON schema from a Kafka topic

Let’s see how to do each step in detail.

Create a JSON schema for your data and register it with Kafka Schema Registry

The first step is to create a JSON schema for your data and register it with Kafka Schema Registry. A JSON schema is a JSON document that describes the structure and types of your data, such as fields, values, and metadata. You can use the JSON Schema documentation to learn how to write a JSON schema.

For example, suppose you want to send and receive data about users, such as their name, age, and email. You can create a JSON schema for your user data as follows:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "User",
  "type": "object",
  "properties": {
    "name": {
      "type": "string",
      "minLength": 1
    },
    "age": {
      "type": "integer",
      "minimum": 0
    },
    "email": {
      "type": "string",
      "format": "email"
    }
  },
  "required": ["name", "age", "email"]
}

This JSON schema defines a user object with three properties: name, age, and email. It also specifies the type, format, and constraints of each property, and the required properties for a valid user object.
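Before wiring the schema into Kafka, you can optionally sanity-check documents against it locally with the jsonschema package (a separate dependency, installable with pip install jsonschema); a minimal sketch:

import jsonschema

user_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string", "minLength": 1},
        "age": {"type": "integer", "minimum": 0},
        "email": {"type": "string", "format": "email"}
    },
    "required": ["name", "age", "email"]
}

# A valid document passes silently; an invalid one raises ValidationError
jsonschema.validate(instance={"name": "Alice", "age": 25, "email": "alice@example.com"}, schema=user_schema)

try:
    jsonschema.validate(instance={"name": "Alice"}, schema=user_schema)
except jsonschema.ValidationError as exc:
    print(f"Invalid user: {exc.message}")

Note that by default jsonschema treats the "format": "email" keyword as an annotation only; the type, minLength, minimum, and required constraints are enforced.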

Once you have created your JSON schema, you need to register it with Kafka Schema Registry. For the JSON and Protobuf examples in this tutorial you will use the Schema Registry client that ships with confluent-kafka-python (the confluent_kafka.schema_registry module) to register, retrieve, and delete schemas. You can install the library together with its Schema Registry dependencies with the following command:

pip install confluent-kafka[schema-registry]

To register your JSON schema with Kafka Schema Registry, you wrap the schema string in a Schema object with the schema type "JSON" and pass it to the register_schema method of the SchemaRegistryClient object, together with a subject name. The subject name is the identifier under which the schema is stored; the registry assigns the version number itself, incrementing it each time you register a changed schema under the same subject. You can use the following code to register your JSON schema:

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

# Create a SchemaRegistryClient object
schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})

# The JSON schema as a string (the same document shown above)
user_schema_str = """
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "User",
  "type": "object",
  "properties": {
    "name": {"type": "string", "minLength": 1},
    "age": {"type": "integer", "minimum": 0},
    "email": {"type": "string", "format": "email"}
  },
  "required": ["name", "age", "email"]
}
"""

# Wrap the schema string in a Schema object with the JSON schema type
json_schema = Schema(user_schema_str, schema_type="JSON")

# Register the JSON schema under a subject name; the registry assigns the version
subject_name = "user-json"
schema_id = schema_registry_client.register_schema(subject_name, json_schema)

# Print the schema id returned by the registry
print(schema_id)

This code registers your JSON schema under the subject name "user-json" and prints the schema id, which is a unique identifier for your schema in Kafka Schema Registry. The serializer you will configure next uses the registry behind the scenes, so you do not have to pass the schema id around yourself.

Create a producer that sends data with JSON schema to a Kafka topic

The next step is to create a producer that sends data with JSON schema to a Kafka topic. You can use the confluent-kafka-python library, which is a wrapper around the librdkafka library, to produce and consume data with Kafka. You can install the library with the following command:

pip install confluent-kafka[json]

To create a producer that sends data with JSON schema, you need to create a SerializingProducer object and pass it a JSONSerializer object as the value serializer. The JSONSerializer validates each message against the schema, registers the schema with Schema Registry if needed, and attaches the schema id to the message. You also need to provide the bootstrap servers and a key serializer in the producer configuration. You can use the following code to create a producer that sends data with JSON schema:

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import StringSerializer

# Create a JSONSerializer object from the schema string and the registry client
value_serializer = JSONSerializer(user_schema_str, schema_registry_client)

# Create a SerializingProducer object
producer_conf = {
    "bootstrap.servers": "localhost:9092",
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": value_serializer
}
producer = SerializingProducer(producer_conf)

# Create a topic name
topic_name = "user-json-topic"

This code will create a producer that sends data with JSON schema to the topic name “user-json-topic”. To send data, you need to create a user object that matches your JSON schema, and pass it to the produce method of the SerializingProducer object. You also need to provide a key for your message, which can be any string. You can use the following code to send data with JSON schema:

# Create a user object
user = {"name": "Alice", "age": 25, "email": "alice@example.com"}

# Send data with JSON schema
producer.produce(topic=topic_name, key="user1", value=user)
producer.flush()

This code will send a user object with JSON schema to the topic "user-json-topic" with the key "user1". The producer serializes the user object to JSON and prepends a small header containing a magic byte and the schema id, so the value on the wire is not plain JSON. Conceptually, the message carries something like this:

{
  "schema_id": 1,
  "payload": {
    "name": "Alice",
    "age": 25,
    "email": "alice@example.com"
  }
}

The schema id refers to the schema registered in Kafka Schema Registry (1 in this example), and the payload is the user object in JSON format.

Create a consumer that reads data with JSON schema from a Kafka topic

The final step is to create a consumer that reads data with JSON schema from a Kafka topic. You can use the confluent-kafka-python library, which is a wrapper around the librdkafka library, to produce and consume data with Kafka. You can install the library with the following command:

pip install confluent-kafka[json]

To create a consumer that reads data with JSON schema, you need to create a DeserializingConsumer object and pass it a JSONDeserializer object as the value deserializer. The JSONDeserializer object will deserialize your data from JSON format and validate it against the schema. You also need to provide the bootstrap servers, the group id, and the key and value deserializers, and then subscribe to the topic and poll for messages.
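Here is a minimal sketch of that consumer, assuming the topic name "user-json-topic" and the user_schema_str defined earlier in this section (the group id "user-json-consumer" is an illustrative choice):

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
from confluent_kafka.serialization import StringDeserializer

# Create a JSONDeserializer object that validates each message against the schema
value_deserializer = JSONDeserializer(user_schema_str)

# Create a DeserializingConsumer object
consumer_conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "user-json-consumer",
    "auto.offset.reset": "earliest",
    "key.deserializer": StringDeserializer("utf_8"),
    "value.deserializer": value_deserializer
}
consumer = DeserializingConsumer(consumer_conf)

# Subscribe to the topic and poll for messages
consumer.subscribe(["user-json-topic"])

while True:
    message = consumer.poll(1.0)
    if message is None:
        continue
    if message.error():
        print(f"Error: {message.error()}")
        break
    print(f"Received message: {message.value()}")

The JSONDeserializer validates each message against the schema and returns the value as a Python dictionary, so the printed messages look like the user objects you produced earlier.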

4.3. How to Produce and Consume Data with Protobuf Schema

In this section, you will learn how to produce and consume data with Protobuf schema using Kafka Schema Registry and Python. Protobuf is a binary format that, like Avro, is compact and efficient, but it requires you to define your schema explicitly and compile it into code. You will use the Protocol Buffers language to define and compile your schema, and the Protobuf serializer and deserializer from confluent-kafka-python to serialize and deserialize your data with Kafka.

To produce and consume data with Protobuf schema, you will need to do the following steps:

  1. Create a Protobuf schema for your data and compile it to a Python module
  2. Register the Protobuf schema with Kafka Schema Registry
  3. Create a producer that sends data with Protobuf schema to a Kafka topic
  4. Create a consumer that reads data with Protobuf schema from a Kafka topic

Let’s see how to do each step in detail.

Create a Protobuf schema for your data and compile it to a Python module

The first step is to create a Protobuf schema for your data and compile it to a Python module. A Protobuf schema is a file that describes the structure and types of your data, using the Protocol Buffers language. You can use the Protocol Buffers documentation to learn how to write a Protobuf schema.

For example, suppose you want to send and receive data about users, such as their name, age, and email. You can create a Protobuf schema for your user data as follows:

syntax = "proto3";

message User {
  string name = 1;
  int32 age = 2;
  string email = 3;
}

This Protobuf schema defines a user message with three fields: name, age, and email. It also specifies the type and order of each field, using the proto3 syntax.

Once you have created your Protobuf schema, you need to compile it to a Python module using the protoc compiler, which generates a Python module containing the classes for your messages. protoc is a standalone binary: you can install it with your system package manager or download it from the Protocol Buffers releases page. You also need the protobuf Python runtime for the generated code, which you can install with the following command:

pip install protobuf

To compile your Protobuf schema to a Python module, you need to run the protoc compiler with the --python_out option and the name of your schema file. You also need to provide the output directory for the generated Python module. You can use the following command to compile your Protobuf schema:

protoc --python_out=. user.proto

This command will compile your Protobuf schema from the file user.proto and generate a Python module named user_pb2.py in the current directory. The Python module will contain the User class and the methods for serializing and deserializing your data.
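As a quick, optional sanity check that the generated module works, you can construct a User message and round-trip it through Protobuf's own binary encoding, independently of Kafka:

import user_pb2

# Build a message using the generated User class
user = user_pb2.User(name="Bob", age=30, email="bob@example.com")

# Serialize to Protobuf's binary wire format and parse it back
data = user.SerializeToString()
decoded = user_pb2.User()
decoded.ParseFromString(data)

print(decoded.name, decoded.age, decoded.email)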

Register the Protobuf schema with Kafka Schema Registry

The next step is to register the Protobuf schema with Kafka Schema Registry. As in the JSON example, you will use the Schema Registry client from confluent-kafka-python (the confluent_kafka.schema_registry module) to register, retrieve, and delete schemas. You can install the library together with its Schema Registry dependencies with the following command:

pip install confluent-kafka[schema-registry]

To register your Protobuf schema with Kafka Schema Registry, you wrap the contents of your .proto file in a Schema object with the schema type "PROTOBUF" and pass it to the register_schema method of the SchemaRegistryClient object, together with a subject name. As before, the registry assigns the version number itself. Manual registration is optional here, because the Protobuf serializer used in the next step registers the schema automatically, but it is useful to see how it works. You can use the following code to register your Protobuf schema:

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

# Create a SchemaRegistryClient object
schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})

# Wrap the .proto definition in a Schema object with the PROTOBUF schema type
protobuf_schema = Schema(
    'syntax = "proto3"; message User { string name = 1; int32 age = 2; string email = 3; }',
    schema_type="PROTOBUF"
)

# Register the Protobuf schema under a subject name; the registry assigns the version
subject_name = "user-protobuf"
schema_id = schema_registry_client.register_schema(subject_name, protobuf_schema)

# Print the schema id returned by the registry
print(schema_id)

This code registers your Protobuf schema under the subject name "user-protobuf" and prints the schema id, which is a unique identifier for your schema in Kafka Schema Registry.

Create a producer that sends data with Protobuf schema to a Kafka topic

The next step is to create a producer that sends data with Protobuf schema to a Kafka topic. You can use the confluent-kafka-python library, which is a wrapper around the librdkafka library, to produce and consume data with Kafka. You can install the library with the following command:

pip install confluent-kafka[protobuf]

To create a producer that sends data with Protobuf schema, you need to create a SerializingProducer object and pass it a ProtobufSerializer object as the value serializer. The ProtobufSerializer serializes your data to Protobuf format, registers the schema with Schema Registry if needed, and attaches the schema id to the message. You construct it from the message class in the Python module generated by the protoc compiler and the Schema Registry client; you also need to provide the bootstrap servers and a key serializer in the producer configuration. You can use the following code to create a producer that sends data with Protobuf schema:

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import StringSerializer
import user_pb2 # The Python module generated by the protoc compiler

# Create a ProtobufSerializer object for the generated User message type
# (recent versions of confluent-kafka require use.deprecated.format to be set explicitly)
value_serializer = ProtobufSerializer(user_pb2.User, schema_registry_client,
                                      {"use.deprecated.format": False})

# Create a SerializingProducer object
producer_conf = {
    "bootstrap.servers": "localhost:9092",
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": value_serializer
}
producer = SerializingProducer(producer_conf)

# Create a topic name
topic_name = "user-protobuf-topic"

This code will create a producer that sends data with Protobuf schema to the topic name “user-protobuf-topic”. To send data, you need to create a user object that matches your Protobuf schema, using the User class from the user_pb2 module, and pass it to the produce method of the SerializingProducer object. You also need to provide a key for your message, which can be any string. You can use the following code to send data with Protobuf schema:

# Create a user object
user = user_pb2.User(name="Bob", age=30, email="bob@example.com")

# Send data with Protobuf schema
producer.produce(topic=topic_name, key="user2", value=user)
producer.flush()

This code will send a user object with Protobuf schema to the topic "user-protobuf-topic" with the key "user2". The producer serializes the user object to Protobuf's binary format and prepends a small header containing a magic byte and the schema id, so the value on the wire is not a bare Protobuf payload. Conceptually, the message carries something like this:

{
  "schema_id": 2,
  "payload": b'\n\x03Bob\x10\x1e\x1a\x0fbob@example.com'
}

The schema id refers to the Protobuf schema registered in Kafka Schema Registry (2 in this example), and the payload is the user object in Protobuf's binary encoding.

Create a consumer that reads data with Protobuf schema from a Kafka topic

The final step is to create a consumer that reads data with Protobuf schema from a Kafka topic. You need to create a DeserializingConsumer object and pass it a ProtobufDeserializer object as the value deserializer, which parses each message back into a User object using the generated user_pb2 module. You then subscribe to the topic and poll for messages, just like in the previous sections.
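A minimal sketch of that consumer follows, assuming the topic name "user-protobuf-topic" and the user_pb2 module generated earlier (the group id "user-protobuf-consumer" is an illustrative choice):

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer
from confluent_kafka.serialization import StringDeserializer
import user_pb2 # The Python module generated by the protoc compiler

# Create a ProtobufDeserializer for the generated User message type
# (recent versions of confluent-kafka require use.deprecated.format to be set explicitly)
value_deserializer = ProtobufDeserializer(user_pb2.User, {"use.deprecated.format": False})

# Create a DeserializingConsumer object
consumer_conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "user-protobuf-consumer",
    "auto.offset.reset": "earliest",
    "key.deserializer": StringDeserializer("utf_8"),
    "value.deserializer": value_deserializer
}
consumer = DeserializingConsumer(consumer_conf)

# Subscribe to the topic and poll for messages
consumer.subscribe(["user-protobuf-topic"])

while True:
    message = consumer.poll(1.0)
    if message is None:
        continue
    if message.error():
        print(f"Error: {message.error()}")
        break
    user = message.value()  # an instance of the generated User class
    print(f"Received user: name={user.name}, age={user.age}, email={user.email}")

Because the ProtobufDeserializer returns each value as an instance of the generated User class, you access the fields as attributes rather than dictionary keys.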

5. How to Perform Schema Evolution and Compatibility Checks

Schema evolution is the process of changing the schema of your data over time. Schema evolution can happen for various reasons, such as adding new features, fixing bugs, or adapting to changing requirements. Schema evolution can also affect the compatibility of your data, which is the ability of different versions of your data to work together.

How do you handle schema evolution and compatibility with Kafka Schema Registry? Kafka Schema Registry provides a feature called schema compatibility, which allows you to define and enforce rules on how your schemas can evolve. Schema compatibility helps you to avoid breaking changes in your data, such as data loss, data corruption, or data inconsistency.

Kafka Schema Registry supports different types of schema compatibility, such as:

  • BACKWARD: a new schema is backward compatible if consumers using it can read data written with the previous schema.
  • FORWARD: a new schema is forward compatible if consumers using the previous schema can still read data written with the new schema.
  • FULL: a new schema is fully compatible if it is both backward and forward compatible.
  • NONE: no compatibility checks are performed, so any schema change is accepted.

You can set the schema compatibility type for each subject (topic-value or topic-key) in Kafka Schema Registry. You can also change the schema compatibility type at any time. By default, the schema compatibility type is BACKWARD.
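For example, you can read and change the compatibility type for a subject through the registry's REST /config endpoint; here is a quick sketch with curl, using the "users-value" subject from earlier (if no subject-level setting exists yet, the GET request returns an error and the registry falls back to the global setting):

# Read the compatibility type configured for a subject
curl http://localhost:8081/config/users-value

# Set the compatibility type for a subject to FULL
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "FULL"}' \
  http://localhost:8081/config/users-value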

How do you check the schema compatibility with Kafka Schema Registry? Kafka Schema Registry provides a REST API that allows you to test the compatibility of a new schema against the latest registered schema for a given subject. You can use the POST /compatibility/subjects/(string: subject)/versions/(versionId: version) endpoint to perform the compatibility check. You can also use the schema-registry library to perform the compatibility check programmatically.
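As an illustration, here is a sketch of such a check with curl against the "users-value" subject, posting a candidate Avro schema that adds a "country" field with a default value (the schema has to be sent as an escaped JSON string inside the request body):

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}, {\"name\": \"email\", \"type\": \"string\"}, {\"name\": \"country\", \"type\": \"string\", \"default\": \"unknown\"}]}"}' \
  http://localhost:8081/compatibility/subjects/users-value/versions/latest

# Expected response: {"is_compatible": true}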

In this section, you will learn how to perform schema evolution and compatibility checks with Kafka Schema Registry. You will use the Avro schema format as an example, but you can also apply the same principles to other schema formats, such as JSON or Protobuf.

Here are the steps you will follow:

  1. Create a new schema that adds a new field to the existing schema.
  2. Register the new schema with Kafka Schema Registry and check the schema compatibility.
  3. Produce and consume data with the new schema and verify the results.
  4. Change the schema compatibility type and repeat the steps above.

Let’s get started!
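The full walkthrough follows the same patterns as section 4. As a compact sketch of steps 1 and 2 from the list above, using the confluent_kafka.schema_registry client and the same candidate schema as the curl example (the new "country" field and its default value are illustrative choices):

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})

# Step 1: a new version of the User schema that adds a field with a default value,
# so data written with the previous schema can still be read (backward compatible)
new_user_schema = Schema('''
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "email", "type": "string"},
        {"name": "country", "type": "string", "default": "unknown"}
    ]
}
''', schema_type="AVRO")

# Step 2: test compatibility against the latest schema registered under the subject,
# and register the new version only if the check passes
subject = "users-value"
if schema_registry_client.test_compatibility(subject, new_user_schema):
    schema_id = schema_registry_client.register_schema(subject, new_user_schema)
    print(f"Registered new schema version with id {schema_id}")
else:
    print("The new schema is not compatible with the latest registered version")

From there, steps 3 and 4 reuse the producer and consumer code from section 4.1 with the new schema, and repeat the exercise after changing the compatibility type for the subject.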

6. Conclusion

In this tutorial, you have learned how to use Kafka Schema Registry with Python to manage schemas for your data. You have also learned how to use different schema formats, such as Avro, JSON, or Protobuf, and how to perform schema evolution and compatibility checks.

Here are the main points you have covered:

  • Kafka Schema Registry is a service that allows you to store and retrieve schemas for your data.
  • Schemas are definitions of the structure and types of your data, such as fields, values, and metadata.
  • Schemas help you to validate, serialize, and deserialize your data, as well as to ensure compatibility between different versions of your data.
  • Kafka Schema Registry supports different schema formats, such as Avro, JSON, or Protobuf.
  • Kafka Schema Registry provides a feature called schema compatibility, which allows you to define and enforce rules on how your schemas can evolve.
  • Kafka Schema Registry supports different types of schema compatibility, such as BACKWARD, FORWARD, FULL, or NONE.
  • Kafka Schema Registry provides a REST API and a Python client library that allow you to register, update, delete, and fetch schemas, as well as to perform schema validation, schema evolution, and schema compatibility checks.

You have also seen how to use the confluent-kafka-python library to produce and consume data with Kafka using different schema formats, and how to handle schema changes and verify the results.

By using Kafka Schema Registry with Python, you can manage schemas for your data in a centralized and consistent way. You can also leverage the features of Kafka Schema Registry, such as schema validation, schema evolution, and schema compatibility, to ensure the quality and consistency of your data.

We hope you have enjoyed this tutorial and learned something new. If you have any questions or feedback, please feel free to leave a comment below. Thank you for reading!
