Я хочу использовать HashSet, который существует/работает с одним файлом во время его сопоставления, а затем сбрасывается/воссоздается при сопоставлении следующего файла. Я изменил TextInputFormat, чтобы переопределить isSplitable и вернуть false, чтобы файл не разбивался и обрабатывался Mappers как единое целое. Можно ли сделать что-то подобное? Или есть другой способ сделать меньше записей в таблицу Accumulo?
Позвольте мне начать с того, что я не верю, что мне нужна глобальная переменная. Я просто хочу обеспечить уникальность и, таким образом, записать меньше мутаций в свою таблицу Accumulo.
Мой проект состоит в том, чтобы преобразовать функциональность файла Index.java из примера сегмента из линейной клиентской программы accumulo в программу, использующую функциональность mapreduce, при этом создавая ту же таблицу в Accumulo. Это должно быть mapreduce, потому что это модное слово, и, по сути, оно будет работать быстрее, чем линейная программа с терабайтами данных.
Вот код индекса для справки: http://grepcode.com/file/repo1.maven.org/maven2/org.apache.accumulo/examples-simple/1.4.0/org/apache/accumulo/examples/simple/shard/Index.java
Эта программа использует BatchWriter для записи мутаций в Accumulo и делает это для каждого файла. Чтобы убедиться, что он не записывает больше мутаций, чем необходимо, и для обеспечения уникальности (хотя я верю, что Accumulo в конечном итоге объединяет одни и те же ключи посредством сжатия), Index.java имеет HashSet, который используется для определения того, встречалось ли слово ранее. Все это относительно просто для понимания.
Переход к заданию mapreduce, связанному только с картой, является более сложным.
Это была моя попытка сопоставления, которая, похоже, работает из частичного вывода, который я видел в таблице Accumulo, но работает очень-очень медленно по сравнению с линейной программой Index.java.
public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
private HashSet<String> tokensSeen = new HashSet<String>();
@Override
public void map(LongWritable key, Text value, Context output) throws IOException {
FileSplit fileSplit = (FileSplit)output.getInputSplit();
System.out.println("FilePath " + fileSplit.getPath().toString());
String filePath = fileSplit.getPath().toString();
filePath = filePath.replace("unprocessed", "processed");
String[] words = value.toString().split("\\W+");
for (String word : words) {
Mutation mutation = new Mutation(genPartition(filePath.hashCode() % 10));
word = word.toLowerCase();
if(!tokensSeen.contains(word)) {
tokensSeen.add(word);
mutation.put(new Text(word), new Text(filePath), new Value(new byte[0]));
}
try {
output.write(null, mutation);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Проблема медленной работы может заключаться в том, что я запускаю все это на тестовом экземпляре, одноузловом экземпляре Hadoop с ZooKeeper и Accumulo поверх него. Если это так, мне просто нужно найти решение для единственности.
Любая помощь или совет приветствуется.