Flink Как использовать - FasterXML / jackson-dataformats-text - Для преобразования CSV в POJO

Я получаю CSV о своем классе, и мне нужно получить значения для создания POJO. Мне не нужно открывать file.csv в каталог, элементы, разделенные запятыми, передаются Flink в EventDeserializationSchema, а этот используется в классе событий для обработки каждого отдельного события.

Вот пример:

IN: 'Adam', 'Smith', 66, .... '12: 01: 00.000' - ›OUT: pojo

Для этого я использую: https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv

Это мой класс событий, который должен помочь, на самом деле в данный момент ничего не делает.

import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class Event implements Serializable {

    CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();
    
    CsvSchema schema = CsvSchema.emptySchema().withHeader();

    CsvSchema bootstrapSchema = CsvSchema.emptySchema().withHeader();
    ObjectMapper mapper = new CsvMapper();
    mapper.readerFor(Pojo.class).with(bootstrapSchema).readValue(??);
    
    return Pojo
}

Это мой класс Pojo:

public class Pojo {
    
        public String firstName;
        public String lastName;
        private int age;
        public String time;

        public Pojo(String firstName, String lastName, int age, String time) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.age = age;
            this.time =time;
            
        }

}

Любая помощь, чтобы заставить класс вернуть Pojo, была бы очень признательна.

Это пример с JSON: https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/игровыеплощадки/ops/clickcount/records/ClickEventDeserializationSchema.java

ClickEvenClass https://github.com/apache/flink/blob/9dd04a25bd300a725486ff08560920f548f3b1d9/flink-end-to-end-tests/flink-streaming-kafka-test-base/sravac/main/mainapache/flink/streaming/kafka/test/base/KafkaEvent.java#L27


person SimAzz    schedule 29.10.2020    source источник


Ответы (1)


Чтобы это работало, вам нужно иметь конструктор по умолчанию и средства получения / установки для полей. Я не понимаю, что вы собираетесь делать в Event и почему есть еще Pojo, но если вы хотите десериализовать входящую строку в Event, должно сработать что-то вроде этого:

  1. Event Класс Pojo:
public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}
  1. EventDeserializationSchema из этого вопроса с реализованным deserialize()
public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;

    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}
person Mikalai Lushchytski    schedule 31.10.2020
comment
Большое спасибо! Это тоже было очень полезно !! - person SimAzz; 31.10.2020