Как посчитать уникальные слова в потоке?

Есть ли способ подсчитать количество уникальных слов в потоке с помощью Flink Streaming? Результатом будет поток чисел, который продолжает увеличиваться.


person Jun    schedule 31.03.2016    source источник


Ответы (1)


Вы можете решить эту проблему, сохранив все слова, которые вы уже видели. Обладая этими знаниями, вы сможете отфильтровать все повторяющиеся слова. Остальные могут быть подсчитаны оператором карты с параллелизмом 1. Следующий фрагмент кода делает именно это.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val inputStream = env.fromElements("foo", "bar", "foobar", "bar", "barfoo", "foobar", "foo", "fo")

// filter words out which we have already seen
val uniqueWords = inputStream.keyBy(x => x).filterWithState{
  (word, seenWordsState: Option[Set[String]]) => seenWordsState match {
    case None => (true, Some(HashSet(word)))
    case Some(seenWords) => (!seenWords.contains(word), Some(seenWords + word))
  }
}

// count the number of incoming (first seen) words
val numberUniqueWords = uniqueWords.keyBy(x => 0).mapWithState{
  (word, counterState: Option[Int]) =>
    counterState match {
      case None => (1, Some(1))
      case Some(counter) => (counter + 1, Some(counter + 1))
    }
}.setParallelism(1)

numberUniqueWords.print();

env.execute()
person Till Rohrmann    schedule 01.04.2016
comment
Может ли это вызвать OOM или снижение производительности, если входящий поток бесконечен, а набор строк (в filterWithState) становится слишком большим? - person Maxim; 01.04.2016
comment
Нет, если вы используете серверную часть состояния, которая поддерживает вне ядра. RocksDBStateBackend — это такой сервер состояния. Если вы используете серверную часть состояния памяти, вам придется время от времени очищать состояние, иначе вы можете столкнуться с OOM. - person Till Rohrmann; 01.04.2016
comment
Еще один вопрос, насколько я понимаю, операции сохранения/восстановления в/из бэкенда RocksDBStateBackend в этом случае имеют сложность O(N), где N - количество элементов в наборе, т.е. этот бэкенд всегда сохраняет/восстанавливает все элементы набора или только измененные элементы? - person Maxim; 01.04.2016
comment
Эта реализация использует абстракцию ValueState, которая всегда сохраняет/восстанавливает полный Set. Однако, вероятно, можно также использовать абстракцию ListState, чтобы сделать контрольные точки инкрементными. - person Till Rohrmann; 01.04.2016
comment
Привет, Тилль, filterWithState доступен только в версии 1.1? Я не смог найти его во Flink 1.0.0. - person Jun; 01.04.2016
comment
Метод filterWithState является частью Scala API Flink, начиная с версии 1.0.0. Однако это применимо только к KeyedStreams. Это означает, что вы должны сначала вызвать keyBy в потоке. - person Till Rohrmann; 01.04.2016