package kafka

import (
	"errors"
	"fmt"
	"reflect"
	"testing"
	"time"

	"github.com/sanity-io/litter"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

// Test_connect checks that connect reports an error and returns a nil Client
// for the invalid consumer and producer configurations listed in the table.
func Test_connect(t *testing.T) {
	type args struct {
		consumerConfig *kafka.ConfigMap
		producerConfig *kafka.ConfigMap
	}
	tests := []struct {
		name    string
		args    args
		want    Client
		wantErr bool
	}{
		{
			name: "invalid consumer config",
			args: args{
				consumerConfig: &kafka.ConfigMap{},
			},
			want:    nil,
			wantErr: true,
		},
		{
			name: "invalid producer config",
			args: args{
				consumerConfig: &kafka.ConfigMap{"group.id": "test"},
				producerConfig: &kafka.ConfigMap{"delivery.report.only.error": "abc"},
			},
			want:    nil,
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := connect(tt.args.consumerConfig, tt.args.producerConfig, nil)
			if (err != nil) != tt.wantErr {
				t.Errorf("connect() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("connect() got = %v, want %v", got, tt.want)
			}
		})
	}
}

// subscribeOK is a MockConsumer subscribe hook that accepts any topic list
// and always succeeds.
func subscribeOK(topics []string, rebalanceCb kafka.RebalanceCb) error {
	return nil
}

// readAfterErrors builds a MockConsumer read hook. The first `failures` calls
// return a nil message and an error; every later call returns a freshly
// allocated message carrying the given value and topic.
func readAfterErrors(failures int, topic, value string) func(time.Duration) (*kafka.Message, error) {
	remaining := failures
	return func(time.Duration) (*kafka.Message, error) {
		remaining--
		if remaining >= 0 {
			return nil, errors.New("error")
		}
		return &kafka.Message{
			Value:          []byte(value),
			TopicPartition: kafka.TopicPartition{Topic: sptr(topic)},
		}, nil
	}
}

// TestDefaultClient_Consume drives defaultClient.Consume against a
// MockConsumer and checks both the returned error and the lines written to
// the MockLogger for each failure scenario in the table.
func TestDefaultClient_Consume(t *testing.T) {
	type fields struct {
		consumer Consumer
	}
	type args struct {
		topic string
		fn    func(msg []byte) error
	}
	tests := []struct {
		name       string
		fields     fields
		args       args
		wantErr    bool
		wantLogged []string
	}{
		{
			name: "error subscribing to topics",
			fields: fields{
				consumer: &MockConsumer{
					// This hook is built before the subtest exists, so the
					// assertion reports through the parent test's t.
					subscribe: func(topics []string, rebalanceCb kafka.RebalanceCb) (err error) {
						expected := []string{"bookingsystem.fct.booking.1"}
						if !reflect.DeepEqual(topics, expected) {
							t.Errorf("Consume() got %s, want %s", topics, expected)
						}
						return errors.New("error")
					},
				},
			},
			args: args{
				topic: "bookingsystem.fct.booking.1",
				fn:    func(msg []byte) error { return nil },
			},
			wantErr: true,
		},
		{
			name: "error reading message",
			fields: fields{
				consumer: &MockConsumer{
					subscribe: subscribeOK,
					read:      readAfterErrors(1, "bookingsystem.fct.booking.1", `{"a":`),
				},
			},
			args: args{
				topic: "bookingsystem.fct.booking.1",
				fn:    func(msg []byte) error { return errors.New("error") },
			},
			wantErr:    true,
			wantLogged: []string{"ERROR: Consumer error: error ()"},
		},
		{
			name: "no func for topic",
			fields: fields{
				consumer: &MockConsumer{
					subscribe: subscribeOK,
					// The message arrives on a topic with no registered handler.
					read: readAfterErrors(1, "unexpected", `{}`),
				},
			},
			args: args{
				topic: "bookingsystem.fct.booking.1",
				fn:    func(msg []byte) error { return errors.New("error") },
			},
			wantErr:    true,
			wantLogged: []string{"ERROR: Consumer error: error ()"},
		},
		{
			name: "error commiting",
			fields: fields{
				consumer: &MockConsumer{
					subscribe: subscribeOK,
					read:      readAfterErrors(1, "bookingsystem.fct.booking.1", `{}`),
					commit: func(msg *kafka.Message) ([]kafka.TopicPartition, error) {
						return nil, errors.New("error")
					},
				},
			},
			args: args{
				topic: "bookingsystem.fct.booking.1",
				fn:    func(msg []byte) error { return nil },
			},
			wantErr:    true,
			wantLogged: []string{"ERROR: Consumer error: error ()"},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			logger := &MockLogger{}
			k := &defaultClient{
				consumer: tt.fields.consumer,
				log:      logger,
			}
			handlers := map[string]func(msg []byte) error{
				tt.args.topic: tt.args.fn,
			}
			if err := k.Consume(handlers); (err != nil) != tt.wantErr {
				t.Errorf("Consume() error = %v, wantErr %v", err, tt.wantErr)
			}
			logger.Check(t, tt.wantLogged)
		})
	}
}

