Имеется продюсер, который пишет в топик кафки сообщения нетфлоу версии 9, далее я реализовал потребителя который в 4 горутины обрабатывает сообщения из кафки, так как я новичок в это деле хотелось бы узнать как правильно все это сделать? Создать 4 подраздела с одной темой и на каждый подраздел сделать потребителя в горутине? или же оставить все в одном потоке? Хотелось бы услышать совета, как правильно все реализовать код ниже...
package main
import (
"context"
"database/sql"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/kshvakov/clickhouse"
"github.com/segmentio/kafka-go"
"log"
"net"
netflow "netflowproject/pb-ext"
"sync"
"time"
)
const (
DBURL = "127.0.0.1"
USER = "default"
PASS = "123"
DBNAME = "netflow"
)
type Worker struct {
messageCount int
last time.Time
dur time.Duration
db *sql.DB
flows [][]interface{}
consumer func(func()*kafka.Reader,chan<- kafka.Message)
}
func (this *Worker)Buffer(msg *kafka.Message)(bool,error){
this.messageCount += 1
var flowMSG netflow.FlowMessage
err := proto.Unmarshal(msg.Value,&flowMSG)
if err == nil{
src_ip := net.IP(flowMSG.SrcIP).String()
dst_ip := net.IP(flowMSG.DstIP).String()
next_hop := net.IP(flowMSG.NextHop).String()
unzip := []interface{}{time.Now(),time.Now(),src_ip,dst_ip, int32(flowMSG.SrcPort), int32(flowMSG.DstPort),
next_hop,flowMSG.Bytes, flowMSG.Packets,uint8(flowMSG.Proto),int32(flowMSG.Etype),flowMSG.SamplingRate,flowMSG.SequenceNum}
this.flows = append(this.flows,unzip)
}else{
log.Printf("unmarshaling error: ",err)
}
return false,nil
}
func (this *Worker)Flush()bool{
fmt.Printf("Processed %d records in the last iteration.\n", this.messageCount)
this.messageCount = 0
tx, err := this.db.Begin()
if err != nil{
log.Fatal("Could not open transaction.", err)
}
stmt, err := tx.Prepare(
"INSERT INTO flow (date,time,src_ip,dst_ip,src_port,dst_port,next_hop,bytes,packet,protocol,etype,sampling_rate,sequencer_num) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)")
if err != nil {
log.Fatal(err)
}
for _, curFlow := range this.flows{
_, err := stmt.Exec(curFlow...)
if err != nil{
fmt.Println(err)
}
}
if err := tx.Commit();err!=nil{
fmt.Println(err)
}
this.flows = make([][]interface{},0)
return true
}
func main(){
var(
workersCount = 4
ch = make(chan kafka.Message,10000)
wg sync.WaitGroup
count = 0
FlushCount = 10000
FlushTime = "5s"
)
dur, _ := time.ParseDuration(FlushTime)
timer := time.After(dur)
wg.Add(workersCount)
for i:=1; i <= workersCount; i++{
go func() {
w := &Worker{
last: time.Time{},
db: dbConn(DBURL,USER, PASS ,DBNAME),
consumer: kafkaMessage,
}
go w.consumer(GetKafkaReader,ch)
for {
select {
case <-timer:
w.Flush()
timer = time.After(dur)
case msg, ok := <- ch:
if ok {
flush, err := w.Buffer(&msg)
if flush{
w.Flush()
}
if err != nil {
log.Fatal("Error while porcessing: %v", err)
}
count++
if count == FlushCount{
w.Flush()
count = 0
}
}
}
}
}()
defer wg.Done()
}
wg.Wait()
}
var kafkaMessage = func(
reader func()*kafka.Reader,
messageCh chan<- kafka.Message){
r := reader()
r.SetOffset(47)
for {
m, err := r.ReadMessage(context.Background())
if err != nil{fmt.Println(err)}
messageCh <- m
}
}
var dbConn = func(dbURL,user, password ,dbNAME string)(*sql.DB){
dataSourceName := fmt.Sprintf("tcp://%s:9000?username=%s&password=%s&database=%s&debug=false",
dbURL,user,password,dbNAME)
connect, err := sql.Open("clickhouse",dataSourceName)
if err != nil {
log.Fatal(err)
}
if err := connect.Ping(); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
log.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
} else {
log.Println(err)
}
return nil
}
return connect
}
func GetKafkaReader()*kafka.Reader{
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"127.0.0.1:9092"},
Topic: "netflow",
Partition: 0,
MinBytes: 10e3,
MaxBytes: 10e6,
})
}