
Create a consumer

One of the primary use cases for gosoline is to create a message queue consumer. In this tutorial, you'll do just that!

To build a consumer of async message queues, you'll implement the ConsumerCallback interface of the stream package.

Before you begin

Before you begin, make sure you have Golang installed on your machine.

Set up your file structure

First, you need to set up the following file structure:

consumer/
├── consumer.go
├── main.go
├── events.json
└── config.dist.yml

For example, in Unix, run:

mkdir consumer; cd consumer
touch consumer.go
touch main.go
touch events.json
touch config.dist.yml

Those are all the files you need to build your first consumer with gosoline! Next, you'll implement each of these files, starting with consumer.go.

Implement consumer.go

In consumer.go, add the following code:

consumer.go

package main

import (
	"context"

	"github.com/justtrackio/gosoline/pkg/cfg"
	"github.com/justtrackio/gosoline/pkg/log"
	"github.com/justtrackio/gosoline/pkg/stream"
)

type Input struct {
	Id   string `json:"id"`
	Body string `json:"body"`
}

type Consumer struct {
	logger log.Logger
}

func NewConsumer(ctx context.Context, config cfg.Config, logger log.Logger) (stream.ConsumerCallback, error) {
	return &Consumer{
		logger: logger,
	}, nil
}

func (c Consumer) GetModel(attributes map[string]string) interface{} {
	return &Input{}
}

func (c Consumer) Consume(ctx context.Context, model interface{}, attributes map[string]string) (bool, error) {
	input := model.(*Input)

	c.logger.WithContext(ctx).Info("got input with id %q and body %q", input.Id, input.Body)

	return true, nil
}

Now, you'll walk through this file in detail to learn how it works.

Import dependencies

At the top of consumer.go, you declared the package and imported some dependencies:

consumer.go
package main

import (
	"context"

	"github.com/justtrackio/gosoline/pkg/cfg"
	"github.com/justtrackio/gosoline/pkg/log"
	"github.com/justtrackio/gosoline/pkg/stream"
)

Here, you declared the package as main. Then, you imported the standard library's context package along with three gosoline packages: cfg (configuration), log (logging), and stream (the consumer types).

Implement your data structs

Then, you created an Input struct and a Consumer struct:

consumer.go
// 1
type Input struct {
	Id   string `json:"id"`
	Body string `json:"body"`
}

// 2
type Consumer struct {
	logger log.Logger
}

You'll use these to:

  1. Bind data from the message queue. Note that the struct tags map the JSON keys id and body to the Id and Body fields.
  2. Store logger information about your consumer.
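
To see how the struct tags bind JSON keys to fields, here is a minimal sketch that uses only the standard library's encoding/json package rather than gosoline. The decodeInput helper is hypothetical and exists only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Input mirrors the struct defined in consumer.go.
type Input struct {
	Id   string `json:"id"`
	Body string `json:"body"`
}

// decodeInput is a hypothetical helper: it unmarshals a raw JSON
// payload into an Input using the struct tags above.
func decodeInput(raw string) (*Input, error) {
	input := &Input{}
	if err := json.Unmarshal([]byte(raw), input); err != nil {
		return nil, err
	}
	return input, nil
}

func main() {
	input, err := decodeInput(`{"id": "1", "body": "hello"}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("id=%s body=%s\n", input.Id, input.Body)
}
```

Keys that are missing from the payload leave the matching field at its zero value (an empty string here), so a consumer may want to validate required fields before using them.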

Implement Consumer methods

Next, you implemented some methods for the Consumer:

consumer.go
// 1
func NewConsumer(ctx context.Context, config cfg.Config, logger log.Logger) (stream.ConsumerCallback, error) {
	return &Consumer{
		logger: logger,
	}, nil
}

// 2
func (c Consumer) GetModel(attributes map[string]string) interface{} {
	return &Input{}
}

// 3
func (c Consumer) Consume(ctx context.Context, model interface{}, attributes map[string]string) (bool, error) {
	input := model.(*Input)

	c.logger.WithContext(ctx).Info("got input with id %q and body %q", input.Id, input.Body)

	return true, nil
}

Here, you implemented:

  1. A constructor for creating new Consumer objects. It has the signature of the stream.ConsumerCallbackFactory type and is used to add the callback to your application.
  2. GetModel(), a method that returns the expected input model struct, which is used to unmarshal the data.
  3. Consume(), a method that receives the model populated with data, casts it to *Input, logs the data, and returns true because it successfully handled the message. This is called for every incoming message.

Together, these methods implement the ConsumerCallback interface.

Implement main.go

In main.go, add the following code:

main.go
package main

import "github.com/justtrackio/gosoline/pkg/application"

func main() {
	application.RunConsumer(NewConsumer)
}

Here, you run your consumer. RunConsumer() expects a parameter of the type stream.ConsumerCallbackFactory, which it uses to create and run the consumer. NewConsumer() has the signature this type requires.
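
The pattern of passing a constructor as a value can be sketched with the standard library alone. This assumes the factory is a function type, as the signature of NewConsumer() suggests. The handler, handlerFactory, newNamedHandler, and run names are hypothetical and are not part of gosoline.

```go
package main

import (
	"errors"
	"fmt"
)

// handler and handlerFactory are hypothetical stand-ins for a
// callback interface and its factory function type.
type handler interface {
	Name() string
}

type handlerFactory func(name string) (handler, error)

type namedHandler struct {
	name string
}

func (h namedHandler) Name() string { return h.name }

// newNamedHandler has the signature required by handlerFactory,
// so it can be passed wherever a handlerFactory is expected.
func newNamedHandler(name string) (handler, error) {
	if name == "" {
		return nil, errors.New("name must not be empty")
	}
	return namedHandler{name: name}, nil
}

// run calls the factory itself, so the caller never constructs the handler.
func run(factory handlerFactory, name string) (string, error) {
	h, err := factory(name)
	if err != nil {
		return "", err
	}
	return "running " + h.Name(), nil
}

func main() {
	msg, err := run(newNamedHandler, "consumer")
	fmt.Println(msg, err)
}
```

Passing the factory instead of a ready-made instance lets the framework decide when to build the callback and which context, config, and logger to hand to it.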

Configure your consumer

In config.dist.yml, configure your consumer:

config.dist.yml
env: dev
app_project: gosoline
app_family: how-to
app_group: grp
app_name: consumer

stream:
  input:
    consumer:
      type: file
      filename: events.json

Here, you set some minimal configuration for your consumer. By default, gosoline expects an input configured under the config key stream.input.consumer, which defines the input source. In this tutorial, you'll use a file as the source, with the filename set to events.json.

Add your data

In events.json, add some mock event stream data:

{"body": "{\"id\": \"1a0a960f-f04f-4c41-9b9a-a5ca0e2637b2\", \"body\": \"Lorem ipsum dolor sit amet.\"}"}
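
Note that the sample line nests the payload: the outer object has a body key whose value is itself a JSON document encoded as a string, which is why the inner quotes are escaped. The following standard-library sketch decodes both layers. The envelope type and the decodeEvent helper are hypothetical names used only for illustration; gosoline handles this step for you.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical type for the outer object in events.json.
type envelope struct {
	Body string `json:"body"`
}

// Input mirrors the struct defined in consumer.go.
type Input struct {
	Id   string `json:"id"`
	Body string `json:"body"`
}

// decodeEvent first decodes the outer envelope, then decodes the
// JSON string stored under its body key into an Input.
func decodeEvent(line string) (*Input, error) {
	var env envelope
	if err := json.Unmarshal([]byte(line), &env); err != nil {
		return nil, fmt.Errorf("decode envelope: %w", err)
	}

	input := &Input{}
	if err := json.Unmarshal([]byte(env.Body), input); err != nil {
		return nil, fmt.Errorf("decode body: %w", err)
	}
	return input, nil
}

func main() {
	// The same line as in events.json, kept in a raw string so the
	// backslashes reach the JSON decoder unchanged.
	line := `{"body": "{\"id\": \"1a0a960f-f04f-4c41-9b9a-a5ca0e2637b2\", \"body\": \"Lorem ipsum dolor sit amet.\"}"}`

	input, err := decodeEvent(line)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("id=%s body=%s\n", input.Id, input.Body)
}
```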

Now, the final step is to test it to confirm that it works as expected.

Test your consumer

In the consumer directory, run:

go mod init consumer/m
go mod tidy
go run .

In the output, you'll see a log that indicates your consumer handled the event data:

10:23:57.648 consumerCallback info    got input with id "1a0a960f-f04f-4c41-9b9a-a5ca0e2637b2" and body "Lorem ipsum dolor sit amet."  application: consumer

Conclusion

That's it! You created your first gosoline message queue consumer.