This repository has been archived on 2026-03-07. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
go-kafka/kafka_test.go
T

444 lines
11 KiB
Go

package kafka
import (
"errors"
"fmt"
"github.com/sanity-io/litter"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"reflect"
"testing"
"time"
)
func Test_connect(t *testing.T) {
type args struct {
consumerConfig *kafka.ConfigMap
producerConfig *kafka.ConfigMap
}
tests := []struct {
name string
args args
want Client
wantErr bool
}{
{
name: "invalid consumer config",
args: args{
consumerConfig: &kafka.ConfigMap{},
},
want: nil,
wantErr: true,
},
{
name: "invalid producer config",
args: args{
consumerConfig: &kafka.ConfigMap{"group.id": "test"},
producerConfig: &kafka.ConfigMap{"delivery.report.only.error": "abc"},
},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := connect(tt.args.consumerConfig, tt.args.producerConfig, nil)
if (err != nil) != tt.wantErr {
t.Errorf("connect() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("connect() got = %v, want %v", got, tt.want)
}
})
}
}
func TestDefaultClient_Consume(t *testing.T) {
type fields struct {
consumer Consumer
}
type args struct {
topic string
fn func(msg []byte) error
}
tests := []struct {
name string
fields fields
args args
wantErr bool
wantLogged []string
}{
{
name: "error subscribing to topics",
fields: fields{
consumer: &MockConsumer{
subscribe: func(topics []string, rebalanceCb kafka.RebalanceCb) (err error) {
expected := []string{"bookingsystem.fct.booking.1"}
if !reflect.DeepEqual(topics, expected) {
t.Errorf("Consume() got %s, want %s", topics, expected)
}
return errors.New("error")
},
},
},
args: args{
topic: "bookingsystem.fct.booking.1",
fn: func(msg []byte) error {
return nil
},
},
wantErr: true,
},
{
name: "error reading message",
fields: fields{
consumer: &MockConsumer{
subscribe: func(topics []string, rebalanceCb kafka.RebalanceCb) (err error) {
return nil
},
read: func(count int) func(time.Duration) (*kafka.Message, error) {
return func(timeout time.Duration) (*kafka.Message, error) {
count = count - 1
if count >= 0 {
return nil, errors.New("error")
}
return &kafka.Message{Value: []byte(`{"a":`), TopicPartition: kafka.TopicPartition{Topic: sptr("bookingsystem.fct.booking.1")}}, nil
}
}(1),
},
},
args: args{
topic: "bookingsystem.fct.booking.1",
fn: func(msg []byte) error {
return errors.New("error")
},
},
wantErr: true,
wantLogged: []string{"ERROR: Consumer error: error (<nil>)"},
},
{
name: "no func for topic",
fields: fields{
consumer: &MockConsumer{
subscribe: func(topics []string, rebalanceCb kafka.RebalanceCb) (err error) {
return nil
},
read: func(count int) func(time.Duration) (*kafka.Message, error) {
return func(timeout time.Duration) (*kafka.Message, error) {
count = count - 1
if count >= 0 {
return nil, errors.New("error")
}
return &kafka.Message{Value: []byte(`{}`), TopicPartition: kafka.TopicPartition{Topic: sptr("unexpected")}}, nil
}
}(1),
},
},
args: args{
topic: "bookingsystem.fct.booking.1",
fn: func(msg []byte) error {
return errors.New("error")
},
},
wantErr: true,
wantLogged: []string{"ERROR: Consumer error: error (<nil>)"},
},
{
name: "error commiting",
fields: fields{
consumer: &MockConsumer{
subscribe: func(topics []string, rebalanceCb kafka.RebalanceCb) (err error) {
return nil
},
read: func(count int) func(time.Duration) (*kafka.Message, error) {
return func(timeout time.Duration) (*kafka.Message, error) {
count = count - 1
if count >= 0 {
return nil, errors.New("error")
}
return &kafka.Message{Value: []byte(`{}`), TopicPartition: kafka.TopicPartition{Topic: sptr("bookingsystem.fct.booking.1")}}, nil
}
}(1),
commit: func(msg *kafka.Message) ([]kafka.TopicPartition, error) {
return nil, errors.New("error")
},
},
},
args: args{
topic: "bookingsystem.fct.booking.1",
fn: func(msg []byte) error {
return nil
},
},
wantErr: true,
wantLogged: []string{"ERROR: Consumer error: error (<nil>)"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger := &MockLogger{}
k := &defaultClient{
consumer: tt.fields.consumer,
log: logger,
}
if err := k.Consume(map[string]func(msg []byte) error{
tt.args.topic: tt.args.fn,
}); (err != nil) != tt.wantErr {
t.Errorf("Consume() error = %v, wantErr %v", err, tt.wantErr)
}
logger.Check(t, tt.wantLogged)
})
}
}
func TestDefaultClient_Send(t *testing.T) {
type fields struct {
producer Producer
}
type args struct {
topic string
msg interface{}
}
tests := []struct {
name string
fields fields
args args
events []kafka.Event
wantErr bool
wantLogged []string
}{
{
name: "error marshalling",
fields: fields{
producer: NewProducer(func() {}),
},
args: args{
topic: "bookingsystem.fct.booking.1",
msg: BrokenMarshal(true),
},
wantErr: true,
},
{
name: "topic partition error is returned",
fields: fields{
producer: NewProducer(func() {}),
},
args: args{
topic: "bookingsystem.fct.booking.1",
msg: "blutti",
},
events: []kafka.Event{&kafka.Message{TopicPartition: kafka.TopicPartition{Error: errors.New("error")}}},
wantErr: true,
wantLogged: []string{"INFO: Delivery failed: error"},
},
{
name: "success",
fields: fields{
producer: NewProducer(func() {}),
},
args: args{
topic: "bookingsystem.fct.booking.1",
msg: "blutti",
},
events: []kafka.Event{IgnoredEvent(true), &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: sptr("topic"), Partition: 1, Offset: kafka.Offset(23)}}},
wantErr: false,
wantLogged: []string{
"INFO: Ignored event: ignored",
"INFO: Delivered message to topic topic [1] at offset 23",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger := &MockLogger{}
k := &defaultClient{
producer: tt.fields.producer,
log: logger,
}
if len(tt.events) > 0 {
go func() {
time.Sleep(time.Second)
for _, e := range tt.events {
tt.fields.producer.Events() <- e
}
}()
}
if err := k.Send(tt.args.topic, tt.args.msg); (err != nil) != tt.wantErr {
t.Errorf("Send() error = %v, wantErr %v", err, tt.wantErr)
}
logger.Check(t, tt.wantLogged)
})
}
}
func TestKafka_ConsumerErrorReturns(t *testing.T) {
k := &defaultClient{
consumer: &MockConsumer{
read: func(timeout time.Duration) (*kafka.Message, error) {
return &kafka.Message{}, kafka.NewError(kafka.ErrUnknownTopicOrPart, "test", false)
},
subscribe: func(topics []string, rebalanceCb kafka.RebalanceCb) (err error) {
return nil
},
},
}
handlers := map[string]func(msg []byte) error{
"topic": func(msg []byte) error {
return nil
},
}
err := k.Consume(handlers)
if err == nil {
t.Error("error expected")
}
}
func TestDefaultClient_Close(t *testing.T) {
consumerClosed := false
producerClosed := false
k := &defaultClient{
consumer: &MockConsumer{close: func() error {
consumerClosed = true
return nil
}},
producer: NewProducer(func() {
producerClosed = true
}),
}
k.Close()
if !consumerClosed || !producerClosed {
t.Error("Close() expected close to be called on both consumer and producer but wasn't")
}
}
type MockConsumer struct {
subscribe func(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
read func(timeout time.Duration) (*kafka.Message, error)
commit func(msg *kafka.Message) ([]kafka.TopicPartition, error)
close func() error
}
func (m MockConsumer) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error) {
return m.subscribe(topics, rebalanceCb)
}
func (m MockConsumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
return m.read(timeout)
}
func (m MockConsumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) {
return m.commit(msg)
}
func (m MockConsumer) Close() error {
return m.close()
}
var _ Consumer = &MockConsumer{}
type MockProducer struct {
events chan kafka.Event
produce chan *kafka.Message
close func()
}
func (m *MockProducer) Events() chan kafka.Event {
return m.events
}
func (m *MockProducer) ProduceChannel() chan *kafka.Message {
return m.produce
}
func (m *MockProducer) Close() {
m.close()
}
var _ Producer = &MockProducer{}
func NewProducer(close func()) *MockProducer {
return &MockProducer{
events: make(chan kafka.Event, 10),
produce: make(chan *kafka.Message, 10),
close: close,
}
}
type BrokenMarshal bool
func (BrokenMarshal) MarshalJSON() ([]byte, error) {
return nil, errors.New("error")
}
type IgnoredEvent bool
func (IgnoredEvent) String() string {
return "ignored"
}
func sptr(s string) *string {
return &s
}
type MockLogger struct {
Logged []string
}
func (m *MockLogger) Check(t *testing.T, wantLogged []string) {
t.Helper()
if len(m.Logged) != 0 || len(wantLogged) != 0 {
got := litter.Sdump(m.Logged)
want := litter.Sdump(wantLogged)
if got != want {
t.Errorf("MockLogger() got %s, want %s", got, want)
}
}
}
func (m *MockLogger) Debug(s string) {
m.Logged = append(m.Logged, fmt.Sprintf("DEBUG: %s", s))
}
func (m *MockLogger) Info(s string) {
m.Logged = append(m.Logged, fmt.Sprintf("INFO: %s", s))
}
func (m *MockLogger) Warn(s string) {
m.Logged = append(m.Logged, fmt.Sprintf("WARN: %s", s))
}
func (m *MockLogger) Error(s string) {
m.Logged = append(m.Logged, fmt.Sprintf("ERROR: %s", s))
}
func (m *MockLogger) Fatal(s string) {
m.Logged = append(m.Logged, fmt.Sprintf("FATAL: %s", s))
}
func (m *MockLogger) Debugf(s string, i ...interface{}) {
m.Logged = append(m.Logged, fmt.Sprintf("DEBUG: %s", fmt.Sprintf(s, i...)))
}
func (m *MockLogger) Infof(s string, i ...interface{}) {
m.Logged = append(m.Logged, fmt.Sprintf("INFO: %s", fmt.Sprintf(s, i...)))
}
func (m *MockLogger) Warnf(s string, i ...interface{}) {
m.Logged = append(m.Logged, fmt.Sprintf("WARN: %s", fmt.Sprintf(s, i...)))
}
func (m *MockLogger) Errorf(s string, i ...interface{}) {
m.Logged = append(m.Logged, fmt.Sprintf("ERROR: %s", fmt.Sprintf(s, i...)))
}
func (m *MockLogger) Fatalf(s string, i ...interface{}) {
m.Logged = append(m.Logged, fmt.Sprintf("FATAL: %s", fmt.Sprintf(s, i...)))
}
var _ Logger = &MockLogger{}