Задать вопрос
@jajabin

Как определить правильное количество подразделов Apache Kafka?

Имеется продюсер, который пишет в топик кафки сообщения нетфлоу версии 9, далее я реализовал потребителя который в 4 горутины обрабатывает сообщения из кафки, так как я новичок в это деле хотелось бы узнать как правильно все это сделать? Создать 4 подраздела с одной темой и на каждый подраздел сделать потребителя в горутине? или же оставить все в одном потоке? Хотелось бы услышать совета, как правильно все реализовать код ниже...
package main

import (
	"context"
	"database/sql"
	"fmt"
	"github.com/golang/protobuf/proto"
	"github.com/kshvakov/clickhouse"
	"github.com/segmentio/kafka-go"
	"log"
	"net"
	netflow "netflowproject/pb-ext"
	"sync"
	"time"
)

const (
	DBURL = "127.0.0.1"
	USER  = "default"
	PASS = "123"
	DBNAME = "netflow"
)

type Worker struct {
	messageCount int
	last 		time.Time
	dur			time.Duration
	db 			*sql.DB
	flows		[][]interface{}
	consumer	func(func()*kafka.Reader,chan<- kafka.Message)
}
func (this *Worker)Buffer(msg *kafka.Message)(bool,error){
	this.messageCount += 1
	var flowMSG netflow.FlowMessage
	err := proto.Unmarshal(msg.Value,&flowMSG)
	if err == nil{
		src_ip   := net.IP(flowMSG.SrcIP).String()
		dst_ip   := net.IP(flowMSG.DstIP).String()
		next_hop := net.IP(flowMSG.NextHop).String()

		unzip := []interface{}{time.Now(),time.Now(),src_ip,dst_ip, int32(flowMSG.SrcPort), int32(flowMSG.DstPort),
			next_hop,flowMSG.Bytes, flowMSG.Packets,uint8(flowMSG.Proto),int32(flowMSG.Etype),flowMSG.SamplingRate,flowMSG.SequenceNum}

		this.flows = append(this.flows,unzip)
	}else{
		log.Printf("unmarshaling error: ",err)
	}
	return false,nil
}

func (this *Worker)Flush()bool{
	fmt.Printf("Processed %d records in the last iteration.\n", this.messageCount)
	this.messageCount = 0
	tx, err := this.db.Begin()
	if err != nil{
		log.Fatal("Could not open transaction.", err)
	}
	stmt, err := tx.Prepare(
		"INSERT INTO flow (date,time,src_ip,dst_ip,src_port,dst_port,next_hop,bytes,packet,protocol,etype,sampling_rate,sequencer_num) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)")
	if err != nil {
		log.Fatal(err)
	}
	for _, curFlow := range this.flows{
		_, err := stmt.Exec(curFlow...)
		if err != nil{
			fmt.Println(err)
		}
	}

	if err := tx.Commit();err!=nil{
		fmt.Println(err)
	}

	this.flows = make([][]interface{},0)
	return true

}


func main(){

	var(
		workersCount = 4
		ch = make(chan kafka.Message,10000)
	 	wg sync.WaitGroup
		count = 0
		FlushCount = 10000
		FlushTime = "5s"

	)
	dur, _ := time.ParseDuration(FlushTime)
	timer := time.After(dur)
	wg.Add(workersCount)
	for i:=1; i <= workersCount; i++{
		go func() {
			w := &Worker{
				last:		time.Time{},
				db:			dbConn(DBURL,USER, PASS ,DBNAME),
				consumer:	kafkaMessage,
			}
			go w.consumer(GetKafkaReader,ch)
			for {
				select {
				case <-timer:
					w.Flush()
					timer = time.After(dur)
				case msg, ok := <- ch:
					if ok {
						flush, err := w.Buffer(&msg)
						if flush{
							w.Flush()
						}
						if err != nil {
							log.Fatal("Error while porcessing: %v", err)
						}
						count++
						if count == FlushCount{
							w.Flush()
							count = 0
						}
					}
				}
			}
		}()
		defer wg.Done()
	}

	wg.Wait()



}

var kafkaMessage = func(
	reader func()*kafka.Reader,
	messageCh chan<- kafka.Message){
	r := reader()
	r.SetOffset(47)
	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil{fmt.Println(err)}
		messageCh <- m
	}

}

var dbConn = func(dbURL,user, password ,dbNAME string)(*sql.DB){
	dataSourceName := fmt.Sprintf("tcp://%s:9000?username=%s&password=%s&database=%s&debug=false",
		dbURL,user,password,dbNAME)
	connect, err := sql.Open("clickhouse",dataSourceName)
	if err != nil {
		log.Fatal(err)
	}
	if err := connect.Ping(); err != nil {
		if exception, ok := err.(*clickhouse.Exception); ok {
			log.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
		} else {
			log.Println(err)
		}
		return nil
	}
	return connect
}

func GetKafkaReader()*kafka.Reader{
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"127.0.0.1:9092"},
		Topic:	  "netflow",
		Partition: 0,
		MinBytes: 10e3,
		MaxBytes: 10e6,
	})
}
  • Вопрос задан
  • 171 просмотр
Подписаться 2 Простой Комментировать
Пригласить эксперта
Ответы на вопрос 1
xEpozZ
@xEpozZ
Веб-разработчик
Я думаю, это зависит от вашей конкретной задачи
oxickaiivr5j6hey5fahjkcvsxc.png
Consumer process может быть сколько угодно, зависит от нагрузки на узел.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы