// Package kafka provides a small client that wraps a confluent-kafka-go
// consumer and producer behind the Client interface.
package kafka

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/pkg/errors"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

// Client represents the Kafka client.
type Client interface {
	// Consume subscribes to every topic used as a key in topics and passes
	// the value of each received message to the function registered for the
	// message's topic. It only returns when an error occurs.
	Consume(topics map[string]func(msg []byte) error) error
	// Send JSON-encodes msg, produces it to topic and waits for the
	// delivery report.
	Send(topic string, msg interface{}) error
	// Close closes the underlying consumer and producer.
	Close()
}

// Consumer represents the Kafka consumer. It lists the subset of
// *kafka.Consumer methods used by this package.
type Consumer interface {
	SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
	ReadMessage(timeout time.Duration) (*kafka.Message, error)
	CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error)
	Close() error
}

// Compile-time check that *kafka.Consumer satisfies Consumer.
var _ Consumer = (*kafka.Consumer)(nil)

// Producer represents the Kafka producer. It lists the subset of
// *kafka.Producer methods used by this package.
type Producer interface {
	Events() chan kafka.Event
	ProduceChannel() chan *kafka.Message
	Close()
}

// Compile-time check that *kafka.Producer satisfies Producer.
var _ Producer = (*kafka.Producer)(nil)

// defaultClient is the Client implementation returned by New.
type defaultClient struct {
	log      Logger
	consumer Consumer
	producer Producer
}

// Compile-time check that *defaultClient satisfies Client.
var _ Client = (*defaultClient)(nil)

// Consume subscribes to all topics that appear as keys in topics and then
// reads messages in an endless loop. Each message value is handed to the
// function registered for the message's topic, and the message is committed
// once that function returns nil.
//
// Consume returns when subscribing fails, when a message arrives for a topic
// without a registered function, when a handler or the commit returns an
// error, or when reading fails with a kafka.Error that is not retriable.
// Every other read error is logged and the loop continues.
func (c *defaultClient) Consume(topics map[string]func(msg []byte) error) error {
	names := make([]string, 0, len(topics))
	for name := range topics {
		names = append(names, name)
	}
	if err := c.consumer.SubscribeTopics(names, nil); err != nil {
		return err
	}

	for {
		// A negative timeout is passed so the read is not bounded by a deadline.
		msg, err := c.consumer.ReadMessage(-1)
		if err != nil {
			if kafkaErr, ok := err.(kafka.Error); ok && !kafkaErr.IsRetriable() {
				return errors.Wrap(err, "fatal kafka error detected")
			}
			// The client will automatically try to recover from all errors.
			c.log.Errorf("Consumer error: %+v (%+v)", err, msg)
			continue
		}

		topic := *msg.TopicPartition.Topic
		handle, exists := topics[topic]
		if !exists {
			return fmt.Errorf("func for topic %s is not provided", topic)
		}
		if err := handle(msg.Value); err != nil {
			return err
		}
		// Offsets are committed only after the handler succeeded.
		if _, err := c.consumer.CommitMessage(msg); err != nil {
			return err
		}
	}
}

// Send marshals msg to JSON, hands it to the producer for the given topic
// (any partition) and blocks until the first delivery report is read from the
// producer's event channel. It returns the marshalling error, the delivery
// error from that report, or an error if the event channel is closed before
// any delivery report arrives.
//
// NOTE: every call reads from the producer's shared event channel, so
// concurrent calls may pick up each other's delivery reports; callers should
// serialise calls to Send.
func (c *defaultClient) Send(topic string, msg interface{}) error {
	value, err := json.Marshal(msg)
	if err != nil {
		return err
	}

	doneChan := make(chan error)
	// The listener is started before producing so the delivery report is
	// picked up even if it is emitted immediately.
	go func() {
		defer close(doneChan)
		for e := range c.producer.Events() {
			m, isMessage := e.(*kafka.Message)
			if !isMessage {
				c.log.Infof("Ignored event: %s", e)
				continue
			}
			if m.TopicPartition.Error != nil {
				c.log.Infof("Delivery failed: %v", m.TopicPartition.Error)
				doneChan <- m.TopicPartition.Error
			} else {
				c.log.Infof("Delivered message to topic %s [%d] at offset %v",
					*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
			}
			return
		}
		// The event channel was closed without a delivery report; report
		// that instead of letting the caller assume the message was sent.
		doneChan <- errors.New("producer event channel closed before a delivery report was received")
	}()

	c.producer.ProduceChannel() <- &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          value,
	}

	// wait for delivery report goroutine to finish; a closed channel yields nil
	return <-doneChan
}

