This repository was archived on 2026-03-07. You can view its files and clone it, but you cannot open issues, create pull requests, or push commits.
Files

180 lines
4.5 KiB
Go

package kafka
import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"time"
)
// Client represents the Kafka client: a combined consumer/producer
// facade exposed to the rest of the application.
type Client interface {
	// Consume subscribes to the given topics and dispatches each received
	// message to the handler registered for its topic. It blocks until an
	// error occurs.
	Consume(topics map[string]func(msg []byte) error) error
	// Send JSON-encodes msg, produces it to topic, and waits for the
	// delivery report.
	Send(topic string, msg interface{}) error
	// Close shuts down the underlying consumer and producer.
	Close()
}
// Consumer represents the Kafka consumer: the subset of methods this
// package uses from the confluent-kafka-go consumer.
type Consumer interface {
	// SubscribeTopics subscribes to the given list of topics.
	SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
	// ReadMessage blocks up to timeout for the next message (-1 waits forever).
	ReadMessage(timeout time.Duration) (*kafka.Message, error)
	// CommitMessage commits the offset of the given message.
	CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error)
	Close() error
}

// Compile-time check that the concrete confluent consumer satisfies Consumer.
var _ Consumer = &kafka.Consumer{}
// Producer represents the Kafka producer: the subset of methods this
// package uses from the confluent-kafka-go producer.
type Producer interface {
	// Events returns the channel on which delivery reports and other
	// producer events are delivered.
	Events() chan kafka.Event
	// ProduceChannel returns the channel messages are written to for
	// asynchronous production.
	ProduceChannel() chan *kafka.Message
	Close()
}

// Compile-time check that the concrete confluent producer satisfies Producer.
var _ Producer = &kafka.Producer{}
// defaultClient is the default Client implementation, pairing one
// consumer and one producer with a logger.
type defaultClient struct {
	log      Logger   // destination for delivery/consume diagnostics; never nil (see New)
	consumer Consumer // used by Consume; closed by Close
	producer Producer // used by Send; closed by Close
}
// Consume subscribes to all keys of topics and processes messages in an
// endless loop, dispatching each message to the handler registered for
// its topic. A message's offset is committed only after its handler
// returns nil (auto-commit is disabled in New). Consume returns on the
// first handler error, commit error, missing handler, or non-retriable
// Kafka error; retriable consumer errors are logged and the loop
// continues.
func (k *defaultClient) Consume(topics map[string]func(msg []byte) error) error {
	// Collect the topic names. The loop variable is named "topic" rather
	// than "k" to avoid shadowing the method receiver.
	names := make([]string, 0, len(topics))
	for topic := range topics {
		names = append(names, topic)
	}
	if err := k.consumer.SubscribeTopics(names, nil); err != nil {
		return err
	}
	for {
		msg, err := k.consumer.ReadMessage(-1) // -1: block until a message arrives
		if err != nil {
			if kafkaErr, ok := err.(kafka.Error); ok && !kafkaErr.IsRetriable() {
				return errors.Wrap(err, "fatal kafka error detected")
			}
			// The client will automatically try to recover from all errors.
			k.log.Errorf("Consumer error: %+v (%+v)", err, msg)
			continue
		}
		fn, exists := topics[*msg.TopicPartition.Topic]
		if !exists {
			return fmt.Errorf("func for topic %s is not provided", *msg.TopicPartition.Topic)
		}
		if err := fn(msg.Value); err != nil {
			return err
		}
		if _, err := k.consumer.CommitMessage(msg); err != nil {
			return err
		}
	}
}
// Send JSON-encodes msg, produces it to topic, and blocks until the
// delivery report arrives. It returns the marshalling error, the
// delivery error from the report, or nil on confirmed delivery.
//
// NOTE(review): the delivery report is read from the producer's shared
// Events() channel, so concurrent Send calls could consume each other's
// reports — confirm Send is only ever called serially.
func (k *defaultClient) Send(topic string, msg interface{}) error {
	value, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	doneChan := make(chan error)
	// Start the report listener before producing so the report cannot be
	// missed. close(doneChan) with no send makes the final receive yield
	// nil, which signals successful delivery.
	go func() {
		defer close(doneChan)
		for e := range k.producer.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				m := ev
				if m.TopicPartition.Error != nil {
					k.log.Infof("Delivery failed: %v", m.TopicPartition.Error)
					doneChan <- m.TopicPartition.Error
				} else {
					k.log.Infof("Delivered message to topic %s [%d] at offset %v",
						*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
				}
				// Stop after the first delivery report, success or failure.
				return
			default:
				// Any other producer event is logged and skipped.
				k.log.Infof("Ignored event: %s", ev)
			}
		}
	}()
	k.producer.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: value}
	// wait for delivery report goroutine to finish
	return <-doneChan
}
// Close tears down the underlying Kafka consumer and producer.
func (k *defaultClient) Close() {
	// The consumer's Close error cannot be surfaced through this
	// signature, so it is deliberately dropped.
	_ = k.consumer.Close()
	k.producer.Close()
}

// Compile-time check that defaultClient satisfies Client.
var _ Client = &defaultClient{}
// New creates a new kafka Client connected to bootstrapServers.
// serviceName is used as both the client id and the consumer group id.
// If username and password are both non-empty, SASL_SSL/PLAIN
// authentication is configured on consumer and producer alike. A nil
// log is replaced with a no-op logger.
func New(bootstrapServers, serviceName, username, password string, log Logger) (Client, error) {
	if log == nil {
		log = &noopLogger{}
	}

	consumerConfig := &kafka.ConfigMap{
		"bootstrap.servers":     bootstrapServers,
		"broker.address.family": "v4",
		"client.id":             serviceName,
		"group.id":              serviceName,
		"auto.offset.reset":     "earliest",
		// Offsets are committed manually in Consume, after the handler succeeds.
		"enable.auto.commit": false,
	}
	producerConfig := &kafka.ConfigMap{
		"bootstrap.servers":     bootstrapServers,
		"broker.address.family": "v4",
		"client.id":             serviceName,
		"message.timeout.ms":    "1000",
	}

	// Apply identical SASL settings to both configurations.
	if username != "" && password != "" {
		for _, cfg := range []*kafka.ConfigMap{consumerConfig, producerConfig} {
			_ = cfg.SetKey("security.protocol", "SASL_SSL")
			_ = cfg.SetKey("sasl.mechanisms", "PLAIN")
			_ = cfg.SetKey("sasl.username", username)
			_ = cfg.SetKey("sasl.password", password)
		}
	}

	return connect(consumerConfig, producerConfig, log)
}
// connect builds the consumer and producer from the given configs and
// verifies broker reachability by fetching cluster metadata. Any
// resource created before a later step fails is closed, so nothing
// leaks on the error paths.
func connect(consumerConfig, producerConfig *kafka.ConfigMap, log Logger) (Client, error) {
	c, err := kafka.NewConsumer(consumerConfig)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create consumer")
	}
	p, err := kafka.NewProducer(producerConfig)
	if err != nil {
		_ = c.Close() // don't leak the consumer
		return nil, errors.Wrap(err, "failed to create producer")
	}
	// nil topic requests metadata for all topics; 1000 ms timeout. This
	// doubles as a connectivity check before handing out the client.
	if _, err = c.GetMetadata(nil, true, 1000); err != nil {
		_ = c.Close()
		p.Close()
		return nil, errors.Wrapf(err, "failed to get meta data from kafka with config: %+v", consumerConfig)
	}
	return &defaultClient{
		log:      log,
		consumer: c,
		producer: p,
	}, nil
}