about summary refs log tree commit diff
path: root/src/payload-processor/processor/processor.go
blob: 1203feada5e92e878b83679873abd0579c14d44e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package processor

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/Baitinq/fs-tracer-backend/lib"
	"github.com/jmoiron/sqlx"
	"github.com/segmentio/kafka-go"
)

// Processor consumes file-event payloads from Kafka and persists them
// to the database, fanning work out across a fixed number of workers.
type Processor struct {
	kafka_reader *kafka.Reader // shared reader; safe for concurrent FetchMessage calls
	db           DB            // persistence layer wrapper (see NewDB)
	concurrency  int           // number of worker goroutines spawned by ProcessMessages
}

// NewProcessor wires a Kafka reader and a database handle into a
// Processor that will run with the given number of worker goroutines.
func NewProcessor(kafka_reader *kafka.Reader, db *sqlx.DB, concurrency int) Processor {
	log.Println("Created processor with concurrency: ", concurrency)
	processor := Processor{
		kafka_reader: kafka_reader,
		db:           NewDB(db),
		concurrency:  concurrency,
	}
	return processor
}

// ProcessMessages starts p.concurrency worker goroutines that consume
// Kafka messages until an interrupt/termination signal arrives or a
// worker hits a fetch error, then closes the reader and returns.
func (p Processor) ProcessMessages() {
	signals := make(chan os.Signal, 1)
	// BUG FIX: SIGKILL can never be caught, blocked, or handled, so
	// registering it with signal.Notify was a no-op. SIGTERM is the
	// catchable termination signal (and what orchestrators send first).
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())

	// go routine for getting signals asynchronously; cancelling the
	// context unblocks every worker's FetchMessage call.
	go func() {
		sig := <-signals
		log.Println("Got signal: ", sig)
		cancel()
	}()

	wg := sync.WaitGroup{}
	wg.Add(p.concurrency)
	for i := 0; i < p.concurrency; i++ {
		go func() {
			defer wg.Done()
			p.process(ctx, cancel)
		}()
	}

	wg.Wait()

	if err := p.kafka_reader.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}

// process is a single worker loop: fetch a message, handle it, commit.
// It runs until the context is cancelled or FetchMessage fails, and
// calls cancel itself on fetch errors so sibling workers stop too.
func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
	for {
		m, err := p.kafka_reader.FetchMessage(ctx)
		if err != nil {
			// FetchMessage returns an error on context cancellation or
			// broker failure; either way, bring down the other workers.
			log.Println("failed to fetch message:", err)
			cancel()
			break
		}

		user_id, err := getHeaderValue(m.Headers, "user_id")
		if err != nil {
			// NOTE(review): log.Fatal kills the entire process on one
			// message missing a header — consider skipping the message
			// instead. Preserved as-is pending confirmation.
			log.Fatal("failed to get user_id from headers:", err)
		}

		// TODO: Remove after testing
		if string(m.Value) == "" {
			m.Value = []byte(fmt.Sprintf(`{
				"absolute_path": "/home/user/file.txt",
				"contents": "Hello, World!",
				"timestamp": "%s"
			}`, time.Now().Format(time.RFC3339)))
		}

		err = p.handleMessage(ctx, m, string(user_id))
		if err != nil {
			log.Println("failed to handle message:", err)
			p.handleError(ctx, m, err)
			// BUG FIX: was `return`, which silently killed this worker
			// after a single bad message and permanently reduced
			// concurrency. handleError has already decided the commit
			// policy for the failed message, so keep consuming.
			continue
		}
		// BUG FIX: the commit error was silently discarded; an
		// uncommitted message would be redelivered after a rebalance.
		if err := p.kafka_reader.CommitMessages(ctx, m); err != nil {
			log.Println("failed to commit message:", err)
		}
	}
}

// handleMessage decodes a Kafka message value as a lib.File payload and
// inserts it into the database for the given user. Returns the JSON or
// database error unchanged so the caller can decide the commit policy.
func (p Processor) handleMessage(ctx context.Context, m kafka.Message, user_id string) error {
	// BUG FIX: "paritition" typo in the trace output corrected to "partition".
	fmt.Printf("(%s): message at partition %d: offset %d: %s = %s, user_id = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value), user_id)

	var file lib.File
	if err := json.Unmarshal(m.Value, &file); err != nil {
		return err
	}

	return p.db.InsertFile(ctx, file, user_id)
}

// handleError decides what to do with a message whose handling failed.
// Currently every error is treated as unrecoverable: the message is
// committed so it will not be redelivered.
func (p Processor) handleError(ctx context.Context, m kafka.Message, err error) {
	switch err {
	// TODO: If its a recoverable error, don't commit.
	default:
		// BUG FIX: the commit error was silently ignored; surface it so
		// operators can see when a poison message fails to be skipped.
		if commitErr := p.kafka_reader.CommitMessages(ctx, m); commitErr != nil {
			log.Println("failed to commit message after handling error:", commitErr)
		}
	}
}

// getHeaderValue returns the value of the first header whose key
// matches key, or a non-nil error if no such header exists.
func getHeaderValue(headers []kafka.Header, key string) ([]byte, error) {
	for _, header := range headers {
		if header.Key == key {
			return header.Value, nil
		}
	}
	// IDIOM FIX: error strings are lowercase per Go convention, %q quotes
	// the key safely, and a nil slice is the idiomatic "no value" result.
	return nil, fmt.Errorf("header %q not found", key)
}