# How to use the Kafka Schema Registry
Gosoline provides the option to use the Kafka schema registry when reading from or writing to a Kafka topic. This ensures that changes to your schema do not break compatibility with previous schema versions, according to the configured compatibility mode.
Using the schema registry is optional; without it, gosoline lets you read and write arbitrary payloads. Moreover, keep in mind that the Apache Kafka broker performs no server-side schema validation: if you do not configure gosoline to use the schema registry, you can still write anything to a topic, even if existing consumers and producers of that topic do use the schema registry.
## Registering a schema
Gosoline itself will not register your schema; it expects every provided schema to be registered already, as your app may otherwise crash at runtime when it tries to register an incompatible schema.
Thus, you have to register every schema you want to use and check it for compatibility externally, e.g. via a CI/CD pipeline.
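As a minimal sketch of such a pipeline step, the program below talks to the Schema Registry's standard REST API directly; the registry URL and the `exampleEvent` subject are placeholders for your environment.

```go
package main

import (
	"bytes"
	_ "embed"
	"encoding/json"
	"fmt"
	"net/http"
)

//go:embed ExampleEvent.avsc
var exampleEventSchema string

func main() {
	// The registry expects the schema wrapped in a JSON object.
	payload, err := json.Marshal(map[string]string{"schema": exampleEventSchema})
	if err != nil {
		panic(err)
	}

	// Check the schema against the latest registered version first; the
	// response body contains {"is_compatible": true|false}.
	check, err := http.Post(
		"http://schema-registry:8081/compatibility/subjects/exampleEvent/versions/latest",
		"application/vnd.schemaregistry.v1+json",
		bytes.NewReader(payload),
	)
	if err != nil {
		panic(err)
	}
	defer check.Body.Close()
	fmt.Println("compatibility check:", check.Status)

	// Register the schema as a new version of the subject; the registry
	// rejects it if it violates the subject's compatibility mode.
	register, err := http.Post(
		"http://schema-registry:8081/subjects/exampleEvent/versions",
		"application/vnd.schemaregistry.v1+json",
		bytes.NewReader(payload),
	)
	if err != nil {
		panic(err)
	}
	defer register.Body.Close()
	fmt.Println("register:", register.Status)
}
```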
## Configure a Producer/Publisher to use the schema registry
As usual, you need to specify the encoding in your config according to your schema type. In addition to the natively supported `application/json` and `application/x-protobuf` encodings, you also have the option to use `application/avro` when using the Kafka schema registry.
```yaml
stream:
  producer:
    producerName:
      encoding: application/avro
```
Now you need to provide the schema settings via the `stream.WithSchemaSettings` option when creating your Producer/Publisher.
```go
import _ "embed" // required for the go:embed directive below

//go:embed ExampleEvent.avsc
var exampleEventSchema string

schemaSettings := stream.SchemaSettings{
	Subject: "exampleEvent",
	Schema:  exampleEventSchema,
	Model:   &exampleEvent{},
}

producer, err := stream.NewProducer(ctx, config, logger, "producerName", stream.WithSchemaSettings(schemaSettings))
```
Here, the `ExampleEvent.avsc` file contains the Avro schema.
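For illustration, a minimal `ExampleEvent.avsc` could look like the following; the record fields are made up for this example:

```json
{
  "type": "record",
  "name": "exampleEvent",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "occurredAt", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}
```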
Note that depending on whether your model is specified as a struct (`exampleEvent{}`) or a pointer to a struct (`&exampleEvent{}`), you also need to write your events accordingly via `producer.Write`/`producer.WriteOne`; if you mix the two up, the internal serializer of the franz-go Kafka library will complain that there is no registered encoder for your model.
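As a short usage sketch, with the pointer model from above you would also pass a pointer when writing; the event value here is just a placeholder:

```go
event := &exampleEvent{} // populate your event fields here

// The model was registered as &exampleEvent{}, so a pointer has to be written
// as well; passing exampleEvent{} by value would make the serializer report a
// missing encoder for the model.
if err := producer.WriteOne(ctx, event); err != nil {
	return err
}
```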
## Configure a Consumer to use the schema registry
Configure the encoding in your config:

```yaml
stream:
  consumer:
    default:
      encoding: application/avro
```
Like the producer, the consumer needs to provide the schema settings, but it does so by implementing the `stream.SchemaSettingsAwareCallback` interface, i.e. by implementing the `GetSchemaSettings()` method.
```go
func (c consumer) GetSchemaSettings() (*stream.SchemaSettings, error) {
	return &stream.SchemaSettings{
		Subject: "exampleEvent",
		Schema:  exampleEventSchema,
		Model:   &exampleEvent{},
	}, nil
}
```
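For context, here is a rough sketch of how the rest of the callback could look; the `GetModel` and `Consume` signatures are assumptions based on gosoline's consumer callback interface, so verify them against the gosoline version you use:

```go
type consumer struct{}

// GetModel returns a fresh model instance for each incoming message.
func (c consumer) GetModel(attributes map[string]string) interface{} {
	return &exampleEvent{}
}

// Consume handles one decoded event; returning true acknowledges the message.
func (c consumer) Consume(ctx context.Context, model interface{}, attributes map[string]string) (bool, error) {
	event, ok := model.(*exampleEvent)
	if !ok {
		return false, fmt.Errorf("unexpected model type %T", model)
	}

	// ... handle the event ...
	_ = event

	return true, nil
}
```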
## Configure a Subscriber to use the schema registry
This case works similarly to the consumer case. Again, configure the encoding in your config:
```yaml
mdlsub:
  subscribers:
    targetModelName:
      input: kafka
      output: ddb
      source: { group: source-group, name: exampleEvent }

stream:
  consumer:
    subscriber-targetModelName:
      encoding: application/avro
```
And again you need to provide the schema settings by implementing the `stream.SchemaSettingsAwareCallback` interface, but in this case the interface has to be implemented by your transformer.
```go
func (t transformer) GetSchemaSettings() (*stream.SchemaSettings, error) {
	return &stream.SchemaSettings{
		Subject: "exampleEvent",
		Schema:  exampleEventSchema,
		Model:   &exampleEvent{},
	}, nil
}
```
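For completeness, a rough sketch of the surrounding transformer; the `GetInput` and `Transform` signatures are assumptions based on gosoline's `mdlsub` transformer interface, and the returned value is only a placeholder, so check both against your version:

```go
type transformer struct{}

// GetInput returns the model the incoming message is decoded into.
func (t transformer) GetInput() interface{} {
	return &exampleEvent{}
}

// Transform maps the decoded event onto the model written to the output (ddb).
func (t transformer) Transform(ctx context.Context, inp interface{}) (mdlsub.Model, error) {
	event, ok := inp.(*exampleEvent)
	if !ok {
		return nil, fmt.Errorf("unexpected input type %T", inp)
	}

	// ... build and return your target model from the event here ...
	_ = event

	return nil, nil
}
```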
Note that gosoline allows only one transformer per model when using the schema registry, since proper schema evolution via the registry should make versioning with multiple transformers obsolete.