@konchober

Как в PHP и rdkafka сохранять offset не во время чтения, а только после обработки?

librdkafka-0.11.1-1.el7.x86_64
php-pecl-rdkafka-3.0.5-1.el7.remi.7.1.x86_64
kafka_2.11-0.11.0.2
PHP 7.1.12

Не пойму как заставить сохранять offset не в момент чтения, а только после обработки записи.
Согласно документации делаю $topicConf->set("auto.commit.enable", 'false') и $topic->offsetStore($msg->partition, ($msg->offset+1) );
Но в файл всё-равно пишется offset в момент чтения ($topic->consume(0, 1000);). Пробовал много разных вариантов конфигурации, вот последний.

<?php

$conf = new RdKafka\Conf();
$conf->set('group.id', 'my-alarm-group');

$rk = new RdKafka\Consumer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("kafka");


$topicConf = new RdKafka\TopicConf();
$topicConf->set("auto.commit.enable", 'false');
##$topicConf->set("enable.auto.offset.store", "true");
#$topicConf->set("auto.commit.interval.ms", 0);
#$topicConf->set("offset.store.sync.interval.ms", -1);
#$topicConf->set("offset.store.method", 'broker');
#$topicConf->set("request.required.acks", 1);

$topicConf->set('offset.store.method', 'file');
#$topicConf->set('offset.store.path', sys_get_temp_dir());

$topicConf->set("auto.offset.reset", 'smallest');

$topic = $rk->newTopic("status", $topicConf);

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);


while(true) {

    $msg = $topic->consume(0, 1000);
//	var_dump($msg); die();

	if($msg->err) {
		var_dump($msg->errstr() );
		break;
	}

	var_dump($msg->offset);

	var_dump('commit ' . $msg->offset . ' ' . ($msg->offset+1) );
	$topic->offsetStore($msg->partition, ($msg->offset+1) );
}
  • Вопрос задан
  • 796 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы