Skip to content

Avro

  • Data is fully typed
  • Schema (defined using JSON) comes along with the data
  • There is no data that is lonely, it always comes with the schema nearby
  • Schema can evolver over time, in a safe manner (schema evolution)
  • Avro schema comes in .avsc format
{
  "namespace": "example.avro",
  "type": "record",
  "name": "user",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "favorite_number", "type": "int" }
  ]
}
  • Avro requires the schema both for serializing as for deserializing

Avro Types

  • Common fields
  • name: name of your schema
  • namespace: equivalent of package in java
  • doc: documentation to explain your schema
  • aliases: optional other names for your schema
  • fields: array of fields
    • name: name of the field
    • doc: documentation for that field
    • type: data type for that field
    • default: default value for that field
  • Avro Primitive types
  • null: no value
  • boolean: binary value
  • int: 32 bit signed integer
  • long: 64 bit signed integer
  • float: single precision (32 bit) floating-point number
  • double: double precision (64 bit) floating-point number
  • bytes: sequence of 8-bit unsigned bytes
  • string: unicode character sequence
  • Avro Complex Types
  • enums: enumerated set of possible values
  • array: list of multiple items (undefined size)
  • maps: list of key:value pairs
  • unions: allow a field value to take different types
  • calling other schemas as types
  • Avro Logical Types
  • decimals: floating decimal point type. It's more precise than float and double. Used for money. The underlying type is bytes
  • date: int number of days since unix epoch
  • time-millis: long number of milliseconds after midnight
  • timpestamp-millis: long number of milliseconds from unix epoch
[
  {
    "type": "record",
    "namespace": "com.example",
    "name": "CustomerAddress",
    "fields": [
      { "name": "address", "type": "string" },
      { "name": "city", "type": "string" },
      { "name": "postcode", "type": ["string", "int"] },
      {
        "name": "type",
        "type": "enum",
        "symbols": ["PO BOX", "RESIDENTIAL", "ENTERPRISE"]
      }
    ]
  },
  {
    "type": "record",
    "namespace": "com.example",
    "name": "Customer",
    "doc": "Avro Schema for our customer",
    "fields": [
      {
        "name": "first_name",
        "type": "string",
        "doc": "First name of customer"
      },
      { "name": "last_name", "type": "string", "doc": "Last name of customer" },
      {
        "name": "age",
        "type": "int",
        "doc": "Age at the time of registration"
      },
      { "name": "height", "type": "float", "doc": "Height in centimeters" },
      { "name": "weight", "type": "float", "doc": "Weight in kilograms" },
      {
        "name": "automated_email",
        "type": "boolean",
        "default": true,
        "doc": "True if user wants to receive automated emails"
      },
      {
        "name": "customer_emails",
        "type": "array",
        "items": "string",
        "default": []
      },
      { "name": "customer_address", "type": "com.example.CustomerAddress" },
      {
        "name": "signup_ts",
        "type": "long",
        "logicalType": "timestamp-millis",
        "doc": "EPOCH millis Timestamp at which the user signed up"
      }
    ]
  }
]
[
  {
    "type": "record",
    "namespace": "com.example",
    "name": "Customer",
    "fields": [
      { "name": "first_name", "type": "string" },
      { "name": "middle_name", "type": ["null", "string"], "default": null },
      { "name": "last_name", "type": "string" },
      { "name": "age", "type": "int" },
      { "name": "height", "type": "float" },
      { "name": "weight", "type": "float" },
      { "name": "automated_email", "type": "boolean", "default": true },
      {
        "name": "customer_emails",
        "type": "array",
        "items": "string",
        "default": []
      },
      {
        "name": "customer_address",
        "type": {
          "type": "record",
          "name": "CustomerAddress",
          "fields": [
            { "name": "address", "type": "string" },
            { "name": "city", "type": "string" },
            { "name": "postcode", "type": ["string", "int"] },
            {
              "name": "type",
              "type": "enum",
              "symbols": ["PO BOX", "RESIDENTIAL", "ENTERPRISE"]
            }
          ]
        }
      }
    ]
  }
]

Avro Records

  • GenericRecord (avoided)
  • It's used to create an avro object from a schema
  • The schema can be imported from a file or a string
  • SpecificRecord (recommended)
  • It's used to create an avro object from an Avro Schema + a Maven Plugin
  • The avro class code can be generated from an avro schema using a maven plugin
  • ReflectedRecord (use to get an initial schema)
  • Build avro schemas and object from a POJO (plain old java object)

All Records can be written to and read from Avro Files

Avro Tools

# Download avro tools
wget https://repo1.maven.org/maven2/org/apache/avro/avro-tools/1.10.2/avro-tools-1.10.2.jar

# Show possible commands
java -jar "avro-tools-1.10.2.jar"

# Decode avro file
java -jar "~/avro-tools-1.8.2.jar" tojson "customer-generic.avro" --pretty
java -jar "~/avro-tools-1.8.2.jar" tojson "customer-specific.avro" --pretty

# Get schema from avro file
java -jar "~/avro-tools-1.8.2.jar" getschema "customer-specific.avro"

Avro Console Client (Confluent)

docker run -it --rm --net=host "confluentinc/cp-schema-registry:3.3.1" bash
# Avro Producer
kafka-avro-console-producer \
    --bootstrap-server "localhost:9092" \
    --topic "test-avro" \
    --property "schema.registry.url=http://localhost:8081" \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

{"f1": "value1"} # pass
{"f1": "value2"} # pass
{"f1": "value3"} # pass
{"f2": "value4"} # fail
{"f1": 1} # fail

# Avro consumer
kafka-avro-console-consumer \
    --bootstrap-server "localhost:9092" \
    --topic "test-avro" \
    --property "schema.registry.url=http://localhost:8081" \
    --from-beginning

# Avro producer with a new incompatible schema
kafka-avro-console-producer \
    --bootstrap-server "localhost:9092" \
    --topic "test-avro" \
    --property "schema.registry.url=http://localhost:8081" \
    --property value.schema='{"type":"int"}' # Error code 409 (Schema being registered is incompatible with an earlier schema)

# Avro producer with a new compatible schema
kafka-avro-console-producer \
    --bootstrap-server "localhost:9092" \
    --topic "test-avro" \
    --property "schema.registry.url=http://localhost:8081" \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name": "f2", "type": "int", "default": 0}]}'