Create a consumer
One of the primary use cases for gosoline is to create a message queue consumer. In this tutorial, you'll do just that!
To build a consumer of async message queues you'll' implement the ConsumerCallback
interface of the stream
package.
Before you begin
Before you begin, make sure you have Golang installed on your machine.
Set up your file structure
First, you need to set up the following file structure:
consumer/
├── consumer.go
├── main.go
├── events.json
└── config.dist.yml
For example, in Unix, run:
mkdir consumer; cd consumer
touch consumer.go
touch main.go
touch events.json
touch config.dist.yml
Those are all the files you need to build your first consumer with gosoline! Next, you'll implement each of these files, starting with consumer.go
.
Implement consumer.go
In consumer.go
, add the following code:
consumer.go
package main
import (
"context"
"github.com/justtrackio/gosoline/pkg/cfg"
"github.com/justtrackio/gosoline/pkg/log"
"github.com/justtrackio/gosoline/pkg/stream"
)
type Input struct {
Id string `json:"id"`
Body string `json:"body"`
}
type Consumer struct {
logger log.Logger
}
func NewConsumer(ctx context.Context, config cfg.Config, logger log.Logger) (stream.ConsumerCallback, error) {
return &Consumer{
logger: logger,
}, nil
}
func (c Consumer) GetModel(attributes map[string]string) interface{} {
return &Input{}
}
func (c Consumer) Consume(ctx context.Context, model interface{}, attributes map[string]string) (bool, error) {
input := model.(*Input)
c.logger.WithContext(ctx).Info("got input with id %q and body %q", input.Id, input.Body)
return true, nil
}
Now, you'll walkthrough this file in detail to learn how it works.
Import dependencies
At the top of consumer.go
, you declared the package and imported some dependencies:
package main
import (
"context"
"github.com/justtrackio/gosoline/pkg/cfg"
"github.com/justtrackio/gosoline/pkg/log"
"github.com/justtrackio/gosoline/pkg/stream"
)
Here, you declared the package as main
. Then, you imported the context
module along with three gosoline dependencies:
Implement your data structs
Then, you created an Input
struct and a Consumer
struct:
// 1
type Input struct {
Id string `json:"id"`
Body string `json:"body"`
}
// 2
type Consumer struct {
logger log.Logger
}
You'll use these to:
- Bind data from the message queue. Note that you read an
id
andbody
from Json keys. - Store logger information about your consumer.
Implement Consumer
methods
Next, you implemented some methods for the Consumer
:
// 1
func NewConsumer(ctx context.Context, config cfg.Config, logger log.Logger) (stream.ConsumerCallback, error) {
return &Consumer{
logger: logger,
}, nil
}
// 2
func (c Consumer) GetModel(attributes map[string]interface{}) interface{} {
return &Input{}
}
// 3
func (c Consumer) Consume(ctx context.Context, model interface{}, attributes map[string]interface{}) (bool, error) {
input := model.(*Input)
c.logger.WithContext(ctx).Info("got input with id %q and body %q", input.Id, input.Body)
return true, nil
}
Here, you implemented:
- A constructor for creating new
Consumer
objects. This implements thestream.ConsumerCallbackFactory
type and is used to add the callback to your application. GetModal()
, a method that returns the expected input model struct which is used to unmarshal the data.Consume()
, a method that loads the model (Input
) with data, logs the data, and returnstrue
because it successfully handled the message. This is called for every incoming message.
Together, these methods implement the ConsumerCallback
interface.
Implement main.go
In main.go
, add the following code:
package main
import "github.com/justtrackio/gosoline/pkg/application"
func main() {
application.RunConsumer(NewConsumer)
}
Here, you execute your consumer. RunConsumer()
expects a parameter of the type stream.ConsumerCallbackFactory
to create and run the consumer. NewConsumer()
implements this interface.
Configure your consumer
In config.dist.yml
, configure your server:
env: dev
app_project: gosoline
app_family: how-to
app_group: grp
app_name: consumer
stream:
input:
consumer:
type: file
filename: events.json
Here, you set some minimal configurations for your web server. By default, the gosoline expects that there is an input configured with the config key stream.input.consumer
. This defines the input source. In this tutorial, you'll use a file as source with the configured filename, "events.json".
Add your data
In events.json
, add some mock events stream data:
{"body": "{\"id\": \"1a0a960f-f04f-4c41-9b9a-a5ca0e2637b2\", \"body\": \"Lorem ipsum dolor sit amet.\"}"}
Now, the final step is to test it to confirm that it works as expected.
Test your consumer
In the consumer
directory, run:
go mod init consumer/m
go mod tidy
go run .
In the output, you'll see a log that indicates your consumer handled the event data:
10:23:57.648 consumerCallback info got input with id "1a0a960f-f04f-4c41-9b9a-a5ca0e2637b2" and body "Lorem ipsum dolor sit amet." application: consumer
Conclusion
That's it! You created your first gosoline message queue consumer.