Wordcount на выходах (Key, Value) из карты Reduce

У меня есть несколько (title , text ) упорядоченных пар, полученных в результате работы приложения MapReduce в Hadoop с использованием Java.

Теперь я хотел бы реализовать Word Count в текстовом поле этих упорядоченных пар.

Итак, мой окончательный результат должен выглядеть так:

(title-a , word-a-1 , count-a-1 , word-a-2 , count-a-2 ....)

(title-b , word-b-1, count-b-1 , word-b-2 , count-b-2 ....)
.
.
.
.
(title-x , word-x-1, count-x-1 , word-x-2 , count-x-2 ....)

Подводя итог, я хочу реализовать wordcount отдельно для выходных записей из первого mapreduce. Может ли кто-нибудь предложить мне хороший способ сделать это или как я могу связать вторую задачу уменьшения карты, чтобы создать вышеуказанный результат или лучше отформатировать его?

Ниже приведен код, заимствованный из github и внесенный в него некоторые изменения.

package com.org;
import javax.xml.stream.XMLStreamConstants;//XMLInputFactory;
import java.io.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;               
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import javax.xml.stream.*;

public class XmlParser11
{

    public static class XmlInputFormat1 extends TextInputFormat {

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";


    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new XmlRecordReader();
    }

    /**
     * XMLRecordReader class to read through a given xml document to output
     * xml blocks as records as specified by the start tag and end tag
     *
     */
    // @Override
    public static class XmlRecordReader extends
            RecordReader<LongWritable, Text> {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();

        private LongWritable key = new LongWritable();
        private Text value = new Text();
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
            endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
            FileSplit fileSplit = (FileSplit) split;

            // open the file and seek to the start of the split
            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fsin = fs.open(fileSplit.getPath());
            fsin.seek(start);

        }
    @Override
    public boolean nextKeyValue() throws IOException,
                InterruptedException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            key.set(fsin.getPos());
                            value.set(buffer.getData(), 0,
                                    buffer.getLength());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }
    @Override
    public LongWritable getCurrentKey() throws IOException,
                InterruptedException {
            return key;
        }

    @Override
    public Text getCurrentValue() throws IOException,
                InterruptedException {
            return value;
        }
    @Override
    public void close() throws IOException {
            fsin.close();
        }
    @Override
        public float getProgress() throws IOException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock)
                throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // end of file:
                if (b == -1)
                    return false;
                // save to buffer:
                if (withinBlock)
                    buffer.write(b);
                // check if we're matching:
        if (b == match[i]) {
                    i++;
                    if (i >= match.length)
                        return true;
                } else
                    i = 0;
                // see if we've passed the stop point:
                if (!withinBlock && i == 0 && fsin.getPos() >= end)
                    return false;
            }
        }
    }
}


    public static class Map extends Mapper<LongWritable, Text,Text, Text> {
  @Override
  protected void map(LongWritable key, Text value,
                 Mapper.Context context)
  throws
  IOException, InterruptedException {
    String document = value.toString();
    System.out.println("'" + document + "'");
        try {
      XMLStreamReader reader =                      XMLInputFactory.newInstance().createXMLStreamReader(new     
           ByteArrayInputStream(document.getBytes()));
  String propertyName = "";
  String propertyValue = "";
  String currentElement = "";
  while (reader.hasNext()) {
    int code = reader.next();
    switch (code) {
      case XMLStreamConstants.START_ELEMENT: //START_ELEMENT:
        currentElement = reader.getLocalName();
        break;
      case XMLStreamConstants.CHARACTERS:  //CHARACTERS:
        if (currentElement.equalsIgnoreCase("title")) {
          propertyName += reader.getText();
          //System.out.println(propertyName);
        } else if (currentElement.equalsIgnoreCase("text")) {
     propertyValue += reader.getText();
          //System.out.println(propertyValue);
        }
        break;
    }
  }
  reader.close();
  context.write(new Text(propertyName.trim()), new Text(propertyValue.trim()));

}
    catch(Exception e){
            throw new IOException(e);

            }

  }
}
public static class Reduce
extends Reducer<Text, Text, Text, Text> {

  @Override
  protected void setup(
  Context context)
  throws IOException, InterruptedException {
    context.write(new Text("<Start>"), null);
  }

  @Override
  protected void cleanup(
  Context context)
  throws IOException, InterruptedException {
    context.write(new Text("</Start>"), null);
  }

  private Text outputKey = new Text();
  public void reduce(Text key, Iterable<Text> values,
                 Context context)
  throws IOException, InterruptedException {
for (Text value : values) {
      outputKey.set(constructPropertyXml(key, value));
      context.write(outputKey, null);
    }
 }

  public static String constructPropertyXml(Text name, Text value) {
    StringBuilder sb = new StringBuilder();
    sb.append("<property><name>").append(name)
    .append("</name><value>").append(value)
    .append("</value></property>");
    return sb.toString();
  }
}



    public static void main(String[] args) throws Exception
    {
            Configuration conf = new Configuration();

            conf.set("xmlinput.start", "<page>");
            conf.set("xmlinput.end", "</page>");
            Job job = new Job(conf);
            job.setJarByClass(XmlParser11.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setMapperClass(XmlParser11.Map.class);
            job.setReducerClass(XmlParser11.Reduce.class);

            job.setInputFormatClass(XmlInputFormat1.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
    }
}

Код wordcount, который мы находим в Интернете, подсчитывает количество слов во всех файлах и выдает результат. Я хочу сделать подсчет слов для каждого текстового поля отдельно. Вышеупомянутый преобразователь используется для извлечения заголовка и текста из XML-документа. Есть ли способ сделать подсчет слов в том же картографе. Если я это сделаю, мое следующее сомнение заключается в том, как передать его вместе с уже существующими парами значений ключа (заголовок, текст) в редуктор. Извините, я не могу правильно сформулировать свой вопрос, но полагаю, что читатель должен иметь какое-то представление


person user2623946    schedule 01.08.2013    source источник
comment
SSCCE значительно упростит чтение и понимание этого вопроса.   -  person Sam I am says Reinstate Monica    schedule 02.08.2013


Ответы (2)


Я не уверен, правильно ли я это понял. Так что у меня много вопросов к моему ответу.

Прежде всего, кто бы ни написал этот код, вероятно, пытается показать, как написать настраиваемый InputFormat для обработки XML-данных с помощью MR. Не знаю, как это связано с твоей проблемой.

Подводя итог, я хочу реализовать wordcount отдельно для выходных записей из первого mapreduce. Может кто-нибудь подскажет, как это сделать?

Прочтите выходной файл, сгенерированный первым MR, и сделайте это.

или как я могу связать вторую задачу уменьшения карты, чтобы создать вышеуказанный результат или лучше отформатировать его?

Вы можете определенно связать задания вместе таким образом, написав несколько методов драйвера, по одному для каждого задания. См. this для получения дополнительных сведений и this в качестве примера.

Я хочу подсчитать количество слов для каждого текстового поля отдельно.

Что вы подразумеваете под отдельно? В традиционной программе подсчета слов количество каждого слова рассчитывается независимо от других.

Можно ли как-нибудь подсчитать количество слов в том же картографе?

Надеюсь, вы правильно поняли программу wordcount. В традиционной программе wordcount вы читаете входной файл по одной строке за раз, разбиваете строку на слова и затем выдаете каждое слово как ключ со значением 1. Все это происходит внутри Mapper, который по сути является тем же Mapper. А затем общее количество каждого слова определяется в разделе «Редуктор» вашей работы. Если вы хотите выдать слова с их общим счетчиком из самого картографа, вы должны прочитать весь файл в самом картографе и произвести подсчет. Для этого вам нужно установить isSplittable в вашем InputFormat на false, чтобы ваш входной файл читался как единое целое и передавался только одному Mapper.

Когда вы отправляете что-то из Mapper, и если это задание не только для карты, выходные данные Mapper автоматически отправляются в Reducer. Вам еще что-нибудь нужно?

person Tariq    schedule 01.08.2013
comment
Не могли бы вы показать мне свой входной xml-файл и немного подробнее объяснить, что именно вы пытаетесь с ним делать? - person Tariq; 02.08.2013
comment
Мой вклад - дамп из Википедии. Каждая страница Википедии представляет собой XML-файл, содержащий такие объекты, как заголовок, автор, текст, редакция и т. Д. Моя mapreduce анализирует заголовок и текст и сохраняет их как пару ключ-значение. Заголовок - это ключ, а текст - значение. Теперь мне нужно подсчитать количество слов для текста, присутствующего на каждой странице. Я должен вести подсчет слов для каждой страницы отдельно. Предположим, у вас есть 3 страницы Страница 1 - Заголовок-1, текст-1 Страница 2 - Заголовок-2, текст-2 Страница 3 - Заголовок-3, текст-3 У меня есть количество слов для страницы 1. На выходе должна быть пара ключ-значение, где ключом по-прежнему является Заголовок-1. - person user2623946; 02.08.2013
comment
(Продолжение) - Но значение составное, оно содержит каждое слово и количество раз, которое оно встречается на странице. Подсчет слов на каждой странице должен производиться отдельно. Я не могу понять, как это сделать? Я готов предоставить как можно больше информации. Но, пожалуйста, не закрывайте эту страницу, говоря, что предоставлено недостаточно информации, так как я не специалист в области компьютерных наук и никогда не изучаю программирование формально. - person user2623946; 02.08.2013
comment
Пример ввода выглядит следующим образом ‹page› ‹title› ABC ‹/title› ‹id› 123 ‹/id› ‹revision› ‹id› 100 ‹/id› .... ‹/revision› ‹revision› ‹id› 200 ‹/Id› .... ‹/revision› ‹revision› ‹id› 300 ‹/id› .... ‹/revision› ‹text› Это пример страницы о том, как реализовать подсчет слов в java с помощью hadoop ‹/ текст ›‹/page› - person user2623946; 02.08.2013
comment
Идеально. Большое спасибо за все подробности. Дай мне хорошенько взглянуть на это. И я не нахожу причин отмечать этот вопрос как закрывающий. - person Tariq; 02.08.2013

я предложил вам использовать регулярное выражение

и выполнить отображение и группировку. в файле jar примера hadoop предоставьте класс Grep, используя это, вы можете выполнить сопоставление данных hdfs с использованием регулярного выражения. и сгруппируйте ваши сопоставленные данные.

person Alkesh_IT    schedule 16.09.2013