// TestDefaultClient_Send drives defaultClient.Send against a MockProducer.
// Cases that list events have them pushed onto the producer's Events channel
// from a background goroutine after a one second delay.
func TestDefaultClient_Send(t *testing.T) {
	type fields struct {
		producer Producer
	}
	type args struct {
		topic string
		msg   interface{}
	}
	tests := []struct {
		name       string
		fields     fields
		args       args
		events     []kafka.Event
		wantErr    bool
		wantLogged []string
	}{
		{
			name: "error marshalling",
			fields: fields{
				producer: NewProducer(func() {}),
			},
			args: args{
				topic: "bookingsystem.fct.booking.1",
				msg:   BrokenMarshal(true),
			},
			wantErr: true,
		},
		{
			name: "topic partition error is returned",
			fields: fields{
				producer: NewProducer(func() {}),
			},
			args: args{
				topic: "bookingsystem.fct.booking.1",
				msg:   "blutti",
			},
			events: []kafka.Event{
				&kafka.Message{TopicPartition: kafka.TopicPartition{Error: errors.New("error")}},
			},
			wantErr:    true,
			wantLogged: []string{"INFO: Delivery failed: error"},
		},
		{
			name: "success",
			fields: fields{
				producer: NewProducer(func() {}),
			},
			args: args{
				topic: "bookingsystem.fct.booking.1",
				msg:   "blutti",
			},
			events: []kafka.Event{
				IgnoredEvent(true),
				&kafka.Message{TopicPartition: kafka.TopicPartition{
					Topic:     sptr("topic"),
					Partition: 1,
					Offset:    kafka.Offset(23),
				}},
			},
			wantErr: false,
			wantLogged: []string{
				"INFO: Ignored event: ignored",
				"INFO: Delivered message to topic topic [1] at offset 23",
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			logger := &MockLogger{}
			k := &defaultClient{
				producer: tt.fields.producer,
				log:      logger,
			}
			if len(tt.events) > 0 {
				// Hand the producer and events to the goroutine as arguments
				// instead of closing over the range variable, which is shared
				// between iterations before Go 1.22.
				go func(p Producer, events []kafka.Event) {
					time.Sleep(time.Second)
					for _, e := range events {
						p.Events() <- e
					}
				}(tt.fields.producer, tt.events)
			}
			if err := k.Send(tt.args.topic, tt.args.msg); (err != nil) != tt.wantErr {
				t.Errorf("Send() error = %v, wantErr %v", err, tt.wantErr)
			}
			logger.Check(t, tt.wantLogged)
		})
	}
}

// TestKafka_ConsumerErrorReturns checks that Consume returns an error when
// every read yields a kafka.ErrUnknownTopicOrPart error.
func TestKafka_ConsumerErrorReturns(t *testing.T) {
	k := &defaultClient{
		consumer: &MockConsumer{
			read: func(timeout time.Duration) (*kafka.Message, error) {
				return &kafka.Message{}, kafka.NewError(kafka.ErrUnknownTopicOrPart, "test", false)
			},
			subscribe: subscribeOK,
		},
	}
	handlers := map[string]func(msg []byte) error{
		"topic": func(msg []byte) error { return nil },
	}
	if err := k.Consume(handlers); err == nil {
		t.Error("error expected")
	}
}

// TestDefaultClient_Close checks that Close closes both the consumer and the
// producer held by the client.
func TestDefaultClient_Close(t *testing.T) {
	consumerClosed := false
	producerClosed := false
	k := &defaultClient{
		consumer: &MockConsumer{close: func() error {
			consumerClosed = true
			return nil
		}},
		producer: NewProducer(func() { producerClosed = true }),
	}
	k.Close()
	if !consumerClosed || !producerClosed {
		t.Error("Close() expected close to be called on both consumer and producer but wasn't")
	}
}

// MockConsumer is a Consumer whose behaviour is supplied per test through
// function fields. Each method forwards to the field of the matching name;
// calling a method whose field is nil panics.
type MockConsumer struct {
	subscribe func(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
	read      func(timeout time.Duration) (*kafka.Message, error)
	commit    func(msg *kafka.Message) ([]kafka.TopicPartition, error)
	close     func() error
}

// SubscribeTopics forwards to the subscribe hook.
func (m MockConsumer) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error) {
	return m.subscribe(topics, rebalanceCb)
}

