Есть ли способ подсчитать количество уникальных слов в потоке с помощью Flink Streaming? Результатом будет поток чисел, который продолжает увеличиваться.
Как посчитать уникальные слова в потоке?
Ответы (1)
Вы можете решить эту проблему, сохранив все слова, которые вы уже видели. Обладая этими знаниями, вы сможете отфильтровать все повторяющиеся слова. Остальные могут быть подсчитаны оператором карты с параллелизмом 1
. Следующий фрагмент кода делает именно это.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream = env.fromElements("foo", "bar", "foobar", "bar", "barfoo", "foobar", "foo", "fo")
// filter words out which we have already seen
val uniqueWords = inputStream.keyBy(x => x).filterWithState{
(word, seenWordsState: Option[Set[String]]) => seenWordsState match {
case None => (true, Some(HashSet(word)))
case Some(seenWords) => (!seenWords.contains(word), Some(seenWords + word))
}
}
// count the number of incoming (first seen) words
val numberUniqueWords = uniqueWords.keyBy(x => 0).mapWithState{
(word, counterState: Option[Int]) =>
counterState match {
case None => (1, Some(1))
case Some(counter) => (counter + 1, Some(counter + 1))
}
}.setParallelism(1)
numberUniqueWords.print();
env.execute()
person
Till Rohrmann
schedule
01.04.2016
Может ли это вызвать OOM или снижение производительности, если входящий поток бесконечен, а набор строк (в
filterWithState
) становится слишком большим?
- person Maxim; 01.04.2016
Нет, если вы используете серверную часть состояния, которая поддерживает вне ядра.
RocksDBStateBackend
— это такой сервер состояния. Если вы используете серверную часть состояния памяти, вам придется время от времени очищать состояние, иначе вы можете столкнуться с OOM.
- person Till Rohrmann; 01.04.2016
Еще один вопрос, насколько я понимаю, операции сохранения/восстановления в/из бэкенда
RocksDBStateBackend
в этом случае имеют сложность O(N), где N - количество элементов в наборе, т.е. этот бэкенд всегда сохраняет/восстанавливает все элементы набора или только измененные элементы?
- person Maxim; 01.04.2016
Эта реализация использует абстракцию
ValueState
, которая всегда сохраняет/восстанавливает полный Set
. Однако, вероятно, можно также использовать абстракцию ListState
, чтобы сделать контрольные точки инкрементными.
- person Till Rohrmann; 01.04.2016
Привет, Тилль,
filterWithState
доступен только в версии 1.1? Я не смог найти его во Flink 1.0.0.
- person Jun; 01.04.2016
Метод
filterWithState
является частью Scala API Flink, начиная с версии 1.0.0. Однако это применимо только к KeyedStreams
. Это означает, что вы должны сначала вызвать keyBy
в потоке.
- person Till Rohrmann; 01.04.2016