librdkafka-0.11.1-1.el7.x86_64
php-pecl-rdkafka-3.0.5-1.el7.remi.7.1.x86_64
kafka_2.11-0.11.0.2
PHP 7.1.12
Не пойму как заставить сохранять offset не в момент чтения, а только после обработки записи.
Согласно документации делаю $topicConf->set("auto.commit.enable", 'false') и $topic->offsetStore($msg->partition, ($msg->offset+1) );
Но в файл всё-равно пишется offset в момент чтения ($topic->consume(0, 1000);). Пробовал много разных вариантов конфигурации, вот последний.
<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'my-alarm-group');
$rk = new RdKafka\Consumer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("kafka");
$topicConf = new RdKafka\TopicConf();
$topicConf->set("auto.commit.enable", 'false');
##$topicConf->set("enable.auto.offset.store", "true");
#$topicConf->set("auto.commit.interval.ms", 0);
#$topicConf->set("offset.store.sync.interval.ms", -1);
#$topicConf->set("offset.store.method", 'broker');
#$topicConf->set("request.required.acks", 1);
$topicConf->set('offset.store.method', 'file');
#$topicConf->set('offset.store.path', sys_get_temp_dir());
$topicConf->set("auto.offset.reset", 'smallest');
$topic = $rk->newTopic("status", $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while(true) {
$msg = $topic->consume(0, 1000);
// var_dump($msg); die();
if($msg->err) {
var_dump($msg->errstr() );
break;
}
var_dump($msg->offset);
var_dump('commit ' . $msg->offset . ' ' . ($msg->offset+1) );
$topic->offsetStore($msg->partition, ($msg->offset+1) );
}