zipWithIndex на Apache Flink

Я хотел бы присвоить каждой строке моего ввода id - это должно быть число от 0 до N - 1, где N - количество строк во вводе.

Грубо говоря, я хотел бы иметь возможность делать что-то вроде следующего:

val data = sc.textFile(textFilePath, numPartitions)
val rdd = data.map(line => process(line))
val rddMatrixLike = rdd.zipWithIndex.map { case (v, idx) => someStuffWithIndex(idx, v) }

Но в Apache Flink. Является ли это возможным?


person Alexey Grigorev    schedule 02.06.2015    source источник
comment
Это интересный вопрос. Я попробую придумать реализацию.   -  person Robert Metzger    schedule 02.06.2015


Ответы (2)


Теперь это часть версии 0.10-SNAPSHOT Apache Flink. Примеры для zipWithIndex(in) и zipWithUniqueId(in) доступны в официальной документации Flink < / а>.

person peterschrott    schedule 19.07.2015

Вот простая реализация функции:

public class ZipWithIndex {

public static void main(String[] args) throws Exception {

    ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> in = ee.readTextFile("/home/robert/flink-workdir/debug/input");

    // count elements in each partition
    DataSet<Tuple2<Integer, Long>> counts = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Integer, Long>>() {
        @Override
        public void mapPartition(Iterable<String> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
            long cnt = 0;
            for (String v : values) {
                cnt++;
            }
            out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), cnt));
        }
    });

    DataSet<Tuple2<Long, String>> result = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Long, String>>() {
        long start = 0;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts");
            Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() {
                @Override
                public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) {
                    return ZipWithIndex.compare(o1.f0, o2.f0);
                }
            });
            for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) {
                start += offsets.get(i).f1;
            }
        }

        @Override
        public void mapPartition(Iterable<String> values, Collector<Tuple2<Long, String>> out) throws Exception {
            for(String v: values) {
                out.collect(new Tuple2<Long, String>(start++, v));
            }
        }
    }).withBroadcastSet(counts, "counts");
    result.print();

}

public static int compare(int x, int y) {
    return (x < y) ? -1 : ((x == y) ? 0 : 1);
}
}

Вот как это работает: я использую первую mapPartition() операцию, чтобы просмотреть все элементы в разделах, чтобы подсчитать, сколько элементов там. Мне нужно знать количество элементов в каждом разделе, чтобы правильно установить смещения при назначении идентификаторов элементам. Результатом первого mapPartition является DataSet, содержащий сопоставления. Я транслирую этот DataSet всем вторым mapPartition() операторам, которые будут назначать идентификаторы элементам из ввода. В open() методе второго mapPartition() я вычисляю смещение для каждого раздела.

Я, вероятно, собираюсь внести код в Flink (после обсуждения с другими коммиттерами).

person Robert Metzger    schedule 02.06.2015
comment
Спасибо, Роберт! Не могли бы вы также в двух словах объяснить, как это работает? Например. почему мы используем getRuntimeContext().getIndexOfThisSubtask() и почему может помочь счетчик широковещательной рассылки для каждого раздела? - person Alexey Grigorev; 02.06.2015
comment
Есть ли проблема с Jira для zipWithIndex? Я хотел бы следить за ним, чтобы знать, будет ли / когда он станет частью Flink - person Alexey Grigorev; 11.06.2015
comment
Может быть, добавив zipWithIndex в базу кода Flink, вы сможете обновить ответ? - person Alexey Grigorev; 29.06.2015