// Close closes the consumer and then the producer. Close has no error return,
// so a consumer close error is logged when a logger is set.
func (c *defaultClient) Close() {
	if err := c.consumer.Close(); err != nil && c.log != nil {
		c.log.Errorf("Failed to close consumer: %+v", err)
	}
	c.producer.Close()
}

// New creates a new kafka Client.
//
// serviceName is used as the client id of both consumer and producer and as
// the consumer group id. When both username and password are non-empty,
// SASL_SSL with the PLAIN mechanism is configured on both. A nil log is
// replaced by a no-op logger.
func New(bootstrapServers, serviceName, username, password string, log Logger) (Client, error) {
	if log == nil {
		log = &noopLogger{}
	}

	consumerConfig := &kafka.ConfigMap{
		"bootstrap.servers":     bootstrapServers,
		"broker.address.family": "v4",
		"client.id":             serviceName,
		"group.id":              serviceName,
		"auto.offset.reset":     "earliest",
		"enable.auto.commit":    false,
	}
	producerConfig := &kafka.ConfigMap{
		"bootstrap.servers":     bootstrapServers,
		"broker.address.family": "v4",
		"client.id":             serviceName,
		"message.timeout.ms":    "1000",
	}

	if username != "" && password != "" {
		for _, cfg := range []*kafka.ConfigMap{consumerConfig, producerConfig} {
			if err := setSASLPlain(cfg, username, password); err != nil {
				return nil, err
			}
		}
	}

	return connect(consumerConfig, producerConfig, log)
}

// setSASLPlain stores the SASL_SSL/PLAIN settings and credentials in cfg.
func setSASLPlain(cfg *kafka.ConfigMap, username, password string) error {
	settings := [][2]string{
		{"security.protocol", "SASL_SSL"},
		{"sasl.mechanisms", "PLAIN"},
		{"sasl.username", username},
		{"sasl.password", password},
	}
	for _, kv := range settings {
		if err := cfg.SetKey(kv[0], kv[1]); err != nil {
			return errors.Wrapf(err, "failed to set kafka config key %s", kv[0])
		}
	}
	return nil
}

// redactConfig returns a copy of cfg in which the SASL password is masked so
// the configuration can be included in error messages.
func redactConfig(cfg *kafka.ConfigMap) *kafka.ConfigMap {
	if cfg == nil {
		return nil
	}
	redacted := make(kafka.ConfigMap, len(*cfg))
	for key, value := range *cfg {
		if key == "sasl.password" {
			value = "[REDACTED]"
		}
		redacted[key] = value
	}
	return &redacted
}

// connect creates the consumer and the producer from the given configurations
// and requests cluster metadata through the consumer (1000 ms timeout) before
// returning the client. Handles that were already created are closed again
// when a later step fails.
func connect(consumerConfig, producerConfig *kafka.ConfigMap, log Logger) (Client, error) {
	c, err := kafka.NewConsumer(consumerConfig)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create consumer")
	}

	p, err := kafka.NewProducer(producerConfig)
	if err != nil {
		// Best-effort cleanup; the producer error is the one worth reporting.
		_ = c.Close()
		return nil, errors.Wrap(err, "failed to create producer")
	}

	if _, err = c.GetMetadata(nil, true, 1000); err != nil {
		// Best-effort cleanup; the metadata error is the one worth reporting.
		_ = c.Close()
		p.Close()
		return nil, errors.Wrapf(err, "failed to get meta data from kafka with config: %+v", redactConfig(consumerConfig))
	}

	return &defaultClient{
		log:      log,
		consumer: c,
		producer: p,
	}, nil
}