@jajabin

Как определить правильное количество подразделов Apache Kafka?

Имеется продюсер, который пишет в топик кафки сообщения нетфлоу версии 9, далее я реализовал потребителя который в 4 горутины обрабатывает сообщения из кафки, так как я новичок в это деле хотелось бы узнать как правильно все это сделать? Создать 4 подраздела с одной темой и на каждый подраздел сделать потребителя в горутине? или же оставить все в одном потоке? Хотелось бы услышать совета, как правильно все реализовать код ниже...
package main

import (
	"context"
	"database/sql"
	"fmt"
	"github.com/golang/protobuf/proto"
	"github.com/kshvakov/clickhouse"
	"github.com/segmentio/kafka-go"
	"log"
	"net"
	netflow "netflowproject/pb-ext"
	"sync"
	"time"
)

const (
	DBURL = "127.0.0.1"
	USER  = "default"
	PASS = "123"
	DBNAME = "netflow"
)

type Worker struct {
	messageCount int
	last 		time.Time
	dur			time.Duration
	db 			*sql.DB
	flows		[][]interface{}
	consumer	func(func()*kafka.Reader,chan<- kafka.Message)
}
func (this *Worker)Buffer(msg *kafka.Message)(bool,error){
	this.messageCount += 1
	var flowMSG netflow.FlowMessage
	err := proto.Unmarshal(msg.Value,&flowMSG)
	if err == nil{
		src_ip   := net.IP(flowMSG.SrcIP).String()
		dst_ip   := net.IP(flowMSG.DstIP).String()
		next_hop := net.IP(flowMSG.NextHop).String()

		unzip := []interface{}{time.Now(),time.Now(),src_ip,dst_ip, int32(flowMSG.SrcPort), int32(flowMSG.DstPort),
			next_hop,flowMSG.Bytes, flowMSG.Packets,uint8(flowMSG.Proto),int32(flowMSG.Etype),flowMSG.SamplingRate,flowMSG.SequenceNum}

		this.flows = append(this.flows,unzip)
	}else{
		log.Printf("unmarshaling error: ",err)
	}
	return false,nil
}

func (this *Worker)Flush()bool{
	fmt.Printf("Processed %d records in the last iteration.\n", this.messageCount)
	this.messageCount = 0
	tx, err := this.db.Begin()
	if err != nil{
		log.Fatal("Could not open transaction.", err)
	}
	stmt, err := tx.Prepare(
		"INSERT INTO flow (date,time,src_ip,dst_ip,src_port,dst_port,next_hop,bytes,packet,protocol,etype,sampling_rate,sequencer_num) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)")
	if err != nil {
		log.Fatal(err)
	}
	for _, curFlow := range this.flows{
		_, err := stmt.Exec(curFlow...)
		if err != nil{
			fmt.Println(err)
		}
	}

	if err := tx.Commit();err!=nil{
		fmt.Println(err)
	}

	this.flows = make([][]interface{},0)
	return true

}


func main(){

	var(
		workersCount = 4
		ch = make(chan kafka.Message,10000)
	 	wg sync.WaitGroup
		count = 0
		FlushCount = 10000
		FlushTime = "5s"

	)
	dur, _ := time.ParseDuration(FlushTime)
	timer := time.After(dur)
	wg.Add(workersCount)
	for i:=1; i <= workersCount; i++{
		go func() {
			w := &Worker{
				last:		time.Time{},
				db:			dbConn(DBURL,USER, PASS ,DBNAME),
				consumer:	kafkaMessage,
			}
			go w.consumer(GetKafkaReader,ch)
			for {
				select {
				case <-timer:
					w.Flush()
					timer = time.After(dur)
				case msg, ok := <- ch:
					if ok {
						flush, err := w.Buffer(&msg)
						if flush{
							w.Flush()
						}
						if err != nil {
							log.Fatal("Error while porcessing: %v", err)
						}
						count++
						if count == FlushCount{
							w.Flush()
							count = 0
						}
					}
				}
			}
		}()
		defer wg.Done()
	}

	wg.Wait()



}

var kafkaMessage = func(
	reader func()*kafka.Reader,
	messageCh chan<- kafka.Message){
	r := reader()
	r.SetOffset(47)
	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil{fmt.Println(err)}
		messageCh <- m
	}

}

var dbConn = func(dbURL,user, password ,dbNAME string)(*sql.DB){
	dataSourceName := fmt.Sprintf("tcp://%s:9000?username=%s&password=%s&database=%s&debug=false",
		dbURL,user,password,dbNAME)
	connect, err := sql.Open("clickhouse",dataSourceName)
	if err != nil {
		log.Fatal(err)
	}
	if err := connect.Ping(); err != nil {
		if exception, ok := err.(*clickhouse.Exception); ok {
			log.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
		} else {
			log.Println(err)
		}
		return nil
	}
	return connect
}

func GetKafkaReader()*kafka.Reader{
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"127.0.0.1:9092"},
		Topic:	  "netflow",
		Partition: 0,
		MinBytes: 10e3,
		MaxBytes: 10e6,
	})
}
  • Вопрос задан
  • 163 просмотра
Пригласить эксперта
Ответы на вопрос 1
xEpozZ
@xEpozZ
Веб-разработчик
Я думаю, это зависит от вашей конкретной задачи
oxickaiivr5j6hey5fahjkcvsxc.png
Consumer process может быть сколько угодно, зависит от нагрузки на узел.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы