Добрый день
Сейчас у меня такая схема: postgres -> elasticsearch -> kibana
Необходимо синхронизировать данные с postgres в elasticsearch.
Ранее использовал библиотеку pgsync. но она требует перенастройки PG сервера. Требует переключения wal_level в logical
Сисадмины против.
Поэтому теперь мучаю logstash
Использую его как отдельный контейнер.
И вроде после запуска всё в логах норм, но синхронизация не идёт.
То что я передаю в env -> logstash.yml
http.host="0.0.0.0"
xpack.monitoring.elasticsearch.hosts=["http://elasticserver:9200"]
xpack.monitoring.enabled=true
xpack.monitoring.elasticsearch.username="logstash_system"
xpack.monitoring.elasticsearch.password="logstash_pass"
LS_JAVA_OPTS=-Xmx512m -Xms512m
config.support_escapes=true
То что получаю внутри контейнера
~/config$ cat logstash.yml
config.support_escapes: true
http.host: 0.0.0.0
xpack.monitoring.elasticsearch.hosts:
- http://elasticserver:9200
xpack.monitoring.elasticsearch.password: logstash_pass
xpack.monitoring.elasticsearch.username: logstash_system
xpack.monitoring.enabled: true
Что в пайплайне
pipeline$ cat logstash.conf
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://server:5432/database"
jdbc_user => "db_user"
jdbc_password => "db_pass"
jdbc_driver_class => "org.postgresql.Driver"
statement => "SELECT * from public.pdf_store"
}
}
output {
elasticsearch {
hosts => ["http://elasticserver:9200"]
index => "test_index"
document_id => "%{id}"
user => "elastic"
password => "elastic_pass"
}
}
В логах после сборки и старта контейнера
[INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/usr/share/logstash/pipeline/logstash.conf"], :thread=>"#<Thread:0x2cee4126 run>"}
[INFO ][logstash.javapipeline ][.monitoring-logstash] Starting pipeline {:pipeline_id=>".monitoring-logstash", "pipeline.workers"=>1, "pipeline.batch.size"=>2, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2, "pipeline.sources"=>["monitoring pipeline"], :thread=>"#<Thread:0x69b2349e run>"}
[INFO ][org.logstash.beats.Server][main][0710cad...7229d04aa] Starting server on port: 5044
[INFO ][org.logstash.beats.BeatsHandler][main][0710cad67...ca8cf87229d04aa] [local: <local_container_ip>:5044, remote: <outer_pg_server_ip>:52946] Handling exception: io.netty.handler.codec.DecoderException: org.logstash.beats.InvalidFrameProtocolException: Invalid version of beats protocol: 71 (caused by: org.logstash.beats.InvalidFrameProtocolException: Invalid version of beats protocol: 71)
[2022-12-15T18:10:44,963][WARN ][io.netty.channel.DefaultChannelPipeline][main][0710cad...a8cf87229d04aa] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: org.logstash.beats.InvalidFrameProtocolException: Invalid version of beats protocol: 71
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:477) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.65.Final.jar:4.1.65.Final]
at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: org.logstash.beats.InvalidFrameProtocolException: Invalid version of beats protocol: 71
at org.logstash.beats.Protocol.version(Protocol.java:22) ~[logstash-input-beats-6.4.1.jar:?]
at org.logstash.beats.BeatsParser.decode(BeatsParser.java:62) ~[logstash-input-beats-6.4.1.jar:?]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
... 9 more
ВОПРОС
Почему не работает синхронизация?