
How to use the Kafka Schema Registry

Gosoline provides the option to use the Kafka schema registry when reading from or writing to a Kafka topic. This ensures that changes to your schema do not break compatibility with previous schemas, according to the specified compatibility mode.

Using the schema registry is optional: if you choose not to use it, gosoline will let you read and write anything.

Moreover, keep in mind that the Apache Kafka broker does not perform any server-side schema validation. If you do not configure gosoline to use the schema registry, you can still write anything to a topic, even if existing consumers and producers of that topic do use the schema registry.

Registering a schema

Gosoline itself will not register your schema and expects that any provided schema is already registered, since registering schemas at runtime could crash your app when a schema turns out to be incompatible.

Thus, you will have to register every schema you want to use externally and check it for compatibility, e.g. via a CI/CD pipeline.
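As a rough illustration, such a pipeline step could use the standard Confluent Schema Registry REST API to first check compatibility and then register the schema. This is only a sketch: the registry address is a placeholder and the error handling is deliberately minimal.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "os"
)

// post sends a schema payload to the given schema registry endpoint.
func post(url string, payload []byte) (*http.Response, error) {
    return http.Post(url, "application/vnd.schemaregistry.v1+json", bytes.NewReader(payload))
}

func main() {
    registryUrl := "http://localhost:8081" // illustrative registry address
    subject := "exampleEvent"

    schema, err := os.ReadFile("ExampleEvent.avsc")
    if err != nil {
        panic(err)
    }

    payload, err := json.Marshal(map[string]string{
        "schema":     string(schema),
        "schemaType": "AVRO",
    })
    if err != nil {
        panic(err)
    }

    // Check the schema against the latest registered version first, so an
    // incompatible change fails the pipeline instead of the app at runtime.
    // A 404 just means the subject has no versions yet.
    resp, err := post(fmt.Sprintf("%s/compatibility/subjects/%s/versions/latest", registryUrl, subject), payload)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    if resp.StatusCode == http.StatusOK {
        var result struct {
            IsCompatible bool `json:"is_compatible"`
        }
        if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
            panic(err)
        }
        if !result.IsCompatible {
            panic("schema is not compatible with the latest registered version")
        }
    } else if resp.StatusCode != http.StatusNotFound {
        panic(fmt.Sprintf("compatibility check failed with status %d", resp.StatusCode))
    }

    // Register the (compatible) schema under the subject.
    resp, err = post(fmt.Sprintf("%s/subjects/%s/versions", registryUrl, subject), payload)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        panic(fmt.Sprintf("schema registration failed with status %d", resp.StatusCode))
    }
}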

Configure a Producer/Publisher to use the schema registry

As usual, you need to specify the encoding in your config according to your schema type.

In addition to the natively supported application/json and application/x-protobuf encodings, you can also use application/avro when using the Kafka schema registry.

stream:
  producer:
    producerName:
      encoding: application/avro

Now you need to provide the schema settings via the stream.WithSchemaSettings option when creating your Producer/Publisher.

//go:embed ExampleEvent.avsc
var exampleEventSchema string

schemaSettings := stream.SchemaSettings{
    Subject: "exampleEvent",
    Schema:  exampleEventSchema,
    Model:   &exampleEvent{},
}

producer, err := stream.NewProducer(ctx, config, logger, "producerName", stream.WithSchemaSettings(schemaSettings))

Here, the ExampleEvent.avsc file contains the Avro schema.
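For illustration, a minimal ExampleEvent.avsc for a hypothetical event with an id and a name field could look like this (the record name, namespace, and fields are placeholders):

{
  "type": "record",
  "name": "ExampleEvent",
  "namespace": "com.example",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "name", "type": "string" }
  ]
}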

Note that the way you write your events via producer.Write/producer.WriteOne has to match whether your model is specified as a struct (exampleEvent{}) or as a pointer to a struct (&exampleEvent{}). If you mix the two up, the internal serializer of the franz-go Kafka library will complain that there is no registered encoder for your model.
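For example, with Model: &exampleEvent{} as above, events have to be written as pointers as well. The field names here are purely illustrative:

event := &exampleEvent{
    Id:   "4711",
    Name: "example",
}

// Model was specified as *exampleEvent, so a pointer is written here;
// writing exampleEvent{} by value would trigger the missing-encoder error.
if err := producer.WriteOne(ctx, event); err != nil {
    return err
}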

Configure a Consumer to use the schema registry

Configure the encoding in your config.

stream:
  consumer:
    default:
      encoding: application/avro

Like the producer, the consumer needs to provide the schema settings. It does so by implementing the stream.SchemaSettingsAwareCallback interface, i.e. by implementing the GetSchemaSettings() method.

func (c consumer) GetSchemaSettings() (*stream.SchemaSettings, error) {
    return &stream.SchemaSettings{
        Subject: "exampleEvent",
        Schema:  exampleEventSchema,
        Model:   &exampleEvent{},
    }, nil
}
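For context, the remaining callback methods could look roughly like the sketch below, assuming gosoline's usual ConsumerCallback interface with its GetModel and Consume methods; the event handling itself is a placeholder. As with the producer, GetModel should return the same struct-or-pointer shape as the Model in the schema settings.

// GetModel returns the value an incoming message is decoded into; a pointer
// here keeps it consistent with the Model from the schema settings above.
func (c consumer) GetModel(attributes map[string]string) interface{} {
    return &exampleEvent{}
}

// Consume handles one decoded event; returning true acknowledges the message.
func (c consumer) Consume(ctx context.Context, model interface{}, attributes map[string]string) (bool, error) {
    event := model.(*exampleEvent)

    // handle the event here (placeholder)
    _ = event

    return true, nil
}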

Configure a Subscriber to use the schema registry

This case works similarly to the consumer case.

Again, configure the encoding in your config.

mdlsub:
  subscribers:
    targetModelName:
      input: kafka
      output: ddb
      source: { group: source-group, name: exampleEvent }

stream:
  consumer:
    subscriber-targetModelName:
      encoding: application/avro

And again, you need to provide the schema settings by implementing the stream.SchemaSettingsAwareCallback interface, but in this case the interface has to be implemented by your transformer.

func (t transformer) GetSchemaSettings() (*stream.SchemaSettings, error) {
    return &stream.SchemaSettings{
        Subject: "exampleEvent",
        Schema:  exampleEventSchema,
        Model:   &exampleEvent{},
    }, nil
}
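The rest of such a transformer could look roughly like this. This is a sketch under assumptions: it presumes mdlsub's usual ModelTransformer interface (GetInput and Transform returning an mdlsub.Model), and targetModel with its fields is purely hypothetical.

// targetModel is a hypothetical output model for the ddb output.
type targetModel struct {
    Id   string `json:"id"`
    Name string `json:"name"`
}

// GetId identifies the record, as expected of an mdlsub model.
func (m targetModel) GetId() interface{} {
    return m.Id
}

// GetInput returns the value incoming messages are decoded into; it should
// match the Model from the schema settings above.
func (t transformer) GetInput() interface{} {
    return &exampleEvent{}
}

// Transform maps the decoded event onto the model written to the output.
func (t transformer) Transform(ctx context.Context, inp interface{}) (mdlsub.Model, error) {
    event := inp.(*exampleEvent)

    return targetModel{
        Id:   event.Id,
        Name: event.Name,
    }, nil
}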

Note that gosoline only allows one transformer per model when using the schema registry, as proper schema evolution via the registry should make versioning with multiple transformers obsolete.