init
This commit is contained in:
@@ -0,0 +1,175 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/pkg/errors"
|
||||
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Client represents the Kafka client
|
||||
type Client interface {
|
||||
Consume(topics map[string]func(msg []byte) error) error
|
||||
Send(topic string, msg interface{}) error
|
||||
Close()
|
||||
}
|
||||
|
||||
// Consumer represents the Kafka consumer
|
||||
type Consumer interface {
|
||||
SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
|
||||
ReadMessage(timeout time.Duration) (*kafka.Message, error)
|
||||
CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
var _ Consumer = &kafka.Consumer{}
|
||||
|
||||
// Producer represents the Kafka producer
|
||||
type Producer interface {
|
||||
Events() chan kafka.Event
|
||||
ProduceChannel() chan *kafka.Message
|
||||
Close()
|
||||
}
|
||||
|
||||
var _ Producer = &kafka.Producer{}
|
||||
|
||||
type defaultClient struct {
|
||||
log Logger
|
||||
consumer Consumer
|
||||
producer Producer
|
||||
}
|
||||
|
||||
func (k *defaultClient) Consume(topics map[string]func(msg []byte) error) error {
|
||||
s := make([]string, 0, len(topics))
|
||||
for k := range topics {
|
||||
s = append(s, k)
|
||||
}
|
||||
err := k.consumer.SubscribeTopics(s, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
msg, err := k.consumer.ReadMessage(-1)
|
||||
if err == nil {
|
||||
fn, exists := topics[*msg.TopicPartition.Topic]
|
||||
if !exists {
|
||||
return fmt.Errorf("func for topic %s is not provided", *msg.TopicPartition.Topic)
|
||||
}
|
||||
if err := fn(msg.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := k.consumer.CommitMessage(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// The client will automatically try to recover from all errors.
|
||||
k.log.Errorf("Consumer error: %v (%v)", err, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (k *defaultClient) Send(topic string, msg interface{}) error {
|
||||
value, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
doneChan := make(chan error)
|
||||
|
||||
go func() {
|
||||
defer close(doneChan)
|
||||
for e := range k.producer.Events() {
|
||||
switch ev := e.(type) {
|
||||
case *kafka.Message:
|
||||
m := ev
|
||||
if m.TopicPartition.Error != nil {
|
||||
k.log.Infof("Delivery failed: %v", m.TopicPartition.Error)
|
||||
doneChan <- m.TopicPartition.Error
|
||||
} else {
|
||||
k.log.Infof("Delivered message to topic %s [%d] at offset %v",
|
||||
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
|
||||
}
|
||||
return
|
||||
|
||||
default:
|
||||
k.log.Infof("Ignored event: %s", ev)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
k.producer.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: value}
|
||||
|
||||
// wait for delivery report goroutine to finish
|
||||
return <-doneChan
|
||||
}
|
||||
|
||||
func (k *defaultClient) Close() {
|
||||
_ = k.consumer.Close()
|
||||
k.producer.Close()
|
||||
}
|
||||
|
||||
var _ Client = &defaultClient{}
|
||||
|
||||
// New creates a new kafka Client
|
||||
func New(bootstrapServers, serviceName, username, password string, log Logger) (Client, error) {
|
||||
if log == nil {
|
||||
log = &noopLogger{}
|
||||
}
|
||||
consumerConfig := &kafka.ConfigMap{
|
||||
"bootstrap.servers": bootstrapServers,
|
||||
"broker.address.family": "v4",
|
||||
"client.id": serviceName,
|
||||
"group.id": serviceName,
|
||||
"auto.offset.reset": "earliest",
|
||||
"message.timeout.ms": "1000",
|
||||
"enable.auto.commit": false,
|
||||
}
|
||||
producerConfig := &kafka.ConfigMap{
|
||||
"bootstrap.servers": bootstrapServers,
|
||||
"broker.address.family": "v4",
|
||||
"client.id": serviceName,
|
||||
"message.timeout.ms": "1000",
|
||||
}
|
||||
if username != "" && password != "" {
|
||||
_ = consumerConfig.SetKey("security.protocol", "SASL_SSL")
|
||||
_ = consumerConfig.SetKey("sasl.mechanisms", "PLAIN")
|
||||
_ = consumerConfig.SetKey("sasl.username", username)
|
||||
_ = consumerConfig.SetKey("sasl.password", password)
|
||||
_ = producerConfig.SetKey("security.protocol", "SASL_SSL")
|
||||
_ = producerConfig.SetKey("sasl.mechanisms", "PLAIN")
|
||||
_ = producerConfig.SetKey("sasl.username", username)
|
||||
_ = producerConfig.SetKey("sasl.password", password)
|
||||
}
|
||||
return connect(
|
||||
consumerConfig,
|
||||
producerConfig,
|
||||
log,
|
||||
)
|
||||
}
|
||||
|
||||
func connect(consumerConfig, producerConfig *kafka.ConfigMap, log Logger) (Client, error) {
|
||||
c, err := kafka.NewConsumer(consumerConfig)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to create consuner")
|
||||
}
|
||||
|
||||
p, err := kafka.NewProducer(producerConfig)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to create producer")
|
||||
}
|
||||
|
||||
_, err = c.GetMetadata(nil, true, 1000)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to get meta data from kafka with config: %+v", consumerConfig)
|
||||
}
|
||||
|
||||
return &defaultClient{
|
||||
log: log,
|
||||
consumer: c,
|
||||
producer: p,
|
||||
}, nil
|
||||
}
|
||||
Reference in New Issue
Block a user