@Fenderas

Как исправить ошибки при импорте данных из MySQL в Elasticsearch?

Elasticsearch Distr Details
===========================
Installed Packages
Name : elasticsearch
Arch : noarch
Version : 2.4.0
Release : 1
Size : 29 M
Repo : installed
From repo : elasticsearch-2.x

Logstash Distr Details
======================
Installed Packages
Name : logstash
Arch : noarch
Epoch : 1
Version : 2.4.0
Release : 1
Size : 137 M
Repo : installed
From repo : logstash-2.4

Добрый день. Есть небольшая проблема в работе связки logstash + elasticsearch.
На данный момент использую эту связку для формирования аналитики для HelpDesk систему OTRS.

Logstash использует JDBC плагин для получения данных напрямую из базы данных MySql. Далее эти данные немного обрабатываются и отсылаются в c после чего визуализируем информацию через Kibana.
Запрос и правила отправки в elasticsearch можно найти в конфигурационном файле имеет - otrs-statistics.conf.
Все работало замечательно пока я не решил добавить вторую сущность в аналитику и укладывать данные в elasticsearch.

Я добавил еще один конфигурационный файл - otrs-statistics_actions.conf, который делает немного другую выборку в базе и отправляет данные в другой index.
По отдельности оба файла работают замечательно, но когда я отправляю оба файла в logstash начинается магия, разобраться в которой я не могу.

