Flume: десериализатор событий Avro для эластичного поиска

Я хочу взять запись, созданную десериализатором AVRO, и отправить ее в ElasticSearch. Я понимаю, что мне нужно написать собственный код, чтобы сделать это.

Используя параметр LITERAL, у меня есть схема JSON, которая является первым шагом в использовании GenericRecord. Однако, просматривая AVRO Java API, я не вижу возможности использовать GenericRecord для одной записи. Во всех примерах используется DataFileReader.

Короче говоря, я не могу получить поля из события Flume.

Кто-нибудь делал это раньше? ТИА.


person Ed Software Architect    schedule 11.06.2014    source источник
comment
Я разобрался и выложил решение. Большое спасибо. Это был мой первый пост, но я много читал сайт.   -  person Ed Software Architect    schedule 12.06.2014


Ответы (1)


Я смог понять это. Я сделал следующее:

// Get the schema
String strSchema = event.getHeader("flume.avro.schema.literal");
// Get the body
byte[] body = event.getBody();

// Create the avro schema
Schema schema = Schema.Parser.parse(strSchema);

// Get the decoder to use to get the "record" from the event stream in object form
BinaryDecoder decoder = DecoderFactory.binaryDecoder(body, null); 

// Get the datum reader
GenericDatumReader reader = new GenericDatumReader(schema);

// Get the Avro record in object form
GenericRecord record = reader.read(null, decoder);

// Now you can iterate over the fields
for (Schema.Field field : schema.getFields()) {
   Object value = record.get(field.name());

   // Code to add field to JSON to send to ElasticSearch not listed
   // ...

} // for (Schema.Field field : schema.getFields()) {

Это хорошо работает.

person Ed Software Architect    schedule 12.06.2014