diff options
author | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-04 21:22:54 +0200 |
---|---|---|
committer | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-04 21:22:54 +0200 |
commit | 642195d99e0e8fa862ec476573a7064d91ffce36 (patch) | |
tree | b178f045fbf6730161b9f4335604df1825f14c23 | |
parent | payload-processor: add concurrency (diff) | |
download | fs-tracer-backend-642195d99e0e8fa862ec476573a7064d91ffce36.tar.gz fs-tracer-backend-642195d99e0e8fa862ec476573a7064d91ffce36.tar.bz2 fs-tracer-backend-642195d99e0e8fa862ec476573a7064d91ffce36.zip |
payload-processor: set up testing
-rw-r--r-- | MODULE.bazel | 1 | ||||
-rw-r--r-- | go.mod | 9 | ||||
-rw-r--r-- | go.sum | 4 | ||||
-rw-r--r-- | src/payload-processor/processor/BUILD.bazel | 12 | ||||
-rw-r--r-- | src/payload-processor/processor/processor.go | 11 | ||||
-rw-r--r-- | src/payload-processor/processor/processor_test.go | 18 |
6 files changed, 51 insertions, 4 deletions
diff --git a/MODULE.bazel b/MODULE.bazel index caa1d1b..c4a151b 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -28,4 +28,5 @@ go_deps.from_file(go_mod = "//:go.mod") use_repo( go_deps, "com_github_segmentio_kafka_go", + "com_github_stretchr_testify", ) diff --git a/go.mod b/go.mod index f42c9cf..855f601 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,14 @@ module github.com/Baitinq/fs-tracer-backend go 1.22.2 require ( + github.com/segmentio/kafka-go v0.4.47 + github.com/stretchr/testify v1.9.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/klauspost/compress v1.15.9 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect - github.com/segmentio/kafka-go v0.4.47 + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 3c1b808..ed6fcfd 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,9 @@ github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVO github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -62,6 +63,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel index 123059d..f957fd3 100644 --- a/src/payload-processor/processor/BUILD.bazel +++ b/src/payload-processor/processor/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "processor", @@ -7,3 +7,13 @@ go_library( visibility = ["//visibility:public"], deps = ["@com_github_segmentio_kafka_go//:kafka-go"], ) + +go_test( + name = "processor_test", + srcs = ["processor_test.go"], + embed = [":processor"], + deps = [ + "@com_github_segmentio_kafka_go//:kafka-go", + "@com_github_stretchr_testify//require", + ], +) diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index f2e4e80..fc6792a 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -63,7 +63,16 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { cancel() break } - fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value)) + err = p.handleMessage(m) + if err != nil { + log.Println("failed to handle message:", err) + continue + } p.kafka_reader.CommitMessages(ctx, m) } } + +func (p Processor) handleMessage(m kafka.Message) error { + fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value)) + return nil +} diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go new file mode 100644 index 0000000..ee2ef5f --- /dev/null +++ b/src/payload-processor/processor/processor_test.go @@ -0,0 +1,18 @@ +package processor + +import ( + "testing" + + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/require" +) + +func TestProcessMessage(t *testing.T) { + processor := Processor{} + + message := []byte("test") + + err := processor.handleMessage(kafka.Message{Value: message}) + + require.NoError(t, err) +} |