Вот вывод запроса к индексу до подключения второго файла (otrs-statistics.conf), работал только otrs-statistics_actions.conf
curl -get http://a5a5a5a5a5:9200/logstash-otrs_tickets_actions-2016.09/_search?size=1
{"took":1,"timed_out":false,"_shards":{"total":1,"successful":1,"failed":0},"hits":{"total":608,"max_score":1.0,"hits":[{"_index":"logstash-otrs_tickets_actions-2016.09","_type":"logs","_id":"123282","_score":1.0,"_source":{"ticket_id":4342,"ticket_number":"1111111","ticket_status":"closed successful","ticket_type":"Hourly_Rate","agent_name":"Ivan","ticket_action_id":123282,"ticket_action_spent_time":3.0,"ticket_action_create_time":"2016-09-01T03:18:02.000000Z","ticket_customer_id":"00-000666","customer_company_name":"Bari","customer_user_name":"Bari","customer_user_email":"a5@a5a5.a5","@version":"1","@timestamp":"2016-09-01T03:18:02.000Z"}}


Вот вывод запроса к индексу с двумя файлами
curl -get http://a5a5a5a5a5:9200/logstash-otrs_tickets_actions-2016.09/_search?size=1
{"took":1,"timed_out":false,"_shards":{"total":1,"successful":1,"failed":0},"hits":{"total":608,"max_score":1.0,"hits":[{"_index":"logstash-otrs_tickets_actions-2016.09","_type":"logs","_id":"123282","_score":1.0,"_source":{"ticket_id":4342,"ticket_number":"1111111","ticket_status":"closed successful","ticket_type":"Hourly_Rate","agent_name":"Ivan","ticket_action_id":123282,"ticket_action_spent_time":3.0,"ticket_action_create_time":"2016-09-01T03:18:02.000000Z","ticket_customer_id":"00-000666","customer_company_name":"Bari","customer_user_name":"Bari","customer_user_email":"a5@a5a5.a5","@version":"1","@timestamp":"2016-09-01T03:18:02.000Z","ticket_time_registration":"not registred","otrs_link":"https://a5a5a5a5a5/otrs/index.pl?Action=AgentTicketZoom;TicketID=4342"}}]}}


Почему-то начинают добавляться поля из другого запроса, otrs_link и ticket_time_registration
Подскажите в какую сторону можно капнуть, чтобы поправить это чудо. Спасибо.

Содержание конфига logstash - otrs-statistics_actions.conf
input {
  jdbc {
    jdbc_driver_library => "/tmp/mysql-connector-java-5.1.36/mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"

    jdbc_connection_string => "jdbc:mysql://blabla:3306/otrs"

    jdbc_user => "blabla"
    jdbc_password => "blabla"

    schedule => "*/3 * * * *"

    last_run_metadata_path => "/tmp/logstash/.logstash_jdbc_last_run"

    statement => "
					запрос
                "
  }
}

filter {
        date {
                match => [ "ticket_action_create_time", "yyyy-MM-dd HH:mm:ss Z","ISO8601" ]
                target => "@timestamp"
        }

        if [ticket_action_spent_time] {
                ruby {
                        code => "event['ticket_action_spent_time'].gsub!(/^%%/,'')"
                }
                ruby {
                        code => "event['ticket_action_spent_time'].gsub!(/%%.*/,'')"
                }
        }

        mutate {
            convert => [ "ticket_action_spent_time", "float" ]
        }
}

output {
    elasticsearch {
                hosts => "blabla:9200"
                document_id => "%{ticket_action_id}"
                index => "logstash-otrs_tickets_actions-%{+YYYY.MM}"
        }

        #stdout {
        #       codec => rubydebug
        #}
}


Содержание конфига logstash - otrs-statistics.conf
input {
  jdbc {
    jdbc_driver_library => "/tmp/mysql-connector-java-5.1.36/mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"

    jdbc_connection_string => "jdbc:mysql://blabla:3306/otrs"

    jdbc_user => "blabla"
    jdbc_password => "blabla"

    schedule => "*/3 * * * *"

    last_run_metadata_path => "/tmp/logstash/.logstash_jdbc_last_run"

    statement => "
		запрос
    "
  }
}

filter {
        date {
                match => ["ticket_create_time","yyyy-MM-dd HH:mm:ss Z","ISO8601"]
                target => "@timestamp"
        }

        # Оцениваем час работы каждеого сотрудника
        if [agent_name] in ["blabla", "blabla", "blabla"] {
                mutate {
                        add_field => { "cost_hour" => 10}
                }
        }
        else if [agent_name] in ["blabla", "blabla"] {
                mutate {
                        add_field => { "cost_hour" => 20}
                }
        }

        else if [agent_name] == "blabla" {
                mutate {
                        add_field => { "cost_hour" => 30 }
                }
        }

    if [cost_hour] {
                mutate {
                        convert => [ "cost_hour", "float" ]
                }
    }

        # Проверяем было ли зафиксированно время по заявке
        mutate {
                add_field => {"ticket_time_registration" => ""}
        }
        if [time_accounting_time_unit] {
                mutate {
                        convert => [ "time_accounting_time_unit", "float" ]
                        update => { "ticket_time_registration" => "registred" }
                }
        }
        else {
                mutate {
                        update => { "ticket_time_registration" => "not registred" }
                }
        }

        # Преобразуем поле в часы из минут
    if [ticket_transportation_time] {
                mutate {
                        convert => [ "ticket_transportation_time", "float" ]
                }
                ruby {
                        code => "event['ticket_transportation_time']/=60.0"
                }
    }

        # В случае если заявка открыта то убиваем время закрытия
    if ([ticket_status] in ["open", "new"]) {
                mutate {
                        remove_field => [ "%{ticket_close_time}" ]
                }
    }

        # Украшательства
    if [agent_name] {
                mutate {
                        gsub => [agent_name, "IT Support", "Default Agent"]
                }
    }

        if [ticket_service] {
                mutate {
                        gsub => [ticket_service,"blabla::","IT "]
                        gsub => [ticket_service,"blabla::","ERP "]
                }
    }

        # Добавляем поле для быстрого перехода в заявку из Kibana
        mutate {
                add_field => {"otrs_link" => "https://blabla/otrs/index.pl?Action=AgentTicketZoom;TicketID=%{ticket_id}"}
        }
}

output {
    elasticsearch {
                hosts => "blabla:9200"
                document_id => "%{ticket_number}"
                index => "logstash-otrs_tickets-%{+YYYY.MM}"
        }

        #stdout {
    #   codec => rubydebug
    #}
}
  • Вопрос задан
  • 453 просмотра
Пригласить эксперта
Ответы на вопрос 1
MaxDukov
@MaxDukov
впишусь в проект как SRE/DevOps.
у логсташа один поток инпутов-фильтров-аутпутов.
т.е. если вы одновременно загрузили 2 конфига - он вам и начал и читать из обоих селектов, и писать в оба индекса.
используйте
type => "sometype"в input, а потом проверку
if [type] == "sometype" {
  ...
}

в фильтрах и блоке output.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы