Создал таблицу в клике:
CREATE TABLE queue (
foo Nested (
code String,
value UInt32
),
bla UInt32)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'test',
kafka_group_name = 'group1',
kafka_format = 'CapnProto',
kafka_schema = 'bar:BarStruct',
kafka_num_consumers = 1;
Создал схему capn:
struct BarStruct
{
foo @0 :FooStruct;
bla @1 :UInt32;
}
struct FooStruct
{
code @0 :Text;
value @1 :UInt32;
}
Пытаюсь лить из java в кафку следующим образом:
public class KafkaTest {
public static Producer<Long, byte[]> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "testClient");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
return new KafkaProducer<>(props);
}
public static void main(String[] args) throws InterruptedException {
org.capnproto.MessageBuilder message = new org.capnproto.MessageBuilder();
Bar.BarStruct.Builder barStruct = message.initRoot(Bar.BarStruct.factory);
barStruct.setBla(32);
Bar.FooStruct.Builder fooBuilder = barStruct.initFoo();
fooBuilder.setCode("Blabla");
fooBuilder.setValue(42);
final Producer<Long, byte[]> producer = createProducer();
while (true) {
for (ByteBuffer buffer : message.getSegmentsForOutput()) {
final byte[] bytes = buffer.array();
System.out.println(Arrays.toString(bytes));
ProducerRecord<Long, byte[]> producerRecord = new ProducerRecord<>("test", bytes);
producer.send(producerRecord);
}
TimeUnit.SECONDS.sleep(1);
}
}
}
В итоге результат селекта примерно такой:
host :) select * from queue
SELECT *
FROM queue5
Received exception from server (version 19.1.6):
Code: 33. DB::Exception: Received from localhost:9090, ::1. DB::Exception: Cannot read all data. Bytes read: 8192. Bytes expected: 524304.: (Input format doesn't allow to skip errors): (at row 1)
.
0 rows in set. Elapsed: 0.504 sec.
host :) select * from queue
SELECT *
FROM queue
Ok.