Avro
- Data is fully typed
- Schema (defined using JSON) comes along with the data
- Data never travels alone; the schema always accompanies it
- The schema can evolve over time in a safe manner (schema evolution)
- Avro schemas come in the .avsc format
{
"namespace": "example.avro",
"type": "record",
"name": "user",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "favorite_number", "type": "int" }
]
}
- Avro requires the schema for both serializing and deserializing
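To make "fully typed" concrete, here is a minimal sketch in plain Python (standard library only, not the Avro library) that checks a datum against the user schema above. The helper `conforms` and the `PRIMITIVE_CHECKS` table are hypothetical names chosen for illustration, and the sketch only covers flat records of primitive types.

```python
import json

# The schema from above, as it would appear in a .avsc file.
USER_SCHEMA = json.loads("""
{
  "namespace": "example.avro",
  "type": "record",
  "name": "user",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "favorite_number", "type": "int" }
  ]
}
""")

def _is_int(value, lo, hi):
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, int) and not isinstance(value, bool) and lo <= value <= hi

# Hypothetical lookup: Avro primitive type name -> Python check.
PRIMITIVE_CHECKS = {
    "null": lambda v: v is None,
    "boolean": lambda v: isinstance(v, bool),
    "int": lambda v: _is_int(v, -2**31, 2**31 - 1),
    "long": lambda v: _is_int(v, -2**63, 2**63 - 1),
    "float": lambda v: isinstance(v, float),
    "double": lambda v: isinstance(v, float),
    "bytes": lambda v: isinstance(v, bytes),
    "string": lambda v: isinstance(v, str),
}

def conforms(datum, schema):
    """Return True if datum has exactly the schema's fields, each with a matching type."""
    if not isinstance(datum, dict):
        return False
    fields = {f["name"]: f["type"] for f in schema["fields"]}
    if set(datum) != set(fields):
        return False
    return all(PRIMITIVE_CHECKS[t](datum[n]) for n, t in fields.items())

print(conforms({"name": "Alice", "favorite_number": 7}, USER_SCHEMA))    # True
print(conforms({"name": "Alice", "favorite_number": "7"}, USER_SCHEMA))  # False
print(conforms({"name": "Alice"}, USER_SCHEMA))                          # False
```

A real Avro library performs this kind of check while serializing, which is why a datum that does not match the schema cannot be written.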
Avro Types
- Common fields
  - name: name of your schema
  - namespace: equivalent of package in Java
  - doc: documentation to explain your schema
  - aliases: optional other names for your schema
  - fields: array of fields
    - name: name of the field
    - doc: documentation for that field
    - type: data type for that field
    - default: default value for that field
- Avro Primitive types
  - null: no value
  - boolean: binary value (true or false)
  - int: 32-bit signed integer
  - long: 64-bit signed integer
  - float: single precision (32-bit) floating-point number
  - double: double precision (64-bit) floating-point number
  - bytes: sequence of 8-bit unsigned bytes
  - string: Unicode character sequence
- Avro Complex Types
  - enums: enumerated set of possible values
  - arrays: list of multiple items (undefined size)
  - maps: list of key:value pairs (keys are always strings)
  - unions: allow a field value to take different types
  - other schemas used as types (for example, a record referenced by name or nested inline)
- Avro Logical Types
  - decimal: arbitrary-precision decimal number with a fixed scale. It is exact, unlike float and double, so it is used for money. The underlying type is bytes
  - date: int number of days since the Unix epoch
  - time-millis: int number of milliseconds after midnight
  - timestamp-millis: long number of milliseconds since the Unix epoch
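Logical types are annotations on primitive types, so each value is ultimately stored as a plain integer or as bytes. The following is a minimal sketch in plain Python (standard library only) of how values could be mapped to those underlying representations. The function names are hypothetical, and the decimal helper assumes the value has no more fractional digits than the chosen scale.

```python
from datetime import date, datetime, time, timezone
from decimal import Decimal

EPOCH_DATE = date(1970, 1, 1)
EPOCH_DT = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_avro_date(d):
    """date: number of days since the Unix epoch."""
    return (d - EPOCH_DATE).days

def to_avro_time_millis(t):
    """time-millis: number of milliseconds after midnight."""
    return ((t.hour * 60 + t.minute) * 60 + t.second) * 1000 + t.microsecond // 1000

def to_avro_timestamp_millis(dt):
    """timestamp-millis: milliseconds since the Unix epoch (dt must be timezone-aware)."""
    delta = dt - EPOCH_DT
    return (delta.days * 86400 + delta.seconds) * 1000 + delta.microseconds // 1000

def to_avro_decimal(value, scale):
    """decimal: two's-complement big-endian bytes of the unscaled integer."""
    unscaled = int(value.scaleb(scale))         # e.g. 12.34 with scale 2 -> 1234
    length = (unscaled.bit_length() + 8) // 8   # leave room for the sign bit
    return unscaled.to_bytes(length, byteorder="big", signed=True)

print(to_avro_date(date(2021, 1, 1)))                                     # 18628
print(to_avro_time_millis(time(0, 0, 1, 500000)))                         # 1500
print(to_avro_timestamp_millis(datetime(2021, 1, 1, tzinfo=timezone.utc)))  # 1609459200000
print(to_avro_decimal(Decimal("12.34"), 2))                               # b'\x04\xd2'
```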
[
{
"type": "record",
"namespace": "com.example",
"name": "CustomerAddress",
"fields": [
{ "name": "address", "type": "string" },
{ "name": "city", "type": "string" },
{ "name": "postcode", "type": ["string", "int"] },
{
"name": "type",
"type": {
"type": "enum",
"name": "AddressType",
"symbols": ["PO_BOX", "RESIDENTIAL", "ENTERPRISE"]
}
}
]
},
{
"type": "record",
"namespace": "com.example",
"name": "Customer",
"doc": "Avro Schema for our customer",
"fields": [
{
"name": "first_name",
"type": "string",
"doc": "First name of customer"
},
{ "name": "last_name", "type": "string", "doc": "Last name of customer" },
{
"name": "age",
"type": "int",
"doc": "Age at the time of registration"
},
{ "name": "height", "type": "float", "doc": "Height in centimeters" },
{ "name": "weight", "type": "float", "doc": "Weight in kilograms" },
{
"name": "automated_email",
"type": "boolean",
"default": true,
"doc": "True if user wants to receive automated emails"
},
{
"name": "customer_emails",
"type": { "type": "array", "items": "string" },
"default": []
},
{ "name": "customer_address", "type": "com.example.CustomerAddress" },
{
"name": "signup_ts",
"type": { "type": "long", "logicalType": "timestamp-millis" },
"doc": "Epoch millis timestamp at which the user signed up"
}
]
}
]
[
{
"type": "record",
"namespace": "com.example",
"name": "Customer",
"fields": [
{ "name": "first_name", "type": "string" },
{ "name": "middle_name", "type": ["null", "string"], "default": null },
{ "name": "last_name", "type": "string" },
{ "name": "age", "type": "int" },
{ "name": "height", "type": "float" },
{ "name": "weight", "type": "float" },
{ "name": "automated_email", "type": "boolean", "default": true },
{
"name": "customer_emails",
"type": { "type": "array", "items": "string" },
"default": []
},
{
"name": "customer_address",
"type": {
"type": "record",
"name": "CustomerAddress",
"fields": [
{ "name": "address", "type": "string" },
{ "name": "city", "type": "string" },
{ "name": "postcode", "type": ["string", "int"] },
{
"name": "type",
"type": {
"type": "enum",
"name": "AddressType",
"symbols": ["PO_BOX", "RESIDENTIAL", "ENTERPRISE"]
}
}
]
}
}
]
}
]
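The two schema files above describe the same structure: the first references CustomerAddress by its full name, the second nests the record definition inline. As a rough sketch in plain Python (standard library only), the first form can be turned into the second by replacing each named reference with the definition that precedes it. The schemas here are trimmed-down versions of the ones above, `full_name` and `inline_references` are hypothetical helpers rather than Avro API calls, and the sketch only handles flat records whose field types are primitive names or names of earlier records.

```python
import json

SCHEMAS = json.loads("""
[
  {"type": "record", "namespace": "com.example", "name": "CustomerAddress",
   "fields": [{"name": "city", "type": "string"}]},
  {"type": "record", "namespace": "com.example", "name": "Customer",
   "fields": [{"name": "first_name", "type": "string"},
              {"name": "customer_address", "type": "com.example.CustomerAddress"}]}
]
""")

PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}

def full_name(schema):
    """Full name of a named type: namespace plus name."""
    return f'{schema["namespace"]}.{schema["name"]}'

def inline_references(schemas):
    """Replace field types that name an earlier record with that record's definition.

    Modifies the schemas in place and returns the last one.
    """
    known = {}
    for schema in schemas:
        for field in schema["fields"]:
            field_type = field["type"]
            if isinstance(field_type, str) and field_type not in PRIMITIVES:
                # Raises KeyError if the name was not defined earlier in the list.
                field["type"] = known[field_type]
        known[full_name(schema)] = schema
    return schemas[-1]

customer = inline_references(SCHEMAS)
print(customer["fields"][1]["type"]["name"])  # CustomerAddress
```

The order matters in this sketch: a record has to be defined before another schema refers to it by name, which is why CustomerAddress comes first in the list.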
Avro Records
- GenericRecord (avoided)
  - It's used to create an Avro object from a schema
  - The schema can be imported from a file or a string
- SpecificRecord (recommended)
  - It's used to create an Avro object from an Avro schema and a Maven plugin
  - The Avro class code can be generated from an Avro schema using a Maven plugin
- ReflectedRecord (use to get an initial schema)
  - Build Avro schemas and objects from a POJO (plain old Java object)
All records can be written to and read from Avro files
Avro Tools
# Download avro tools
wget https://repo1.maven.org/maven2/org/apache/avro/avro-tools/1.10.2/avro-tools-1.10.2.jar
# Show possible commands
java -jar "avro-tools-1.10.2.jar"
# Decode avro file (uses the jar downloaded above, in the current directory)
java -jar "avro-tools-1.10.2.jar" tojson --pretty "customer-generic.avro"
java -jar "avro-tools-1.10.2.jar" tojson --pretty "customer-specific.avro"
# Get schema from avro file
java -jar "avro-tools-1.10.2.jar" getschema "customer-specific.avro"
Avro Console Client (Confluent)
- Option 1: Download the Confluent binaries at https://www.confluent.io/download/
- Option 2: Use a Docker image to have access to all the binaries right away
docker run -it --rm --net=host "confluentinc/cp-schema-registry:3.3.1" bash
# Avro Producer
kafka-avro-console-producer \
--bootstrap-server "localhost:9092" \
--topic "test-avro" \
--property "schema.registry.url=http://localhost:8081" \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1": "value1"} # pass
{"f1": "value2"} # pass
{"f1": "value3"} # pass
{"f2": "value4"} # fail
{"f1": 1} # fail
# Avro consumer
kafka-avro-console-consumer \
--bootstrap-server "localhost:9092" \
--topic "test-avro" \
--property "schema.registry.url=http://localhost:8081" \
--from-beginning
# Avro producer with a new incompatible schema
kafka-avro-console-producer \
--bootstrap-server "localhost:9092" \
--topic "test-avro" \
--property "schema.registry.url=http://localhost:8081" \
--property value.schema='{"type":"int"}' # Error code 409 (Schema being registered is incompatible with an earlier schema)
# Avro producer with a new compatible schema
kafka-avro-console-producer \
--bootstrap-server "localhost:9092" \
--topic "test-avro" \
--property "schema.registry.url=http://localhost:8081" \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name": "f2", "type": "int", "default": 0}]}'
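The last schema is accepted because the new field f2 carries a default, so records written with the old schema can still be read with the new one. Below is a minimal sketch in plain Python (standard library only) of that reader-side resolution. `read_with` is a hypothetical helper, not part of any Avro or Confluent API, and real compatibility checks cover many more cases than missing fields.

```python
import json

OLD_SCHEMA = json.loads(
    '{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
)
NEW_SCHEMA = json.loads(
    '{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},'
    '{"name": "f2", "type": "int", "default": 0}]}'
)

def read_with(reader_schema, record):
    """Project a record onto the reader schema, filling missing fields from defaults."""
    result = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            result[name] = record[name]
        elif "default" in field:
            result[name] = field["default"]
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return result

old_record = {"f1": "value1"}             # written with OLD_SCHEMA
print(read_with(NEW_SCHEMA, old_record))  # {'f1': 'value1', 'f2': 0}

new_record = {"f1": "value5", "f2": 42}   # written with NEW_SCHEMA
print(read_with(OLD_SCHEMA, new_record))  # {'f1': 'value5'}
```

Without the default on f2, the first call would raise, which mirrors why a new field without a default makes the schema unable to read older data.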