Отдельный шаблон Hadoop MapReduce с пользовательским Writable создает дубликаты ключей

Я пытаюсь реализовать отдельный шаблон:

map(key, record):
  emit record,null
reduce(key, records):
  emit key

Мой ключ сложный, пользовательский Writable. Если я испускаю в уменьшении ключ и его хэш-код:

context.write(key, new IntWtitable(key.hashCode());

Я получаю следующий вывод:

key1 -1808937256
key2 -768063202
key3 906064410
key2 -768063202
key3 906064410

Теоретически вывод должен содержать только key1, key2 и key3, так как я использую HashPartitioner: ключи с одинаковым хеш-кодом объединяются в один раздел. Здесь явно не тот случай.

Если я преобразовываю свой сложный Writable в объект Text (и соответствующим образом адаптирую классы Mapper/Reducer) и испускаю в Mapper:

 context.write(new Text(key.toString()), NullWritable.get());

... результат такой, как ожидалось:

key1 1013632023
key2 762485389
key3 -1193948769

Хорошо, а вот минимальный рабочий пример, иллюстрирующий поведение.

Вход:

A A A A A
B B B B B
C C C C C
A A A A A
B B B B B

Задание MapReduce:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class DistinctPattern extends Configured implements Tool {
public static class DistinctMapper extends Mapper<Object, Text, ComplexObject, NullWritable> {


    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        ComplexObject o = new ComplexObject(value.toString());
        context.write(o, NullWritable.get());
    }
}

public static class DistinctReducer extends Reducer<ComplexObject, NullWritable, ComplexObject, IntWritable> {


    public void reduce(ComplexObject key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {

        context.write(key, new IntWritable(key.hashCode()));
    }
}

public static class MyArrayWritable extends ArrayWritable {

    public MyArrayWritable(Writable[] values) {
        super(DatumObject.class, values);
    }

    public MyArrayWritable() {
        super(DatumObject.class);
    }

    @Override
    public String toString() {
        return Arrays.toString(get());
    }

}

public static class DatumObject implements Writable {
    private String datum;

    public DatumObject() {}

    public DatumObject(String d) {
        datum = d;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        datum = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(datum);    
    }

    @Override
    public String toString() {
        return datum;
    }

    @Override
    public int hashCode() {
        return 31 * datum.hashCode();
    }

}

public static class ComplexObject implements WritableComparable<ComplexObject> {
    private List<DatumObject> data = new ArrayList<>();

    public ComplexObject() {}

    public ComplexObject(String d) {
        String[] elements = d.split(" ");
        for(int i = 0; i < elements.length; i++)
            data.add(new DatumObject(elements[i]));
    }

    public int size() {
        return data.size();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        data.clear();
        MyArrayWritable m = new MyArrayWritable();
        m.readFields(in);
        Writable[] w = m.get();
        for(int i = 0; i < w.length; i++)
            data.add((DatumObject) w[i]);

    }

    @Override
    public void write(DataOutput out) throws IOException {
        MyArrayWritable m = new MyArrayWritable(data.toArray(new DatumObject[data.size()]));
        m.write(out);
    }

    @Override
    public int compareTo(ComplexObject o) {
        if(this.equals(o))
            return 0;

        if(o.size() < this.size())
            return -1;

        return 1;
    }

    @Override
    public boolean equals(Object obj) {
        if(!(obj instanceof ComplexObject))
            return false;

        ComplexObject other = (ComplexObject) obj;
        return other.data.equals(data);
    }

    @Override
    public int hashCode() {
        return 31 * data.hashCode();
    }

    @Override
    public String toString() {
        StringBuilder s= new StringBuilder();
        data.forEach( entry -> {
            s.append(entry); 
            s.append(" ");
        });

        return s.toString();
    }

}

@Override
public int run(String[] args) throws Exception {
    Job job = Job.getInstance();
    job.setJar("distinct.jar");
    job.setJarByClass(DistinctPattern.class);
    job.setMapperClass(DistinctMapper.class);
    job.setReducerClass(DistinctReducer.class);
    job.setMapOutputKeyClass(ComplexObject.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputKeyClass(ComplexObject.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {       
    int exitCode = ToolRunner.run(new DistinctPattern(), args);
    System.exit(exitCode);
}
}

Ожидаемый результат:

A A A A A       368623362
B B B B B       1285710467
C C C C C       -2092169724

Фактический результат:

A A A A A       368623362
B B B B B       1285710467
C C C C C       -2092169724
A A A A A       368623362
B B B B B       1285710467

Что мне не хватает?

PS: Хадуп 2.7.3


person mikmak    schedule 25.04.2017    source источник


Ответы (1)


Хорошо, нашел ошибку (ошибки) в моем коде. Во-первых, в минимальном рабочем примере отсутствует реализация метода equals в классе DatumObject:

@Override
public boolean equals(Object obj) {
    if(obj == null)
        return false;

    if(!(obj instanceof DatumObject))
        return false;

    DatumObject other = (DatumObject) obj;
        return other.datum.equals(datum);
}

Во-вторых, один аспект, который я не смог воспроизвести в минимальном рабочем примере, но который присутствует в моем реальном коде, заключается в том, что не все мои key классы реализовывали интерфейс WritableComparable. В результате я подозреваю, что на этапе тасования ключи не сортируются должным образом. Как только методы compareTo были правильно реализованы во всех классах, составляющих мое значение key (см. диаграмму классов здесь), отличный шаблон работал, как ожидалось.

person mikmak    schedule 30.04.2017