Агрегировать только самый новый документ

У меня есть эластичный индекс, в котором есть документы для истории состояния пользователя. Данные выглядят так;

  {
    "session_id": "yunus",
    "state_name": "start",
    "entry_time": "2016-11-09 15:27:03"
  },
  {
    "session_id": "yunus",
    "state_name": "end",
    "entry_time": "2016-11-09 16:30:00"
  },
  {
    "session_id": "can",
    "state_name": "start",
    "entry_time": "2016-11-09 12:01:00"
  },
  {
    "session_id": "rick",
    "state_name": "start",
    "entry_time": "2016-11-09 09:00:00"
  },
  {
    "session_id": "rick",
    "state_name": "end",
    "entry_time": "2016-11-10 10:00:00"
  }

Я хочу агрегировать по имени состояния с гистограммой даты, но только для актуального последнего состояния в то время. Так что результат может быть;

2016-11-08 
start = 0
end = 0

2016-11-09 
start = 2
end = 1

2016-11-10 
start = 1
end = 2

На самом деле план состоит в том, чтобы создать сгруппированную гистограмму с временной шкалой, чтобы показать изменение состояний с течением времени.

Я попробовал несколько вещей, таких как конвейеры агрегации, лучшие хиты, но не смог добиться никакого прогресса.

Любая помощь приветствуется.


person Fatih Donmez    schedule 13.11.2016    source источник


Ответы (1)


Для всех, кто заинтересован, я решил это с помощью искры. Я использовал elastic-spark для чтения из elasticsearch и затем напишите ответ в elasticsearch.

Вот чтение из es как Rdd;

val allData = sc.esRDD(s"states_${id}/log", query)

Затем я сначала группирую по идентификатору сеанса, сортирую по дате, чтобы найти только последнее состояние сеанса;

val latestStates = allData.groupBy(k => k._2.get("session_id").get).map(k => (k._2).reduceLeft((d1, d2) => {
  d1._2.get("timestamp").get.asInstanceOf[Long] > d2._2.get("timestamp").get.asInstanceOf[Long] match {
    case true => d1
    case false => d2
  }
})).map(_._2)

Когда у меня есть последние состояния сеанса, я фильтрую состояния выхода, а затем считаю по значению;

val stateSummary = latestStates
  .filter(s => s.isDefinedAt("state_id") && s("state_id").asInstanceOf[Long] != -1)
  .map(s => (s("state_id"), s("state_name")))
  .countByValue()
  .map(d => Map("state_id" -> d._1._1.asInstanceOf[Long], "state_name" -> d._1._2.asInstanceOf[String], "count" -> d._2)).toList

Теперь у нас есть текущее количество сессий в штатах. (текущий настраивается, поэтому мы можем установить его на определенное время), осталось только одно, написать обратно в elasticsearch;

sc.makeRDD(Seq(finalElasticDoc)).saveToEs(s"states_${id}/analytic_daily")
person Fatih Donmez    schedule 28.11.2016