Как вставить данные в друид через спокойствие

Следуя руководству по адресу http://druid.io/docs/latest/tutorials/tutorial-loading-streaming-data.html, мне удалось вставить данные в druid через консоль Kafka

Консоль Kafka

Файл спецификации выглядит следующим образом

examples / indexing / wikipedia.spec

[
  {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [{
        "type" : "count",
        "name" : "count"
      }, {
        "type" : "doubleSum",
        "name" : "added",
        "fieldName" : "added"
      }, {
        "type" : "doubleSum",
        "name" : "deleted",
        "fieldName" : "deleted"
      }, {
        "type" : "doubleSum",
        "name" : "delta",
        "fieldName" : "delta"
      }],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "localhost:2181",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id": "druid-example",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "false"
        },
        "feed": "wikipedia"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 500000,
      "intermediatePersistPeriod": "PT10m",
      "windowPeriod": "PT10m",
      "basePersistDirectory": "\/tmp\/realtime\/basePersist",
      "rejectionPolicy": {
        "type": "messageTime"
      }
    }
  }
]

Я запускаю в реальном времени через

java -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/indexing/wikipedia.spec -classpath config/_common:config/realtime:lib/* io.druid.cli.Main server realtime

В консоли Kafka я вставляю и ввожу следующее

{"timestamp": "2013-08-10T01:02:33Z", "page": "Good Bye", "language" : "en", "user" : "catty", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}

Затем я обычно выполняю запрос, создавая select.json и запускаю curl -X POST 'http://localhost:8084/druid/v2/?pretty' -H 'content-type: application/json' -d @select.json

select.json

 {
   "queryType": "select",
   "dataSource": "wikipedia",
   "dimensions":[],
   "metrics":[],
   "granularity": "all",
   "intervals": [
     "2000-01-01/2020-01-02"
   ],

   "filter" : {"type":"and",
        "fields" : [
                { "type": "selector", "dimension": "user", "value": "catty" }
        ]
   },

   "pagingSpec":{"pagingIdentifiers": {}, "threshold":500}
 }

Мне удалось получить следующий результат.

[ {
  "timestamp" : "2013-08-10T01:02:33.000Z",
  "result" : {
    "pagingIdentifiers" : {
      "wikipedia_2013-08-10T00:00:00.000Z_2013-08-11T00:00:00.000Z_2013-08-10T00:00:00.000Z" : 0
    },
    "events" : [ {
      "segmentId" : "wikipedia_2013-08-10T00:00:00.000Z_2013-08-11T00:00:00.000Z_2013-08-10T00:00:00.000Z",
      "offset" : 0,
      "event" : {
        "timestamp" : "2013-08-10T01:02:33.000Z",
        "continent" : "North America",
        "robot" : "false",
        "country" : "United States",
        "city" : "San Francisco",
        "newPage" : "true",
        "unpatrolled" : "true",
        "namespace" : "article",
        "anonymous" : "false",
        "language" : "en",
        "page" : "Good Bye",
        "region" : "Bay Area",
        "user" : "catty",
        "deleted" : 200.0,
        "added" : 57.0,
        "count" : 1,
        "delta" : -143.0
      }
    } ]
  }
} ]

Похоже, я правильно настроил Друид.

Теперь я хотел бы вставить данные через конечную точку HTTP. Согласно Каким образом ввод данных в Druid в реальном времени?, рекомендуется использовать tranquility

спокойствие

У меня запущена служба индексирования через

java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/overlord:lib/*: io.druid.cli.Main server overlord

conf / server.json выглядит так

{
   "dataSources" : [
      {
         "spec" : {
            "dataSchema" : {
                "dataSource" : "wikipedia",
                "parser" : {
                    "type" : "string",
                    "parseSpec" : {
                      "format" : "json",
                      "timestampSpec" : {
                        "column" : "timestamp",
                        "format" : "auto"
                      },
                      "dimensionsSpec" : {
                        "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
                        "dimensionExclusions" : [],
                        "spatialDimensions" : []
                      }
                    }
                },
                "metricsSpec" : [{
                    "type" : "count",
                    "name" : "count"
                }, {
                    "type" : "doubleSum",
                    "name" : "added",
                    "fieldName" : "added"
                }, {
                    "type" : "doubleSum",
                    "name" : "deleted",
                    "fieldName" : "deleted"
                }, {
                    "type" : "doubleSum",
                    "name" : "delta",
                    "fieldName" : "delta"
                }],
                "granularitySpec" : {
                    "type" : "uniform",
                    "segmentGranularity" : "DAY",
                    "queryGranularity" : "NONE"
                }
            },
            "tuningConfig" : {
               "windowPeriod" : "PT10M",
               "type" : "realtime",
               "intermediatePersistPeriod" : "PT10M",
               "maxRowsInMemory" : "100000"
            }
         },
         "properties" : {
            "task.partitions" : "1",
            "task.replicants" : "1"
         }
      }
   ],
   "properties" : {
      "zookeeper.connect" : "localhost",
      "http.port" : "8200",
      "http.threads" : "8"
   }
}

Затем я запускаю сервер, используя

bin/tranquility server -configFile conf/server.json

Я отправляю сообщение на http://xx.xxx.xxx.xxx:8200/v1/post/wikipedia, где content-type равно application/json

{"timestamp": "2013-08-10T01:02:33Z", "page": "Selamat Pagi", "language" : "en", "user" : "catty", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}

Я получаю следующий ответ

{"result":{"received":1,"sent":0}}

Похоже, Tranquility получил наши данные, но не смог отправить их друиду!

Я пытаюсь запустить curl -X POST 'http://localhost:8084/druid/v2/?pretty' -H 'content-type: application/json' -d @select.json, но не получаю результат, который я вставил через спокойствие.

Есть идеи, почему? Спасибо.


person Cheok Yan Cheng    schedule 14.03.2016    source источник
comment
Да, он отбрасывал сообщения из-за того, что отметка времени в ваших документах находилась за пределами вашего windowPeriod. то есть ваша временная метка должна быть в пределах 10 м от времени сервера.   -  person avr    schedule 14.03.2016
comment
Спасибо. Я не понимаю, что установка druid + tranquility не может принимать старые данные, как и в моем предыдущем тестировании с консолью druid + kafka, она может принимать старые данные. Это потому, что успокоение разговаривает с узлом индексирования друида, а консоль кафка разговаривает с узлом реального времени друида?   -  person Cheok Yan Cheng    schedule 15.03.2016
comment
Это зависит от того, какой тип rejectionPolicy вы используете. Вы использовали политику отклонения messageTime при приеме через узел реального времени, тогда как в спокойствии для нее было установлено значение по умолчанию, то есть serverTime. Для получения дополнительной информации вы можете прочитать о rejectionPolicy и windowPeriod.   -  person avr    schedule 15.03.2016
comment
@avr поддерживает ли Tranquility rejectionPolicy=messageTime? Я хочу использовать его для индексации почасовых пакетов, которые мы индексируем с помощью задач индексации hadoop, чтобы избежать зависимости от S3. Кстати, знаете ли вы, druid.coordinator.merge.on будет ли работать с сегментами, проиндексированными Tranquility?   -  person lisak    schedule 07.11.2017


Ответы (5)


Обычно это происходит, когда данные, которые вы отправляете, выходят за рамки периода окна. Если вы вставляете данные вручную, укажите точное текущее время (UTC) в миллисекундах. В противном случае это можно легко сделать, если вы используете какой-либо скрипт для генерации данных. Убедитесь, что это текущее время по Гринвичу.

person saurav.varma    schedule 15.05.2016

Чрезвычайно сложно настроить druid для правильной работы с вставкой данных в реальном времени.

Лучшее, что я нашел, - использовать https://github.com/implydata. Imply - это набор оберток вокруг друида, упрощающий его использование.

Однако вставка в реальном времени тоже не идеальна. Я провел эксперимент OutOfMemoryException, вставив 30 миллионов элементов в реальном времени. Это приведет к потере данных в ранее вставленных 30 миллионах строк.

Подробную информацию о потере данных можно найти здесь: https://groups.google.com/forum/#!topic/imply-user-group/95xpYojxiOg

Зарегистрирован запрос о проблеме: https://github.com/implydata/distribution/issues/8 < / а>

person Cheok Yan Cheng    schedule 24.03.2016
comment
Чеок, у вас есть ссылка на то, как настроить implydata, вам нужно что-то покупать у imply? - person rvazquezglez; 18.10.2019

Окно стриминга друидов Период очень короткий (10 минут). Вне этого периода ваше мероприятие будет проигнорировано.

person olivier.nouguier    schedule 09.05.2016

Как только вы получили {"result": {"receive": 1, "sent": 0}}, ваши рабочие потоки работают нормально. Tranquility решает, какие данные будут отправлены друиду, на основе временной метки, связанной с данными.

Этот период определяется конфигурацией windowPeriod. Итак, если ваш тип - реальное время («тип»: «реальное время») и период окна - PT10M («windowPeriod»: «PT10M»), tranquility отправит любые данные между t-10, t + 10 и не будет отправлять ничего за пределами этого периода. .

Я не согласен с проблемами эффективности вставки, с июня 2016 года мы отправляем 3 миллиона строк каждые 15 минут и все работает отлично. Конечно, у нас более сильная инфраструктура, рассчитанная на такой масштаб.

person karthik r    schedule 23.02.2017

Еще одна причина, по которой не выполняется вставка, - это нехватка памяти на запущенном координаторе / оверлоарде.

person Juan Pablo Lopez    schedule 04.09.2019