From f159114dce3383c007585b3c9e3f540c94176a88 Mon Sep 17 00:00:00 2001 From: Peter Svensson Date: Mon, 23 Nov 2020 14:18:15 +0100 Subject: [PATCH] init --- .editorconfig | 11 ++ .gitignore | 1 + .gitlab-ci.yml | 50 ++++++ LICENSE | 21 +++ README.md | 10 ++ go.mod | 11 ++ go.sum | 80 ++++++++++ kafka.go | 175 +++++++++++++++++++++ kafka_test.go | 420 +++++++++++++++++++++++++++++++++++++++++++++++++ logger.go | 52 ++++++ 10 files changed, 831 insertions(+) create mode 100644 .editorconfig create mode 100644 .gitignore create mode 100644 .gitlab-ci.yml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 kafka.go create mode 100644 kafka_test.go create mode 100644 logger.go diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..06f4bc7 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,11 @@ +root = true + +[*] +end_of_line = lf +insert_final_newline = true +charset = utf-8 +trim_trailing_whitespace = true + +[*.go] +indent_style = tab +indent_size = 4 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..7e548ff --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,50 @@ +# Copyright (c) 2020 Unbound Software Development Svenska AB +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +image: golang:1.15 + +variables: + GOFLAGS: -mod=readonly + +stages: + - deps + - test + +deps: + stage: deps + script: + - go mod download + +test: + stage: test + dependencies: + - deps + script: + - go get golang.org/x/lint/golint + - go fmt $(go list ./...) + - go vet $(go list ./...) + - golint -set_exit_status ./... + - CGO_ENABLED=1 go test -mod=readonly -race -coverprofile=coverage.txt -covermode=atomic -coverpkg=$(go list ./... | tr '\n' , | sed 's/,$//') ./... + - go tool cover -html=coverage.txt -o coverage.html + - go tool cover -func=coverage.txt + - bash <(curl -s https://codecov.io/bash) + artifacts: + paths: + - coverage.html + - coverage.txt diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..1501635 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 Unbound Software Development Svenska AB + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..1b397bb --- /dev/null +++ b/README.md @@ -0,0 +1,10 @@ +# gokafka + +[![GoReportCard](https://goreportcard.com/badge/gitlab.com/unboundsoftware/go-kafka)](https://goreportcard.com/report/gitlab.com/unboundsoftware/go-kafka) [![GoDoc](https://godoc.org/gitlab.com/unboundsoftware/go-kafka?status.svg)](https://godoc.org/gitlab.com/unboundsoftware/go-kafka) [![Build Status](https://gitlab.com/unboundsoftware/go-kafka/badges/master/pipeline.svg)](https://gitlab.com/unboundsoftware/go-kafka/commits/master)[![coverage report](https://gitlab.com/unboundsoftware/go-kafka/badges/master/coverage.svg)](https://gitlab.com/unboundsoftware/go-kafka/commits/master) + + + +Download: +```shell +go get gitlab.com/unboundsoftware/kafka +``` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f170a22 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module gitlab.com/unboundsoftware/go-kafka + +go 1.15 + +require ( + github.com/apex/log v1.9.0 + github.com/confluentinc/confluent-kafka-go v1.4.2 // indirect + github.com/pkg/errors v0.9.1 + github.com/sanity-io/litter v1.3.0 + gopkg.in/confluentinc/confluent-kafka-go.v1 v1.5.2 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..3d92a63 --- /dev/null +++ b/go.sum @@ -0,0 +1,80 @@ +github.com/apex/log v1.9.0 h1:FHtw/xuaM8AgmvDDTI9fiwoAL25Sq2cxojnZICUU8l0= +github.com/apex/log v1.9.0/go.mod h1:m82fZlWIuiWzWP04XCTXmnX0xRkYYbCdYn8jbJeLBEA= +github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo= +github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE= +github.com/aphistic/sweet v0.2.0/go.mod h1:fWDlIh/isSE9n6EPsRmC0det+whmX6dJid3stzu0Xys= +github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I= +github.com/confluentinc/confluent-kafka-go v1.4.2 h1:13EK9RTujF7lVkvHQ5Hbu6bM+Yfrq8L0MkJNnjHSd4Q= +github.com/confluentinc/confluent-kafka-go v1.4.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= +github.com/confluentinc/confluent-kafka-go v1.5.2 h1:l+qt+a0Okmq0Bdr1P55IX4fiwFJyg0lZQmfHkAFkv7E= +github.com/confluentinc/confluent-kafka-go v1.5.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= +github.com/davecgh/go-spew v0.0.0-20161028175848-04cdfd42973b/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= +github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/sanity-io/litter v1.3.0 h1:5ZO+weUsqdSWMUng5JnpkW/Oz8iTXiIdeumhQr1sSjs= +github.com/sanity-io/litter v1.3.0/go.mod h1:5Z71SvaYy5kcGtyglXOC9rrUi3c1E8CamFWjQsazTh0= +github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM= +github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= +github.com/smartystreets/gunit v1.0.0/go.mod h1:qwPWnhz6pn0NnRBP++URONOVyNkPyr4SauJk4cUOwJs= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v0.0.0-20161117074351-18a02ba4a312/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0= +github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk= +github.com/tj/go-buffer v1.1.0/go.mod h1:iyiJpfFcR2B9sXu7KvjbT9fpM4mOelRSDTbntVj52Uc= +github.com/tj/go-elastic v0.0.0-20171221160941-36157cbbebc2/go.mod h1:WjeM0Oo1eNAjXGDx2yma7uG2XoyRZTq1uv3M/o7imD0= +github.com/tj/go-kinesis v0.0.0-20171128231115-08b17f58cb1b/go.mod h1:/yhzCV0xPfx6jb1bBgRFjl5lytqVqZXEaeqWP8lTEao= +github.com/tj/go-spin v1.1.0/go.mod h1:Mg1mzmePZm4dva8Qz60H2lHwmJ2loum4VIrLgVnKwh4= +gitlab.com/unboundsoftware/eventsourced/eventsourced v1.6.2/go.mod h1:Mq3mnpRLakpU9IP6zEkV8dzye6XUd0vywv7BJMXqsDA= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/confluentinc/confluent-kafka-go.v1 v1.5.2 h1:g0WBLy6fobNUU8W/e9zx6I0Yl79Ya+BDW1NwzAlTiiQ= +gopkg.in/confluentinc/confluent-kafka-go.v1 v1.5.2/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kafka.go b/kafka.go new file mode 100644 index 0000000..8bf8321 --- /dev/null +++ b/kafka.go @@ -0,0 +1,175 @@ +package kafka + +import ( + "encoding/json" + "fmt" + "github.com/pkg/errors" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + "time" +) + +// Client represents the Kafka client +type Client interface { + Consume(topics map[string]func(msg []byte) error) error + Send(topic string, msg interface{}) error + Close() +} + +// Consumer represents the Kafka consumer +type Consumer interface { + SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error) + ReadMessage(timeout time.Duration) (*kafka.Message, error) + CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error) + Close() error +} + +var _ Consumer = &kafka.Consumer{} + +// Producer represents the Kafka producer +type Producer interface { + Events() chan kafka.Event + ProduceChannel() chan *kafka.Message + Close() +} + +var _ Producer = &kafka.Producer{} + +type defaultClient struct { + log Logger + consumer Consumer + producer Producer +} + +func (k *defaultClient) Consume(topics map[string]func(msg []byte) error) error { + s := make([]string, 0, len(topics)) + for k := range topics { + s = append(s, k) + } + err := k.consumer.SubscribeTopics(s, nil) + if err != nil { + return err + } + + for { + msg, err := k.consumer.ReadMessage(-1) + if err == nil { + fn, exists := topics[*msg.TopicPartition.Topic] + if !exists { + return fmt.Errorf("func for topic %s is not provided", *msg.TopicPartition.Topic) + } + if err := fn(msg.Value); err != nil { + return err + } + _, err := k.consumer.CommitMessage(msg) + if err != nil { + return err + } + } else { + // The client will automatically try to recover from all errors. + k.log.Errorf("Consumer error: %v (%v)", err, msg) + } + } +} + +func (k *defaultClient) Send(topic string, msg interface{}) error { + value, err := json.Marshal(msg) + if err != nil { + return err + } + + doneChan := make(chan error) + + go func() { + defer close(doneChan) + for e := range k.producer.Events() { + switch ev := e.(type) { + case *kafka.Message: + m := ev + if m.TopicPartition.Error != nil { + k.log.Infof("Delivery failed: %v", m.TopicPartition.Error) + doneChan <- m.TopicPartition.Error + } else { + k.log.Infof("Delivered message to topic %s [%d] at offset %v", + *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) + } + return + + default: + k.log.Infof("Ignored event: %s", ev) + } + } + }() + + k.producer.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: value} + + // wait for delivery report goroutine to finish + return <-doneChan +} + +func (k *defaultClient) Close() { + _ = k.consumer.Close() + k.producer.Close() +} + +var _ Client = &defaultClient{} + +// New creates a new kafka Client +func New(bootstrapServers, serviceName, username, password string, log Logger) (Client, error) { + if log == nil { + log = &noopLogger{} + } + consumerConfig := &kafka.ConfigMap{ + "bootstrap.servers": bootstrapServers, + "broker.address.family": "v4", + "client.id": serviceName, + "group.id": serviceName, + "auto.offset.reset": "earliest", + "message.timeout.ms": "1000", + "enable.auto.commit": false, + } + producerConfig := &kafka.ConfigMap{ + "bootstrap.servers": bootstrapServers, + "broker.address.family": "v4", + "client.id": serviceName, + "message.timeout.ms": "1000", + } + if username != "" && password != "" { + _ = consumerConfig.SetKey("security.protocol", "SASL_SSL") + _ = consumerConfig.SetKey("sasl.mechanisms", "PLAIN") + _ = consumerConfig.SetKey("sasl.username", username) + _ = consumerConfig.SetKey("sasl.password", password) + _ = producerConfig.SetKey("security.protocol", "SASL_SSL") + _ = producerConfig.SetKey("sasl.mechanisms", "PLAIN") + _ = producerConfig.SetKey("sasl.username", username) + _ = producerConfig.SetKey("sasl.password", password) + } + return connect( + consumerConfig, + producerConfig, + log, + ) +} + +func connect(consumerConfig, producerConfig *kafka.ConfigMap, log Logger) (Client, error) { + c, err := kafka.NewConsumer(consumerConfig) + if err != nil { + return nil, errors.Wrap(err, "failed to create consuner") + } + + p, err := kafka.NewProducer(producerConfig) + if err != nil { + return nil, errors.Wrap(err, "failed to create producer") + } + + _, err = c.GetMetadata(nil, true, 1000) + + if err != nil { + return nil, errors.Wrapf(err, "failed to get meta data from kafka with config: %+v", consumerConfig) + } + + return &defaultClient{ + log: log, + consumer: c, + producer: p, + }, nil +} diff --git a/kafka_test.go b/kafka_test.go new file mode 100644 index 0000000..6f02ab9 --- /dev/null +++ b/kafka_test.go @@ -0,0 +1,420 @@ +package kafka + +import ( + "errors" + "fmt" + "github.com/sanity-io/litter" + "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" + "reflect" + "testing" + "time" +) + +func Test_connect(t *testing.T) { + type args struct { + consumerConfig *kafka.ConfigMap + producerConfig *kafka.ConfigMap + } + tests := []struct { + name string + args args + want Client + wantErr bool + }{ + { + name: "invalid consumer config", + args: args{ + consumerConfig: &kafka.ConfigMap{}, + }, + want: nil, + wantErr: true, + }, + { + name: "invalid producer config", + args: args{ + consumerConfig: &kafka.ConfigMap{"group.id": "test"}, + producerConfig: &kafka.ConfigMap{"delivery.report.only.error": "abc"}, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := connect(tt.args.consumerConfig, tt.args.producerConfig, nil) + if (err != nil) != tt.wantErr { + t.Errorf("connect() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("connect() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDefaultClient_Consume(t *testing.T) { + type fields struct { + consumer Consumer + } + type args struct { + topic string + fn func(msg []byte) error + } + tests := []struct { + name string + fields fields + args args + wantErr bool + wantLogged []string + }{ + { + name: "error subscribing to topics", + fields: fields{ + consumer: &MockConsumer{ + subscribe: func(topics []string, rebalanceCb kafka.RebalanceCb) (err error) { + expected := []string{"bookingsystem.fct.booking.1"} + if !reflect.DeepEqual(topics, expected) { + t.Errorf("Consume() got %s, want %s", topics, expected) + } + return errors.New("error") + }, + }, + }, + args: args{ + topic: "bookingsystem.fct.booking.1", + fn: func(msg []byte) error { + return nil + }, + }, + wantErr: true, + }, + { + name: "error reading message", + fields: fields{ + consumer: &MockConsumer{ + subscribe: func(topics []string, rebalanceCb kafka.RebalanceCb) (err error) { + return nil + }, + read: func(count int) func(time.Duration) (*kafka.Message, error) { + return func(timeout time.Duration) (*kafka.Message, error) { + count = count - 1 + if count >= 0 { + return nil, errors.New("error") + } + return &kafka.Message{Value: []byte(`{"a":`), TopicPartition: kafka.TopicPartition{Topic: sptr("bookingsystem.fct.booking.1")}}, nil + + } + }(1), + }, + }, + args: args{ + topic: "bookingsystem.fct.booking.1", + fn: func(msg []byte) error { + return errors.New("error") + }, + }, + wantErr: true, + wantLogged: []string{"ERROR: Consumer error: error ()"}, + }, + { + name: "no func for topic", + fields: fields{ + consumer: &MockConsumer{ + subscribe: func(topics []string, rebalanceCb kafka.RebalanceCb) (err error) { + return nil + }, + read: func(count int) func(time.Duration) (*kafka.Message, error) { + return func(timeout time.Duration) (*kafka.Message, error) { + count = count - 1 + if count >= 0 { + return nil, errors.New("error") + } + return &kafka.Message{Value: []byte(`{}`), TopicPartition: kafka.TopicPartition{Topic: sptr("unexpected")}}, nil + + } + }(1), + }, + }, + args: args{ + topic: "bookingsystem.fct.booking.1", + fn: func(msg []byte) error { + return errors.New("error") + }, + }, + wantErr: true, + wantLogged: []string{"ERROR: Consumer error: error ()"}, + }, + { + name: "error commiting", + fields: fields{ + consumer: &MockConsumer{ + subscribe: func(topics []string, rebalanceCb kafka.RebalanceCb) (err error) { + return nil + }, + read: func(count int) func(time.Duration) (*kafka.Message, error) { + return func(timeout time.Duration) (*kafka.Message, error) { + count = count - 1 + if count >= 0 { + return nil, errors.New("error") + } + return &kafka.Message{Value: []byte(`{}`), TopicPartition: kafka.TopicPartition{Topic: sptr("bookingsystem.fct.booking.1")}}, nil + + } + }(1), + commit: func(msg *kafka.Message) ([]kafka.TopicPartition, error) { + return nil, errors.New("error") + }, + }, + }, + args: args{ + topic: "bookingsystem.fct.booking.1", + fn: func(msg []byte) error { + return nil + }, + }, + wantErr: true, + wantLogged: []string{"ERROR: Consumer error: error ()"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := &MockLogger{} + k := &defaultClient{ + consumer: tt.fields.consumer, + log: logger, + } + if err := k.Consume(map[string]func(msg []byte) error{ + tt.args.topic: tt.args.fn, + }); (err != nil) != tt.wantErr { + t.Errorf("Consume() error = %v, wantErr %v", err, tt.wantErr) + } + logger.Check(t, tt.wantLogged) + }) + } +} + +func TestDefaultClient_Send(t *testing.T) { + type fields struct { + producer Producer + } + type args struct { + topic string + msg interface{} + } + tests := []struct { + name string + fields fields + args args + events []kafka.Event + wantErr bool + wantLogged []string + }{ + { + name: "error marshalling", + fields: fields{ + producer: NewProducer(func() {}), + }, + args: args{ + topic: "bookingsystem.fct.booking.1", + msg: BrokenMarshal(true), + }, + wantErr: true, + }, + { + name: "topic partition error is returned", + fields: fields{ + producer: NewProducer(func() {}), + }, + args: args{ + topic: "bookingsystem.fct.booking.1", + msg: "blutti", + }, + events: []kafka.Event{&kafka.Message{TopicPartition: kafka.TopicPartition{Error: errors.New("error")}}}, + wantErr: true, + wantLogged: []string{"INFO: Delivery failed: error"}, + }, + { + name: "success", + fields: fields{ + producer: NewProducer(func() {}), + }, + args: args{ + topic: "bookingsystem.fct.booking.1", + msg: "blutti", + }, + events: []kafka.Event{IgnoredEvent(true), &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: sptr("topic"), Partition: 1, Offset: kafka.Offset(23)}}}, + wantErr: false, + wantLogged: []string{ + "INFO: Ignored event: ignored", + "INFO: Delivered message to topic topic [1] at offset 23", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := &MockLogger{} + k := &defaultClient{ + producer: tt.fields.producer, + log: logger, + } + if len(tt.events) > 0 { + go func() { + time.Sleep(time.Second) + for _, e := range tt.events { + tt.fields.producer.Events() <- e + } + }() + } + if err := k.Send(tt.args.topic, tt.args.msg); (err != nil) != tt.wantErr { + t.Errorf("Send() error = %v, wantErr %v", err, tt.wantErr) + } + logger.Check(t, tt.wantLogged) + }) + } +} + +func TestDefaultClient_Close(t *testing.T) { + consumerClosed := false + producerClosed := false + k := &defaultClient{ + consumer: &MockConsumer{close: func() error { + consumerClosed = true + return nil + }}, + producer: NewProducer(func() { + producerClosed = true + }), + } + k.Close() + + if !consumerClosed || !producerClosed { + t.Error("Close() expected close to be called on both consumer and producer but wasn't") + } +} + +type MockConsumer struct { + subscribe func(topics []string, rebalanceCb kafka.RebalanceCb) (err error) + read func(timeout time.Duration) (*kafka.Message, error) + commit func(msg *kafka.Message) ([]kafka.TopicPartition, error) + close func() error +} + +func (m MockConsumer) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error) { + return m.subscribe(topics, rebalanceCb) +} + +func (m MockConsumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) { + return m.read(timeout) +} + +func (m MockConsumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) { + return m.commit(msg) +} + +func (m MockConsumer) Close() error { + return m.close() +} + +var _ Consumer = &MockConsumer{} + +type MockProducer struct { + events chan kafka.Event + produce chan *kafka.Message + close func() +} + +func (m *MockProducer) Events() chan kafka.Event { + return m.events +} + +func (m *MockProducer) ProduceChannel() chan *kafka.Message { + return m.produce +} + +func (m *MockProducer) Close() { + m.close() +} + +var _ Producer = &MockProducer{} + +func NewProducer(close func()) *MockProducer { + return &MockProducer{ + events: make(chan kafka.Event, 10), + produce: make(chan *kafka.Message, 10), + close: close, + } +} + +type BrokenMarshal bool + +func (BrokenMarshal) MarshalJSON() ([]byte, error) { + return nil, errors.New("error") +} + +type IgnoredEvent bool + +func (IgnoredEvent) String() string { + return "ignored" +} + +func sptr(s string) *string { + return &s +} + +type MockLogger struct { + Logged []string +} + +func (m *MockLogger) Check(t *testing.T, wantLogged []string) { + t.Helper() + if len(m.Logged) != 0 || len(wantLogged) != 0 { + got := litter.Sdump(m.Logged) + want := litter.Sdump(wantLogged) + if got != want { + t.Errorf("MockLogger() got %s, want %s", got, want) + } + } +} + +func (m *MockLogger) Debug(s string) { + m.Logged = append(m.Logged, fmt.Sprintf("DEBUG: %s", s)) +} + +func (m *MockLogger) Info(s string) { + m.Logged = append(m.Logged, fmt.Sprintf("INFO: %s", s)) +} + +func (m *MockLogger) Warn(s string) { + m.Logged = append(m.Logged, fmt.Sprintf("WARN: %s", s)) +} + +func (m *MockLogger) Error(s string) { + m.Logged = append(m.Logged, fmt.Sprintf("ERROR: %s", s)) +} + +func (m *MockLogger) Fatal(s string) { + m.Logged = append(m.Logged, fmt.Sprintf("FATAL: %s", s)) +} + +func (m *MockLogger) Debugf(s string, i ...interface{}) { + m.Logged = append(m.Logged, fmt.Sprintf("DEBUG: %s", fmt.Sprintf(s, i...))) +} + +func (m *MockLogger) Infof(s string, i ...interface{}) { + m.Logged = append(m.Logged, fmt.Sprintf("INFO: %s", fmt.Sprintf(s, i...))) +} + +func (m *MockLogger) Warnf(s string, i ...interface{}) { + m.Logged = append(m.Logged, fmt.Sprintf("WARN: %s", fmt.Sprintf(s, i...))) +} + +func (m *MockLogger) Errorf(s string, i ...interface{}) { + m.Logged = append(m.Logged, fmt.Sprintf("ERROR: %s", fmt.Sprintf(s, i...))) +} + +func (m *MockLogger) Fatalf(s string, i ...interface{}) { + m.Logged = append(m.Logged, fmt.Sprintf("FATAL: %s", fmt.Sprintf(s, i...))) +} + +var _ Logger = &MockLogger{} diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..c71b24d --- /dev/null +++ b/logger.go @@ -0,0 +1,52 @@ +package kafka + +// Logger represents the logging API +// Maps to Apex log interface for convenience +// https://github.com/apex/log/blob/master/interface.go +type Logger interface { + Debug(string) + Info(string) + Warn(string) + Error(string) + Fatal(string) + Debugf(string, ...interface{}) + Infof(string, ...interface{}) + Warnf(string, ...interface{}) + Errorf(string, ...interface{}) + Fatalf(string, ...interface{}) +} + +type noopLogger struct { +} + +func (m *noopLogger) Debug(s string) { +} + +func (m *noopLogger) Info(s string) { +} + +func (m *noopLogger) Warn(s string) { +} + +func (m *noopLogger) Error(s string) { +} + +func (m *noopLogger) Fatal(s string) { +} + +func (m *noopLogger) Debugf(s string, i ...interface{}) { +} + +func (m *noopLogger) Infof(s string, i ...interface{}) { +} + +func (m *noopLogger) Warnf(s string, i ...interface{}) { +} + +func (m *noopLogger) Errorf(s string, i ...interface{}) { +} + +func (m *noopLogger) Fatalf(s string, i ...interface{}) { +} + +var _ Logger = &noopLogger{}