Настройка приемника elasticsearch apache-flume

Это мой первый раз здесь, так что извините, если я не пишу нормально, и извините за мой плохой английский.

Я пытаюсь настроить приемники Apache Flume и Elasticsearch. Все ок, вроде работает нормально, но при запуске агента 2 предупреждения; следующие:

2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies

Моя конфигурация агента:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Он запускает netcat, и все в порядке, но я боюсь этих предупреждений, я этого не понимаю.


person Lifestorm    schedule 16.11.2015    source источник
comment
Вы уверены, что данная конфигурация работает правильно? Первая трассировка журнала — это не предупреждение, а ошибка, сообщающая вам, что у ElasticSearchSink есть какая-то проблема, скорее всего, связанная с какой-то проблемой зависимости (есть метод, который не найден).   -  person frb    schedule 17.11.2015
comment
Я не заметил конкретного сообщения, выдаваемого трассировкой предупреждений, но оно подтверждает мой диагноз: Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies   -  person frb    schedule 17.11.2015


Ответы (3)


Я нашел причину, кажется, что Apache Flume 1.6.0 и Elasticsearch 2.0 не могут правильно общаться.

Нашел хороший сток от 3-го лица, который доработал.

Вот ссылка

И это моя окончательная конфигурация,

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink
a1.sinks.k1.hostNames = 127.0.0.1:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer
a1.sinks.k1.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Меня устраивает.

Спасибо за ответы.

P.S. да, мне пришлось перенести библиотеки.

person Lifestorm    schedule 17.11.2015

При просмотре журналов возникла проблема с отсутствующей зависимостью.

Если вы посмотрите документацию ElasticSearchSink, вы увидите следующее:

JAR-файлы elasticsearch и lucene-core, необходимые для вашей среды, должны быть помещены в каталог lib установки Apache Flume. Elasticsearch требует, чтобы основная версия JAR-файла клиента совпадала с версией JAR-файла сервера и чтобы на обоих работала одна и та же дополнительная версия JVM. SerializationExceptions появятся, если это неверно. Чтобы выбрать нужную версию, сначала определите версию elasticsearch и версию JVM, на которой работает целевой кластер. Затем выберите клиентскую библиотеку elasticsearch, соответствующую основной версии. Клиент 0.19.x может взаимодействовать с кластером 0.19.x; 0.20.x может общаться с 0.20.x, а 0.90.x может общаться с 0.90.x. После определения версии elasticsearch прочитайте файл pom.xml, чтобы определить правильную версию JAR lucene-core для использования. Агент Flume, на котором работает ElasticSearchSink, также должен соответствовать JVM, на которой работает целевой кластер, до младшей версии.

Скорее всего вы не поставили нужные джавы, или версия не та.

person frb    schedule 17.11.2015

Добавлено ниже 2 JAR-файлов только в каталоге flume/lib, и это сработало, не нужно добавлять все остальные JAR-файлы Lucene:

elasticsearch-1.7.1.jar

lucene-core-4.10.4.jar

команда для запуска канала:

bin/flume-ng agent --conf conf --conf-file conf/flume-aggregator.conf --name agent2 -Dflume.root.logger=INFO,console

не забудьте добавить ниже в flume-env.sh

export JAVA_HOME=/usr/java/default

export JAVA_OPTS="-Xms3072m -Xmx3072m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

FLUME_CLASSPATH="/usr/flume/flume1.6/apache-flume-1.6.0-bin/;/usr/flume/flume1.6/apache-flume-1.6.0-bin/lib"

Конфигурация агрегатора Flume для загрузки данных в ES: flume-aggregator.conf

agent2.sources = source1
agent2.sinks = sink1
agent2.channels = channel1

################################################
# Describe Source
################################################

# Source Avro
agent2.sources.source1.type = avro
agent2.sources.source1.bind = 0.0.0.0 
agent2.sources.source1.port = 9997

################################################
# Describe Interceptors
################################################
# an example of nginx access log regex match
# agent2.sources.source1.interceptors = interceptor1
# agent2.sources.source1.interceptors.interceptor1.type = regex_extractor
# 
# agent2.sources.source1.interceptors.interceptor1.regex = "^(\\S+) \\[(.*?)\\] \"(.*?)\" (\\S+) (\\S+)( \"(.*?)\" \"(.*?)\")?"
# 
# # agent2.sources.source1.interceptors.interceptor1.regex = ^(.*) ([a-zA-Z\\.\\@\\-\\+_%]+) ([a-zA-Z\\.\\@\\-\\+_%]+) \\[(.*)\\] \\"(POST|GET) ([A-Za-z0-9\\$\\.\\+\\@#%_\\/\\-]*)\\??(.*) (.*)\\" ([a-zA-Z0-9\\.\\/\\s\-]*) (.*) ([0-9]+) ([0-9]+) ([0-9\\.]+)
# # agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13
# 
# agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8
# agent2.sources.source1.interceptors.interceptor1.serializers.s1.name = clientip
# agent2.sources.source1.interceptors.interceptor1.serializers.s2.name = datetime
# agent2.sources.source1.interceptors.interceptor1.serializers.s3.name = method
# agent2.sources.source1.interceptors.interceptor1.serializers.s4.name = request
# agent2.sources.source1.interceptors.interceptor1.serializers.s5.name = response
# agent2.sources.source1.interceptors.interceptor1.serializers.s6.name = status
# agent2.sources.source1.interceptors.interceptor1.serializers.s7.name = bytes
# agent2.sources.source1.interceptors.interceptor1.serializers.s8.name = requesttime
#  

################################################
# Describe Sink
################################################

# Sink ElasticSearch
# Elasticsearch lib ---> flume/lib
# elasticsearch/config/elasticsearch.yml cluster.name clusterName. data/clustername data.
agent2.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent2.sinks.sink1.hostNames = 10.20.156.16:9300,10.20.176.20:9300
agent2.sinks.sink1.indexName = pdi
agent2.sinks.sink1.indexType = pdi_metrics
agent2.sinks.sink1.clusterName = My-ES-CLUSTER
agent2.sinks.sink1.batchSize = 1000
agent2.sinks.sink1.ttl = 2
#this serializer is crucial in order to use kibana
agent2.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer



################################################
# Describe Channel
################################################

# Channel Memory
agent2.channels.channel1.type = memory
agent2.channels.channel1.capacity = 10000000
agent2.channels.channel1.transactionCapacity = 1000

################################################
# Bind the source and sink to the channel
################################################

agent2.sources.source1.channels = channel1
agent2.sinks.sink1.channel = channel1
person Dean Jain    schedule 31.03.2017
comment
Спасибо, что указали точные банки. - person user99999991; 24.01.2018