zilevsky
@zilevsky

Как настроить Logstash?

Добрый день
Сейчас у меня такая схема: postgres -> elasticsearch -> kibana
Необходимо синхронизировать данные с postgres в elasticsearch.

Ранее использовал библиотеку pgsync. но она требует перенастройки PG сервера. Требует переключения wal_level в logical
Сисадмины против.
Поэтому теперь мучаю logstash
Использую его как отдельный контейнер.
И вроде после запуска всё в логах норм, но синхронизация не идёт.

То что я передаю в env -> logstash.yml
http.host="0.0.0.0"
xpack.monitoring.elasticsearch.hosts=["http://elasticserver:9200"]
xpack.monitoring.enabled=true
xpack.monitoring.elasticsearch.username="logstash_system"
xpack.monitoring.elasticsearch.password="logstash_pass"
LS_JAVA_OPTS=-Xmx512m -Xms512m
config.support_escapes=true


То что получаю внутри контейнера
~/config$ cat logstash.yml

config.support_escapes: true
http.host: 0.0.0.0
xpack.monitoring.elasticsearch.hosts:
- http://elasticserver:9200
xpack.monitoring.elasticsearch.password: logstash_pass
xpack.monitoring.elasticsearch.username: logstash_system
xpack.monitoring.enabled: true


Что в пайплайне
pipeline$ cat logstash.conf

input {
    jdbc {
         jdbc_connection_string => "jdbc:postgresql://server:5432/database"
         jdbc_user => "db_user"
         jdbc_password => "db_pass"
         jdbc_driver_class => "org.postgresql.Driver"
         statement => "SELECT * from public.pdf_store"
     }
}

output {
  elasticsearch {
        hosts => ["http://elasticserver:9200"]
        index => "test_index"
        document_id => "%{id}"
        user => "elastic"
        password => "elastic_pass"
    }
}


В логах после сборки и старта контейнера
[INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/usr/share/logstash/pipeline/logstash.conf"], :thread=>"#<Thread:0x2cee4126 run>"}
[INFO ][logstash.javapipeline    ][.monitoring-logstash] Starting pipeline {:pipeline_id=>".monitoring-logstash", "pipeline.workers"=>1, "pipeline.batch.size"=>2, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2, "pipeline.sources"=>["monitoring pipeline"], :thread=>"#<Thread:0x69b2349e run>"}
[INFO ][org.logstash.beats.Server][main][0710cad...7229d04aa] Starting server on port: 5044

[INFO ][org.logstash.beats.BeatsHandler][main][0710cad67...ca8cf87229d04aa] [local: <local_container_ip>:5044, remote: <outer_pg_server_ip>:52946] Handling exception: io.netty.handler.codec.DecoderException: org.logstash.beats.InvalidFrameProtocolException: Invalid version of beats protocol: 71 (caused by: org.logstash.beats.InvalidFrameProtocolException: Invalid version of beats protocol: 71)
[2022-12-15T18:10:44,963][WARN ][io.netty.channel.DefaultChannelPipeline][main][0710cad...a8cf87229d04aa] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: org.logstash.beats.InvalidFrameProtocolException: Invalid version of beats protocol: 71
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:477) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.65.Final.jar:4.1.65.Final]
        at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: org.logstash.beats.InvalidFrameProtocolException: Invalid version of beats protocol: 71
        at org.logstash.beats.Protocol.version(Protocol.java:22) ~[logstash-input-beats-6.4.1.jar:?]
        at org.logstash.beats.BeatsParser.decode(BeatsParser.java:62) ~[logstash-input-beats-6.4.1.jar:?]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        ... 9 more


ВОПРОС
Почему не работает синхронизация?
  • Вопрос задан
  • 504 просмотра
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы