hadoop mapreduce неупорядоченный кортеж как ключ карты

Основываясь на примере подсчета слов из Hadoop — The Definitive Guide, я разработал задание mapreduce для подсчета появления неупорядоченных кортежей строк. Ввод выглядит так (только крупнее):

a b
c c
d d
b a
a d
d d

Запустив mapreduce, я ожидаю, что результат будет (для этого примера):

c c 1
d d 1
a b 2
a d 1
d d 1

Это означает, что я хочу, чтобы кортежи a, b и b, a считались одинаковыми. Вопрос уже задавался здесь: Hadoop MapReduce: два значения как ключ в Mapper-Reducer и, вероятно, был решен здесь https://developer.yahoo.com/hadoop/tutorial/module5.html#keytypes.

Для больших входных файлов я получаю вывод следующим образом: первый столбец - это хэш-код соотв. ключ:

151757761 a a 62822
153322274 a b 62516
154886787 a c 62248
156451300 a d 62495
153322274 b a 62334
154902916 b b 62232
158064200 b d 62759
154886787 c a 62200
156483558 c b 124966
158080329 c c 62347
159677100 d c 125047
156451300 d a 62653
158064200 d b 62603
161290000 d d 62778

Как видно, некоторые ключи дублируются, например 153322274 для a, b и b, a. Для других, таких как c, b (и b, c) и c, d (и d, c), подсчет правильный. Примерно в два раза больше, чем у других, потому что тестовые данные отрисовываются случайным образом.

Я искал проблему в течение некоторого времени, и теперь у меня закончились идеи, почему после фазы сокращения все еще могут быть дубликаты ключей.

Ниже приведен код, который я использую:

Сначала код для моего пользовательского WritableComparable

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigInteger;

public class Pair implements WritableComparable<Pair> {

    private String first;
    private String second;

    public Pair(String first, String second) {
        this.first = first;
        this.second = second;
    }

    public Pair() {
       this("", "");
    }

    @Override
    public String toString() {
        return this.hashCode() + "\t" + first + "\t" + second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, first);
        WritableUtils.writeString(out, second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = WritableUtils.readString(in);
        second = WritableUtils.readString(in);
    }

    @Override
    public int hashCode() {
        BigInteger bA = BigInteger.ZERO;
        BigInteger bB = BigInteger.ZERO;

        for(int i = 0; i < first.length(); i++) {
            bA = bA.add(BigInteger.valueOf(127L).pow(i+1).multiply(BigInteger.valueOf(first.codePointAt(i))));
        }

        for(int i = 0; i < second.length(); i++) {
            bB = bB.add(BigInteger.valueOf(127L).pow(i+1).multiply(BigInteger.valueOf(second.codePointAt(i))));
        }

        return bA.multiply(bB).intValue();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof Pair) {
            Pair other = (Pair) o;

            boolean result = ( first.compareTo(other.first) == 0 && second.compareTo(other.second) == 0 )
                    || ( first.compareTo(other.second) == 0 && second.compareTo(other.first) == 0 );

            return result;
        }
        return false;
    }

    @Override
    public int compareTo(Pair other) {
        if (( first.compareTo(other.first) == 0 && second.compareTo(other.second) == 0 )
                || ( first.compareTo(other.second) == 0 && second.compareTo(other.first) == 0 ) ) {
            return 0;
        } else {
            int cmp = first.compareTo( other.first );

            if (cmp != 0) {
                return cmp;
            }

            return second.compareTo( other.second );
        }
    }
}

И остальное:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PairCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length < 2) {
            System.err.println("Usage: paircount <in-dir> <out-dir>");
            System.exit(2);
        }

        Job job = new Job(conf, "word count");
        job.setJarByClass(PairCount.class);

        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);

        job.setMapOutputKeyClass(Pair.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Pair.class);
        job.setOutputValueClass(IntWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Pair, IntWritable> {

        private final static IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());

            while (itr.hasMoreTokens()) {
                context.write(new Pair(itr.nextToken(), itr.nextToken()), one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Pair, IntWritable, Pair, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Pair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;

            for (IntWritable val : values) {
                sum += val.get();
            }

            result.set(sum);
            context.write( key, result);
        }
    }
}

Изменить: я добавил модульные тесты для функций hashCode() и compareTo(). Они отлично работают.

import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

public class Tests  {
    @Test
    public void testPairComparison() {
        assertTrue( 0 == new Pair("a", "a").compareTo(new Pair("a", "a")) );
        assertTrue( 0 == new Pair("a", "b").compareTo(new Pair("b", "a")) );
        assertTrue( 0 == new Pair("a", "c").compareTo(new Pair("c", "a")) );
        assertTrue( 0 == new Pair("a", "d").compareTo(new Pair("d", "a")) );

        assertTrue( 0 == new Pair("b", "b").compareTo(new Pair("b", "b")) );
        assertTrue( 0 == new Pair("b", "c").compareTo(new Pair("c", "b")) );
        assertTrue( 0 == new Pair("b", "d").compareTo(new Pair("d", "b")) );

        assertTrue( 0 == new Pair("c", "c").compareTo(new Pair("c", "c")) );
        assertTrue( 0 == new Pair("c", "d").compareTo(new Pair("d", "c")) );

        assertTrue( 0 == new Pair("d", "d").compareTo(new Pair("d", "d")) );

        assertTrue( 0 > new Pair("a", "a").compareTo(new Pair("b", "b")) );
        assertTrue( 0 > new Pair("a", "a").compareTo(new Pair("c", "b")) );
        assertTrue( 0 < new Pair("d", "d").compareTo(new Pair("c", "b")) );
        assertTrue( 0 < new Pair("c", "d").compareTo(new Pair("c", "a")) );
    }

    @Test
    public void testPairHashcode(){
        assertTrue( 0 != new Pair("a", "a").hashCode());
        assertTrue( 0 != new Pair("a", "b").hashCode());
        assertTrue( 0 != new Pair("a", "c").hashCode());
        assertTrue( 0 != new Pair("a", "d").hashCode());

        assertTrue( 0 != new Pair("b", "b").hashCode());
        assertTrue( 0 != new Pair("b", "c").hashCode());
        assertTrue( 0 != new Pair("b", "d").hashCode());

        assertTrue( 0 != new Pair("c", "c").hashCode());
        assertTrue( 0 != new Pair("c", "d").hashCode());

        assertTrue( 0 != new Pair("d", "d").hashCode());

        assertEquals( new Pair("a", "a").hashCode(), new Pair("a", "a").hashCode() );
        assertEquals( new Pair("a", "b").hashCode(), new Pair("b", "a").hashCode() );
        assertEquals( new Pair("a", "c").hashCode(), new Pair("c", "a").hashCode() );
        assertEquals( new Pair("a", "d").hashCode(), new Pair("d", "a").hashCode() );

        assertEquals( new Pair("b", "b").hashCode(), new Pair("b", "b").hashCode() );
        assertEquals( new Pair("b", "c").hashCode(), new Pair("c", "b").hashCode() );
        assertEquals( new Pair("b", "d").hashCode(), new Pair("d", "b").hashCode() );

        assertEquals( new Pair("c", "c").hashCode(), new Pair("c", "c").hashCode() );
        assertEquals( new Pair("c", "d").hashCode(), new Pair("d", "c").hashCode() );

        assertEquals( new Pair("d", "d").hashCode(), new Pair("d", "d").hashCode() );

        assertNotEquals( new Pair("a", "a").hashCode(), new Pair("b", "b").hashCode() );
        assertNotEquals( new Pair("a", "b").hashCode(), new Pair("b", "d").hashCode() );
        assertNotEquals( new Pair("a", "c").hashCode(), new Pair("d", "a").hashCode() );
        assertNotEquals( new Pair("a", "d").hashCode(), new Pair("a", "a").hashCode() );
    }
}

Но я понял, что изменение compareTo() так, чтобы оно всегда возвращало 0, приведет к тому, что каждая пара будет считаться одинаковой, что приведет к результату:

156483558 c b 1000000

в то время как изменение hashCode(), чтобы всегда возвращать 0 (для тех же входных данных, что и выше), приведет к тому же результату, что и выше, только с нулевыми ключами.

0 a a 62822
0 a b 62516
0 a c 62248
0 a d 62495
0 b a 62334
0 b b 62232
0 b d 62759
0 c a 62200
0 c b 124966
0 c c 62347
0 d c 125047
0 d a 62653
0 d b 62603
0 d d 62778

Редактировать:

Я исследовал дальше, заставив compareTo() печатать то, что сравнивается. Это показало, что некоторые ключи, такие как a,b и b,a, никогда не сравниваются друг с другом, поэтому не группируются.

Если не все ключи сравниваются друг с другом, как вообще возможна группировка (кроме использования hashCode(), чего она не делает)?

Я думаю, что есть какая-то крошечная вещь, которую я упускаю. Я рад любым идеям! Заранее большое спасибо.

с уважением


person user3365    schedule 24.03.2015    source источник
comment
Просто попробуйте с другими входными данными. Возможно, ваши входные данные повреждены.   -  person Gaurav Mishra    schedule 25.03.2015
comment
Входные данные были сгенерированы простым скриптом Python, который случайным образом выбирает две буквы из массива букв [a,b,c,d] (в данном случае) и записывает их в файл. Я не знаю, где они должны были быть повреждены.   -  person user3365    schedule 25.03.2015
comment
странно видеть, что это работает для других, а не для ab и ba.   -  person Gaurav Mishra    schedule 25.03.2015
comment
Я не уверен, но попробуйте использовать ссылку ниже для хэш-кода. давайте посмотрим:(commons. apache.org/proper/commons-lang/apidocs/org/apache/)   -  person Gaurav Mishra    schedule 25.03.2015
comment
Использование HashCodeBuilder приведет к разным хэш-кодам для a, b и b, a, а это не то, что мне нужно.   -  person user3365    schedule 25.03.2015
comment
Вы можете решить эту проблему   -  person Gaurav Mishra    schedule 26.03.2015


Ответы (3)


Учитывая первоначальные требования {a,b} =:= {b,a}, не проще ли было бы упорядочить элементы кортежа в конструкторе?

public Pair(String first, String second) {
    boolean swap = first.compareTo(second) > 0;
    this.first = swap ? second : first;
    this.second = swap ? first : second;
}

Это упростит такие методы, как compareTo и equals, а также сделает ненужной реализацию Partitioner.

person Igor Ostaptchenko    schedule 23.07.2017

Я думаю, что вижу проблему здесь. Вы не внедрили разделитель.

Когда вы говорите, что сталкиваетесь с проблемами при работе с большим набором данных, я предполагаю, что вы используете несколько редукторов. Если вы использовали один редуктор, ваш код будет работать. Но в случае нескольких редьюсеров вам нужен разделитель, чтобы сообщить фреймворку, что ab и ba по сути являются одними и теми же ключами и должны идти к одному и тому же редюсеру.

Вот поясняющая ссылка: ССЫЛКА

person Venkat    schedule 24.03.2015
comment
На сайте написано: Реализация Partitioner по умолчанию называется HashPartitioner. Он использует метод hashCode() ключевых объектов по модулю общего количества разделов, чтобы определить, в какой раздел отправить данную пару (ключ, значение). Поскольку значения hashCode() совпадают между a,b и b,a, это, вероятно, не проблема. - person user3365; 25.03.2015

Проблема в функции compareTo(). Сначала проверьте, равны ли они с точки зрения a, b равно b, a. Если это не так, сначала сравните меньшие значения пар и, если они совпадают, сравните большие значения соотв. пары. Это решает проблему.

Вот как я это реализовал сейчас:

@Override
public int compareTo(Pair other){
    int cmpFirstFirst = first.compareTo(other.first);
    int cmpSecondSecond =  second.compareTo(other.second);
    int cmpFirstSecond = first.compareTo(other.second);
    int cmpSecondFirst =  second.compareTo(other.first);

    if ( cmpFirstFirst == 0 && cmpSecondSecond == 0 || cmpFirstSecond == 0 && cmpSecondFirst == 0) {
        return 0;
    }

    String thisSmaller;
    String otherSmaller;

    String thisBigger;
    String otherBigger;

    if ( this.first.compareTo(this.second) < 0 ) {
        thisSmaller = this.first;
        thisBigger = this.second;
    } else {
        thisSmaller = this.second;
        thisBigger = this.first;
    }

    if ( other.first.compareTo(other.second) < 0 ) {
        otherSmaller = other.first;
        otherBigger = other.second;
    } else {
        otherSmaller = other.second;
        otherBigger = other.first;
    }

    int cmpThisSmallerOtherSmaller = thisSmaller.compareTo(otherSmaller);
    int cmpThisBiggerOtherBigger = thisBigger.compareTo(otherBigger);

    if (cmpThisSmallerOtherSmaller == 0) {
        return cmpThisBiggerOtherBigger;
    } else {
        return cmpThisSmallerOtherSmaller;
    }
}

Это означает, что, вопреки моему предположению, группировка вывода карты выполняется с использованием транзитивного отношения, а не перекрестного произведения ключей. Нужен стабильный порядок ключей. Это имеет смысл, как только вы это знаете и понимаете.

person user3365    schedule 27.03.2015