Avro Schema Serializer and Deserializer for Schema Registry on Confluent Platform¶
This document describes how to use Avro schemas with the Apache Kafka® Java client and console tools.
The Confluent Schema Registry based Avro serializer, by design, does not include the message schema. Instead, it writes the schema ID (preceded by a magic byte), followed by the normal binary encoding of the data itself. You can choose whether or not to embed a schema inline, which allows for cases where you may want to communicate the schema offline, in headers, or some other way. This is in contrast to other systems, such as Hadoop, that always include the schema with the message data. To learn more, see Wire format.
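For illustration only, the following minimal sketch (the class and method names are hypothetical, not part of the Confluent API) shows how consumer-side code could peek at this wire format: one magic byte with value 0, a 4-byte big-endian schema ID, and then the Avro binary payload.
import java.nio.ByteBuffer;

// Hypothetical helper: extracts the schema ID from a value serialized with KafkaAvroSerializer.
public class WireFormatPeek {
    public static int schemaIdOf(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        byte magicByte = buffer.get();   // always 0 in the current wire format
        if (magicByte != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magicByte);
        }
        return buffer.getInt();          // 4-byte schema ID; the remaining bytes are the Avro-encoded data
    }
}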
Avro serializer¶
You can plug KafkaAvroSerializer into KafkaProducer to send messages of Avro type to Kafka.

Currently supported primitive types are null, Boolean, Integer, Long, Float, Double, String, and byte[], along with the complex type IndexedRecord. Sending data of other types to KafkaAvroSerializer will cause a SerializationException. Typically, IndexedRecord is used for the value of the Kafka message. If used, the key of the Kafka message is often one of the primitive types mentioned above. When sending a message to a topic t, the Avro schema for the key and the value will be automatically registered in Schema Registry under the subjects t-key and t-value, respectively, if the compatibility test passes. The only exception is that the null type is never registered in Schema Registry.
In the following example, a message is sent with a key of type string and a value of type Avro record to Kafka. A SerializationException may occur during the send call, if the data is not well formed.

The examples below use the default hostname and port for the Kafka bootstrap server (localhost:9092) and Schema Registry (localhost:8081).
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");

KafkaProducer<Object, Object> producer = new KafkaProducer<>(props);

String key = "key1";
String userSchema = "{\"type\":\"record\"," +
                    "\"name\":\"myrecord\"," +
                    "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");

ProducerRecord<Object, Object> record = new ProducerRecord<>("topic1", key, avroRecord);
try {
  producer.send(record);
} catch (SerializationException e) {
  // may need to do something with it
} finally {
  // When you're finished producing records, you can flush the producer to ensure it has all been written to Kafka
  // and then close the producer to free its resources.
  producer.flush();
  producer.close();
}
The avro-maven-plugin generated code adds Java-specific properties such as "avro.java.string":"String", which may prevent schema evolution. You can override this by setting avro.remove.java.properties=true in the Avro serializer configurations.
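For example, assuming props is the producer configuration from the example above, this property can be set alongside the other serializer settings (a minimal sketch using the property key described here):
// Sketch: strip Java-specific properties from generated schemas before they are registered and used for serialization.
props.put("avro.remove.java.properties", true);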
Avro deserializer¶
You can plug in KafkaAvroDeserializer to KafkaConsumer to receive messages of any Avro type from Kafka.

In the following example, messages are received with a key of type string and a value of type Avro record from Kafka. When getting the message key or value, a SerializationException may occur if the data is not well formed.

The examples below use the default hostname and port for the Kafka bootstrap server (localhost:9092) and Schema Registry (localhost:8081).
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.avro.generic.GenericRecord;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

String topic = "topic1";
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

try {
  while (true) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, GenericRecord> record : records) {
      System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
    }
  }
} finally {
  consumer.close();
}
With Avro, it is not necessary to use a property to specify a specific type, since the type can be derived directly from the Avro schema, using the namespace and name of the Avro type. This allows the Avro deserializer to be used out of the box with topics that have records of heterogeneous Avro types. This would be the case when using the RecordNameStrategy (or TopicRecordNameStrategy) to store multiple types in the same topic, as described in Martin Kleppmann’s blog post Should You Put Several Event Types in the Same Kafka Topic?. (An alternative is to use schema references, as described in Multiple event types in the same topic and Putting Several Event Types in the Same Topic – Revisited.)

This differs from the Protobuf and JSON Schema deserializers, where in order to return a specific rather than a generic type, you must use a specific property. To return a specific type in Avro, you must add the following configuration:
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
Here is a summary of specific and generic return types for each schema format.
|               | Avro | Protobuf | JSON Schema |
|---------------|------|----------|-------------|
| Specific type | Generated class that implements org.apache.avro.SpecificRecord | Generated class that extends com.google.protobuf.Message | Java class (that is compatible with Jackson serialization) |
| Generic type  | org.apache.avro.GenericRecord | com.google.protobuf.DynamicMessage | com.fasterxml.jackson.databind.JsonNode |
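For example, a consumer configured to return a generated SpecificRecord class rather than GenericRecord might look like the following sketch. Here, Transaction is a hypothetical class generated from an Avro schema (for example, by the avro-maven-plugin); the rest of the configuration mirrors the deserializer example above.
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

// Transaction is a hypothetical generated class that implements org.apache.avro.specific.SpecificRecord.
KafkaConsumer<String, Transaction> consumer = new KafkaConsumer<>(props);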
Configure expiry time for client-side schema caches¶
The following format-agnostic configuration options for cache expiry time are available on both the serializer and deserializer:
- latest.cache.size - The maximum size for caches holding latest schemas
- latest.cache.ttl.sec - The time to live (TTL) in seconds for caches holding latest schemas, or -1 for no TTL
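For example, either option can be passed in the serializer or deserializer configuration along with the Schema Registry URL (a sketch; the specific values shown here are illustrative only):
Properties props = new Properties();
props.put("schema.registry.url", "http://localhost:8081");
props.put("latest.cache.size", 1000);     // cache at most 1000 latest-schema entries (illustrative value)
props.put("latest.cache.ttl.sec", 300);   // expire cached latest schemas after 5 minutes (illustrative value)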
Test drive Avro schema¶
To see how this works and test drive the Avro schema format, you can use the producer and consumer commands in a shell to send and receive Avro data in JSON format. Under the hood, the producer and consumer use AvroMessageFormatter and AvroMessageReader to convert between Avro and JSON.
Avro defines both a binary serialization format and a JSON serialization format. This allows you to use JSON when human-readability is desired, and the more efficient binary format when storing data in topics.
The command line producer and consumer are useful for understanding how the built-in Avro schema support works on Confluent Platform and Confluent Cloud.
When you incorporate the serializer and deserializer into the code for your own producers and consumers, messages and associated schemas are processed the same way as they are on the console producers and consumers.
The suggested consumer commands include the --from-beginning flag to be sure you capture the messages even if you don’t run the consumer immediately after running the producer. If you leave off the --from-beginning flag, the consumer reads only the messages produced during its current session.
The examples below include a few minimal configs. For full property references, see Configurations reference.
Prerequisites¶
- Prerequisites to run these examples are generally the same as those described for the Schema Registry Tutorial with the exception of Maven, which is not needed here. Also, Confluent Platform version 5.5.0 or later is required here.
- The following examples use the default Schema Registry URL value (localhost:8081). The examples show how to configure this inline by supplying the URL as an argument to the --property flag in the command line arguments of the producer and consumer (--property schema.registry.url=<address of your schema registry>). Alternatively, you could set this property in $CONFLUENT_HOME/etc/kafka/server.properties, and not have to include it in the producer and consumer commands. For example: confluent.schema.registry.url=http://localhost:8081
- These examples make use of the kafka-avro-console-producer and kafka-avro-console-consumer, which are located in $CONFLUENT_HOME/bin.
Confluent Cloud prerequisites are:
- A Confluent Cloud account
- Permission to create a topic and schema in a cluster in Confluent Cloud
- Stream Governance Package enabled
- API key and secret for the Confluent Cloud cluster ($APIKEY, $APISECRET)
- API key and secret for Schema Registry ($SR_APIKEY, $SR_APISECRET)
- Schema Registry endpoint URL ($SCHEMA_REGISTRY_URL)
- Cluster ID ($CLUSTER_ID)
- Schema Registry cluster ID ($SR_CLUSTER_ID)
The examples assume that API keys, secrets, cluster IDs, and API endpoints are stored in persistent environment variables wherever possible, and refer to them as such. You can store these in shell variables if your setup is temporary. If you want to return to this environment and cluster for future work, consider storing them in a profile (such as .zsh, .bashrc, or powershell.exe profiles).
The following steps provide guidelines on these prerequisites specific to these examples. To learn more general information, see Manage Clusters.
Log in to Confluent Cloud:
confluent login
Create a Kafka cluster in Confluent Cloud:
confluent kafka cluster create <name> [flags]
For example:
confluent kafka cluster create quickstart_cluster --cloud "aws" --region "us-west-2"
Your output will include a cluster ID (in the form of lkc-xxxxxx), show the cluster name and cluster type (in this case, “Basic”), and endpoints. Take note of the cluster ID, and store it in an environment variable such as $CLUSTER_ID.

Get an API key and secret for the cluster:
confluent api-key create --resource $CLUSTER_ID
Store the API key and secret for your cluster in a safe place, such as shell environment variables: $APIKEY, $APISECRET
View Stream Governance packages and Schema Registry endpoint URL.
A Stream Governance package was enabled as a part of creating the environment.
To view governance packages, use the Confluent CLI command confluent environment list:
confluent environment list
Your output will show the environment ID, name, and associated Stream Governance packages.
To view the Stream Governance API endpoint URL, use the command confluent schema-registry cluster describe:
confluent schema-registry cluster describe
Your output will show the Schema Registry cluster ID (in the form of lsrc-xxxxxx) and the endpoint URL, which is also available to you in Cloud Console on the right side panel under “Stream Governance API” in the environment. Store these in environment variables: $SR_CLUSTER_ID and $SCHEMA_REGISTRY_URL.
Create a Schema Registry API key, using the Schema Registry cluster ID ($SR_CLUSTER_ID) from the previous step as the resource ID.

confluent api-key create --resource $SR_CLUSTER_ID
Store the API key and secret for your Schema Registry in a safe place, such as shell environment variables: $SR_APIKEY and $SR_APISECRET
Create and use schemas¶
Start Confluent Platform using the following command:
confluent local services start
Tip
- Alternatively, you can simply run confluent local services schema-registry start, which also starts kafka and zookeeper as dependencies. This demo does not directly reference the other services, such as Connect and Control Center. That said, you may want to run the full stack anyway to further explore, for example, how the topics and messages display on Control Center. To learn more about confluent local, see Quick Start for Confluent Platform and confluent local in the Confluent CLI command reference.
- The confluent local commands run in the background so you can re-use this command window. Separate sessions are required for the producer and consumer.
Verify registered schema types.
Schema Registry supports arbitrary schema types. You should verify which schema types are currently registered with Schema Registry.
To do so, type the following command (assuming you use the default URL and port for Schema Registry, localhost:8081):

curl http://localhost:8081/schemas/types
The response will be one or more of the following. If additional schema format plugins are installed, these will also be available.
["JSON", "PROTOBUF", "AVRO"]
Alternatively, use the curl --silent flag, and pipe the command through jq (curl --silent http://localhost:8081/schemas/types | jq) to get nicely formatted output:

[
  "JSON",
  "PROTOBUF",
  "AVRO"
]
Use the producer to send Avro records in JSON as the message value.
The new topic, transactions-avro, will be created as a part of this producer command if it does not already exist. This command starts a producer, and creates a schema for the transactions-avro topic. The schema has two fields, id and amount.

kafka-avro-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic transactions-avro \
  --property value.schema='{"type":"record","name":"Transaction","fields":[{"name":"id","type":"string"},{"name": "amount", "type": "double"}]}'
Tip
The producer does not show a > prompt, just a blank line at which to type producer messages.

Type the following command in the shell, and hit return.

{ "id":"1000", "amount":500 }
Open a new terminal window, and use the consumer to read from topic transactions-avro and get the value of the message in JSON.

kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic transactions-avro --property schema.registry.url=http://localhost:8081
You should see the following in the console.
{"id":"1000","amount":500.0}
Leave this consumer running.
Register a new schema version under the same subject by adding a new field, customer_id.

Since the default subject level compatibility is BACKWARD, you must add the new field as “optional” in order for it to be compatible with the previous version.
Open a new terminal window, and run the following command:
kafka-avro-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic transactions-avro \
  --property value.schema='{"type": "record","name": "Transaction","fields": [{"name": "id", "type": "string"},{"name": "amount", "type": "double"},{"name": "customer_id", "type": "string", "default":null}]}'
Type the following into your producer, and hit return:
{ "id":"1001", "amount":500, "customer_id":"1221" }
Return to your running consumer to read from topic transactions-avro and get the new message.

You should see the new output added to the original.
{"id":"1000","amount":500.0} {"id":"1001","amount":500.0,"customer_id":"1221"}
(If by chance you closed the original consumer, just restart it using the same consumer command shown earlier.)
In another shell, use curl commands to examine the schema that was registered with Schema Registry.

curl --silent http://localhost:8081/subjects/transactions-avro-value/versions/1/schema
Your output should show the id and amount fields added in version 1 of the schema:

{"type":"record","name":"Transaction","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"}]}
To view version 2:
curl --silent http://localhost:8081/subjects/transactions-avro-value/versions/2/schema
Output for version 2 will include the customer_id field:

{"type":"record","name":"Transaction","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"customer_id","type":"string","default":null}]}
Run this command to view the schema in more detail. (The command as shown is piped through jq with curl download messages suppressed for more readable output.)

curl --silent -X GET http://localhost:8081/subjects/transactions-avro-value/versions/latest | jq .
Your output should resemble:
"subject": "transactions-avro-value", "version": 2, "id": 2, "schema": "{\"type\":\"record\",\"name\":\"Transaction\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"customer_id\",\"type\":\"string\",\"default\":null}]}"
Use Confluent Control Center to examine schemas and messages.
Messages that were successfully produced also show on Control Center (http://localhost:9021/) in Topics > <topicName> > Messages. You may have to select a partition or jump to a timestamp to see messages sent earlier. (For timestamp, type in a number, which will default to partition 1/Partition: 0, and press return. To get the message card view, select the cards icon on the upper right.)

Schemas you create are available on the Schemas tab for the selected topic.
Run shutdown and cleanup tasks.
- You can stop the consumer and producer with Ctrl-C in their respective command windows.
- To stop Confluent Platform, type confluent local services stop.
- If you would like to clear out existing data (topics, schemas, and messages) before starting again with another test, type confluent local destroy.
Create a Kafka topic:
confluent kafka topic create transactions-avro --cluster $CLUSTER_ID
Copy the following schema and store it in a file called schema.txt:

{
  "type": "record",
  "name": "Transaction",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
Run the following command to create a producer with the schema created in the previous step:
confluent kafka topic produce transactions-avro \
  --cluster $CLUSTER_ID \
  --schema "<full path to file>/schema.txt" \
  --schema-registry-endpoint $SCHEMA_REGISTRY_URL \
  --schema-registry-api-key $SR_APIKEY \
  --schema-registry-api-secret $SR_APISECRET \
  --api-key $APIKEY \
  --api-secret $APISECRET \
  --value-format "avro"
Your output should resemble:
Successfully registered schema with ID 100001
Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit.
Tip
- You must provide the full path to the schema file even if it resides in the current directory.
- The examples assume you are using the latest version of the Confluent CLI, where the deprecated --sr-endpoint, --sr-api-key, and --sr-api-secret flags have been superseded by the new --schema-registry-endpoint, --schema-registry-api-key, and --schema-registry-api-secret, respectively. If the examples don’t work because these flags aren’t recognized, you must either update to the new CLI, or use the deprecated flags.
Type the following command in the shell, and hit return.
{ "id":"1000", "amount":500 }
Open another terminal and run a consumer to read from topic transactions-avro and get the value of the message in JSON:

confluent kafka topic consume transactions-avro \
  --cluster $CLUSTER_ID \
  --from-beginning \
  --value-format "avro" \
  --schema-registry-endpoint $SCHEMA_REGISTRY_URL \
  --schema-registry-api-key $SR_APIKEY \
  --schema-registry-api-secret $SR_APISECRET \
  --api-key $APIKEY \
  --api-secret $APISECRET
Your output should be:
{"id":"1000","amount":500}
Register a new schema version under the same subject by adding a new field, customer_id.

Since the default subject level compatibility is BACKWARD, you must add the new field as “optional” in order for it to be compatible with the previous version. Create a new file called schema2.txt and copy the following schema into it:

{
  "type": "record",
  "name": "Transaction",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "customer_id", "type": "string", "default":"null"}
  ]
}
Open another terminal, and run the following command:
confluent kafka topic produce transactions-avro \
  --cluster $CLUSTER_ID \
  --schema "<full path to file>/schema2.txt" \
  --schema-registry-endpoint $SCHEMA_REGISTRY_URL \
  --schema-registry-api-key $SR_APIKEY \
  --schema-registry-api-secret $SR_APISECRET \
  --api-key $APIKEY \
  --api-secret "$APISECRET" \
  --value-format "avro"
Your output should resemble:
Successfully registered schema with ID 100002
Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit.
Type the following into your producer, and hit return:
{ "id":"1001", "amount":500, "customer_id":"1221" }
Switch to the terminal with your running consumer to read from topic transactions-avro and get the new message.

You should see the new output added to the original:
{"id":"1000","amount":500.0} {"id":"1001","amount":500.0,"customer_id":"1221"}
(If by chance you closed the original consumer, just restart it using the same consumer command shown earlier.)
View the schemas that were registered with Schema Registry as versions 1 and 2.
confluent schema-registry schema describe --subject transactions-avro-value --version 1
Your output should be similar to the following, showing the id and amount fields added in version 1 of the schema:

Schema ID: 100001
Schema: {"type":"record","name":"Transaction","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"}]}
To view version 2:
confluent schema-registry schema describe --subject transactions-avro-value --version 2
Output for version 2 will include the customer_id field:

Schema ID: 100002
Schema: {"type":"record","name":"Transaction","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"customer_id","type":"string","default":"null"}]}
Use the Confluent Cloud Console to examine schemas and messages.
Messages that were successfully produced also show on the Confluent Cloud Console (https://confluent.cloud/) in Topics > <topicName> > Messages. You may have to select a partition or jump to a timestamp to see messages sent earlier. (For timestamp, type in a number, which will default to partition 1/Partition: 0, and press return. To get the message card view, select the cards icon on the upper right.)

Schemas you create are available on the Schemas tab for the selected topic.
Run shutdown and cleanup tasks.
- You can stop the consumer and producer with Ctrl-C in their respective command windows.
- If you were using shell environment variables and want to keep them for later, remember to store them in a safe, persistent location.
- You can remove topics, clusters, and environments from the command line or from the Confluent Cloud Console.
Configurations reference¶
The following configuration properties are available for producers and consumers. These are not specific to a particular schema format, but applicable to any Kafka producers and consumers.
Adding security credentials¶
The test drive examples show how to use the producer and consumer console clients as serializers and deserializers by passing Schema Registry properties on the command line and in config files. In addition to the examples given in the “Test Drives”, you can pass truststore and keystore credentials for Schema Registry, as described in Additional configurations for HTTPS. Here is an example for the producer on Confluent Platform:
kafka-avro-console-producer --bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 --topic transactions-avro \
--property value.schema='{"type":"record","name":"Transaction","fields":[{"name":"id","type":"string"},{"name": "amount", "type": "double"}]}' \
--property schema.registry.ssl.truststore.location=/etc/kafka/security/schema.registry.client.truststore.jks \
--property schema.registry.ssl.truststore.password=myTrustStorePassword
Kafka producer configurations¶
A complete reference of producer configuration properties is available in Kafka Producer Configurations.
Kafka consumer configurations¶
A complete reference of consumer configuration properties is available in Kafka Consumer Configurations.
Schema Registry configuration options¶
A complete reference for Schema Registry configuration is available in the Confluent Platform documentation at Schema Registry Configuration Options.
Using Schema Registry with Connect¶
If you are using serializers and deserializers with Kafka Connect, you will need information on key and value converters. To learn more, see Configuring key and value converters in the Connect documentation.
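As a quick illustration of what that configuration typically looks like (a sketch only; see the linked page for the full set of converter options), a Connect worker or connector that uses Avro with Schema Registry sets converters along these lines:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081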
Schema references in Avro¶
Confluent Platform provides full support for the notion of schema references, the ability of a schema to refer to other schemas.
Tip
Schema references are also supported in Confluent Cloud on Avro, Protobuf, and JSON Schema formats. On the Confluent CLI, you can use the --refs <file> flag on confluent schema-registry schema create to reference another schema.
To learn more, see the example given below in Multiple event types in the same topic, the associated blog post that goes into further detail on this, and the API example for how to register (create) a new schema in POST /subjects/(string: subject)/versions. That example includes a referenced schema.
Multiple event types in the same topic¶
In addition to providing a way for one schema to call other schemas, schema references can be used to efficiently combine multiple event types in the same topic and still maintain subject-topic constraints.
In Avro, this is accomplished as follows:
Use the default subject naming strategy, TopicNameStrategy, which uses the topic name to determine the subject to be used for schema lookups, and helps to enforce subject-topic constraints.

Use an Avro union to define the schema references as a list of schema names, for example:
[ "io.confluent.examples.avro.Customer", "io.confluent.examples.avro.Product", "io.confluent.examples.avro.Payment" ]
When the schema is registered, send an array of reference versions. For example:
[ { "name": "io.confluent.examples.avro.Customer", "subject": "customer", "version": 1 }, { "name": "io.confluent.examples.avro.Product", "subject": "product", "version": 1 }, { "name": "io.confluent.examples.avro.Order", "subject": "order", "version": 1 } ]
Configure the Avro serializer to use your Avro union for serialization, and not the event type, by configuring the following properties in your producer application:
auto.register.schemas=false
use.latest.version=true
For example:
props.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
props.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true);
Tip
- Setting auto.register.schemas to false disables auto-registration of the event type, so that it does not override the union as the latest schema in the subject. Setting use.latest.version to true causes the Avro serializer to look up the latest schema version in the subject (which will be the union) and use that for serialization. Otherwise, if set to false, the serializer will look for the event type in the subject and fail to find it. (For examples and more about these settings, see Auto Schema Registration in the Schema Registry tutorials.)
- To learn more about configuring Schema Registry with Connect, see Configuration Options for Kafka Connect.
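Putting these settings together, a producer configuration for the union subject might look like the following sketch. Aside from the two settings above, it mirrors the producer example earlier on this page; the values sent can then be instances of any of the referenced event types (for example, io.confluent.examples.avro.Customer).
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// Serialize against the latest registered schema (the union), rather than auto-registering the event type.
props.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
props.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true);

KafkaProducer<String, Object> producer = new KafkaProducer<>(props);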
Limitations for librdkafka clients¶
librdkafka clients do not currently support AVRO Unions in (de)serialization. A workaround is to reference the schema as a field for each type in the union.
Fields can contain referenced types. For example, a union such as ["typeA", "typeB"] must be converted to a record with a field for each element of the union.
Type A can be described as:
{
"type": "A",
"typeA": <typeA instance>,
"typeB": null
}
Type B can be described as:
{
"type": "B",
"typeA": null,
"typeB": <typeB instance>
}
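For reference, a sketch of what the wrapper record's Avro schema might look like, assuming typeA and typeB are named record types registered as references (the record and field names here are illustrative only, not part of the documented workaround):
{
  "type": "record",
  "name": "UnionWrapper",
  "fields": [
    {"name": "type", "type": "string"},
    {"name": "typeA", "type": ["null", "typeA"], "default": null},
    {"name": "typeB", "type": ["null", "typeB"], "default": null}
  ]
}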
To learn more about clients, see:
Reflection based Avro serializer and deserializer¶
Starting with version 5.4.0, Confluent Platform also provides a ReflectionAvroSerializer and ReflectionAvroDeserializer for reading and writing data in reflection Avro format.

The serializer writes data in wire format defined here, and the deserializer reads data per the same wire format.

The serde for the reflection-based Avro serializer and deserializer is ReflectionAvroSerde. To learn more, see Kafka Streams Data Types and Serialization.
Allow for null fields¶
Starting with version 6.2.0 of Confluent Platform, a new configuration option, avro.reflection.allow.null, was added to support null fields when using the Reflection based Avro serializer and deserializer. If avro.reflection.allow.null is set to true, null values are allowed in fields. (The default is false.)
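For example, the option can be passed in the serializer or deserializer configuration like any other client property (a minimal sketch using the key described above):
Properties props = new Properties();
props.put("schema.registry.url", "http://localhost:8081");
props.put("avro.reflection.allow.null", true);  // allow null values in reflected fields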
Avro schema compatibility rules¶
The compatibility rules for Avro are detailed in the specification under Schema Resolution.
To learn more, see Schema Evolution and Compatibility for Schema Registry on Confluent Platform and Compatibility checks in the overview.