180 lines
4.5 KiB
Go
180 lines
4.5 KiB
Go
package kafka
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/pkg/errors"
|
|
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
|
|
"time"
|
|
)
|
|
|
|
// Client is the package's public Kafka abstraction: it consumes messages,
// publishes messages and releases the underlying resources.
type Client interface {
	// Consume takes a map from topic name to the handler that receives the
	// raw value of each message read from that topic.
	Consume(topics map[string]func(msg []byte) error) error

	// Send publishes msg to the given topic.
	Send(topic string, msg interface{}) error

	// Close releases the resources held by the client.
	Close()
}
|
|
|
|
// Consumer represents the Kafka consumer
|
|
type Consumer interface {
|
|
SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
|
|
ReadMessage(timeout time.Duration) (*kafka.Message, error)
|
|
CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error)
|
|
Close() error
|
|
}
|
|
|
|
var _ Consumer = &kafka.Consumer{}
|
|
|
|
// Producer represents the Kafka producer
|
|
type Producer interface {
|
|
Events() chan kafka.Event
|
|
ProduceChannel() chan *kafka.Message
|
|
Close()
|
|
}
|
|
|
|
var _ Producer = &kafka.Producer{}
|
|
|
|
type defaultClient struct {
|
|
log Logger
|
|
consumer Consumer
|
|
producer Producer
|
|
}
|
|
|
|
func (k *defaultClient) Consume(topics map[string]func(msg []byte) error) error {
|
|
s := make([]string, 0, len(topics))
|
|
for k := range topics {
|
|
s = append(s, k)
|
|
}
|
|
err := k.consumer.SubscribeTopics(s, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
msg, err := k.consumer.ReadMessage(-1)
|
|
if err == nil {
|
|
fn, exists := topics[*msg.TopicPartition.Topic]
|
|
if !exists {
|
|
return fmt.Errorf("func for topic %s is not provided", *msg.TopicPartition.Topic)
|
|
}
|
|
if err := fn(msg.Value); err != nil {
|
|
return err
|
|
}
|
|
_, err := k.consumer.CommitMessage(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
if kafkaErr, ok := err.(kafka.Error); ok {
|
|
if !kafkaErr.IsRetriable() {
|
|
return errors.Wrap(err, "fatal kafka error detected")
|
|
}
|
|
}
|
|
// The client will automatically try to recover from all errors.
|
|
k.log.Errorf("Consumer error: %+v (%+v)", err, msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (k *defaultClient) Send(topic string, msg interface{}) error {
|
|
value, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
doneChan := make(chan error)
|
|
|
|
go func() {
|
|
defer close(doneChan)
|
|
for e := range k.producer.Events() {
|
|
switch ev := e.(type) {
|
|
case *kafka.Message:
|
|
m := ev
|
|
if m.TopicPartition.Error != nil {
|
|
k.log.Infof("Delivery failed: %v", m.TopicPartition.Error)
|
|
doneChan <- m.TopicPartition.Error
|
|
} else {
|
|
k.log.Infof("Delivered message to topic %s [%d] at offset %v",
|
|
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
|
|
}
|
|
return
|
|
|
|
default:
|
|
k.log.Infof("Ignored event: %s", ev)
|
|
}
|
|
}
|
|
}()
|
|
|
|
k.producer.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: value}
|
|
|
|
// wait for delivery report goroutine to finish
|
|
return <-doneChan
|
|
}
|
|
|
|
func (k *defaultClient) Close() {
|
|
_ = k.consumer.Close()
|
|
k.producer.Close()
|
|
}
|
|
|
|
// Compile-time check that *defaultClient satisfies Client.
var _ Client = (*defaultClient)(nil)
|
|
|
|
// New creates a new kafka Client
|
|
func New(bootstrapServers, serviceName, username, password string, log Logger) (Client, error) {
|
|
if log == nil {
|
|
log = &noopLogger{}
|
|
}
|
|
consumerConfig := &kafka.ConfigMap{
|
|
"bootstrap.servers": bootstrapServers,
|
|
"broker.address.family": "v4",
|
|
"client.id": serviceName,
|
|
"group.id": serviceName,
|
|
"auto.offset.reset": "earliest",
|
|
"enable.auto.commit": false,
|
|
}
|
|
producerConfig := &kafka.ConfigMap{
|
|
"bootstrap.servers": bootstrapServers,
|
|
"broker.address.family": "v4",
|
|
"client.id": serviceName,
|
|
"message.timeout.ms": "1000",
|
|
}
|
|
if username != "" && password != "" {
|
|
_ = consumerConfig.SetKey("security.protocol", "SASL_SSL")
|
|
_ = consumerConfig.SetKey("sasl.mechanisms", "PLAIN")
|
|
_ = consumerConfig.SetKey("sasl.username", username)
|
|
_ = consumerConfig.SetKey("sasl.password", password)
|
|
_ = producerConfig.SetKey("security.protocol", "SASL_SSL")
|
|
_ = producerConfig.SetKey("sasl.mechanisms", "PLAIN")
|
|
_ = producerConfig.SetKey("sasl.username", username)
|
|
_ = producerConfig.SetKey("sasl.password", password)
|
|
}
|
|
return connect(
|
|
consumerConfig,
|
|
producerConfig,
|
|
log,
|
|
)
|
|
}
|
|
|
|
func connect(consumerConfig, producerConfig *kafka.ConfigMap, log Logger) (Client, error) {
|
|
c, err := kafka.NewConsumer(consumerConfig)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to create consuner")
|
|
}
|
|
|
|
p, err := kafka.NewProducer(producerConfig)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to create producer")
|
|
}
|
|
|
|
_, err = c.GetMetadata(nil, true, 1000)
|
|
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "failed to get meta data from kafka with config: %+v", consumerConfig)
|
|
}
|
|
|
|
return &defaultClient{
|
|
log: log,
|
|
consumer: c,
|
|
producer: p,
|
|
}, nil
|
|
}
|