// ReadMessage forwards to the read hook.
func (m MockConsumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
	return m.read(timeout)
}

// CommitMessage forwards to the commit hook.
func (m MockConsumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) {
	return m.commit(msg)
}

// Close forwards to the close hook.
func (m MockConsumer) Close() error {
	return m.close()
}

var _ Consumer = &MockConsumer{}

// MockProducer is a Producer backed by plain channels and a close callback.
type MockProducer struct {
	events  chan kafka.Event
	produce chan *kafka.Message
	close   func()
}

// Events returns the channel tests use to feed delivery events to the client.
func (m *MockProducer) Events() chan kafka.Event {
	return m.events
}

// ProduceChannel returns the channel on which produced messages are queued.
func (m *MockProducer) ProduceChannel() chan *kafka.Message {
	return m.produce
}

// Close invokes the close callback given to NewProducer.
func (m *MockProducer) Close() {
	m.close()
}

var _ Producer = &MockProducer{}

// NewProducer returns a MockProducer whose event and produce channels are
// each buffered to 10 entries and whose Close method calls closeFn.
func NewProducer(closeFn func()) *MockProducer {
	return &MockProducer{
		events:  make(chan kafka.Event, 10),
		produce: make(chan *kafka.Message, 10),
		close:   closeFn,
	}
}

// BrokenMarshal is a value whose JSON marshalling always fails.
type BrokenMarshal bool

// MarshalJSON always returns an error.
func (BrokenMarshal) MarshalJSON() ([]byte, error) {
	return nil, errors.New("error")
}

// IgnoredEvent is an event type other than *kafka.Message, used by the Send
// test to exercise the "Ignored event" log line.
type IgnoredEvent bool

// String returns the fixed text "ignored".
func (IgnoredEvent) String() string {
	return "ignored"
}

// sptr returns a pointer to a copy of s.
func sptr(s string) *string {
	return &s
}

// MockLogger is a Logger that records each call as a "<LEVEL>: <message>"
// string in Logged instead of writing anywhere.
type MockLogger struct {
	Logged []string
}

// Check fails the test when the recorded log lines differ from wantLogged.
// The comparison is skipped when both are empty, so a nil expectation matches
// a logger that recorded nothing.
func (m *MockLogger) Check(t *testing.T, wantLogged []string) {
	t.Helper()
	if len(m.Logged) == 0 && len(wantLogged) == 0 {
		return
	}
	got := litter.Sdump(m.Logged)
	want := litter.Sdump(wantLogged)
	if got != want {
		t.Errorf("MockLogger() got %s, want %s", got, want)
	}
}

// Debug records s at DEBUG level.
func (m *MockLogger) Debug(s string) {
	m.Logged = append(m.Logged, fmt.Sprintf("DEBUG: %s", s))
}

// Info records s at INFO level.
func (m *MockLogger) Info(s string) {
	m.Logged = append(m.Logged, fmt.Sprintf("INFO: %s", s))
}

// Warn records s at WARN level.
func (m *MockLogger) Warn(s string) {
	m.Logged = append(m.Logged, fmt.Sprintf("WARN: %s", s))
}

// Error records s at ERROR level.
func (m *MockLogger) Error(s string) {
	m.Logged = append(m.Logged, fmt.Sprintf("ERROR: %s", s))
}

// Fatal records s at FATAL level; it does not terminate the process.
func (m *MockLogger) Fatal(s string) {
	m.Logged = append(m.Logged, fmt.Sprintf("FATAL: %s", s))
}

// Debugf records the formatted message at DEBUG level.
func (m *MockLogger) Debugf(s string, i ...interface{}) {
	m.Logged = append(m.Logged, fmt.Sprintf("DEBUG: %s", fmt.Sprintf(s, i...)))
}

// Infof records the formatted message at INFO level.
func (m *MockLogger) Infof(s string, i ...interface{}) {
	m.Logged = append(m.Logged, fmt.Sprintf("INFO: %s", fmt.Sprintf(s, i...)))
}

// Warnf records the formatted message at WARN level.
func (m *MockLogger) Warnf(s string, i ...interface{}) {
	m.Logged = append(m.Logged, fmt.Sprintf("WARN: %s", fmt.Sprintf(s, i...)))
}

// Errorf records the formatted message at ERROR level.
func (m *MockLogger) Errorf(s string, i ...interface{}) {
	m.Logged = append(m.Logged, fmt.Sprintf("ERROR: %s", fmt.Sprintf(s, i...)))
}

// Fatalf records the formatted message at FATAL level; it does not terminate
// the process.
func (m *MockLogger) Fatalf(s string, i ...interface{}) {
	m.Logged = append(m.Logged, fmt.Sprintf("FATAL: %s", fmt.Sprintf(s, i...)))
}

var _ Logger = &MockLogger{}