Exception when inserting data into Druid via Tranquility

I am pushing a Kafka stream into Druid via Tranquility. Kafka version is 0.9.1, Tranquility is 0.8, Druid is 0.10. Tranquility starts up fine as long as no messages are produced, but as soon as the producer sends a message I get a JsonMappingException like this:

java.lang.IllegalArgumentException: Can not deserialize instance of java.util.ArrayList out of VALUE_STRING token
 at [Source: N/A; line: -1, column: -1]
    at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:2774) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.fasterxml.jackson.databind.ObjectMapper.convertValue(ObjectMapper.java:2700) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
    at com.metamx.tranquility.druid.DruidBeams$.makeFireDepartment(DruidBeams.scala:406) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.druid.DruidBeams$.fromConfigInternal(DruidBeams.scala:291) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:199) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.KafkaBeamUtils$.createTranquilizer(KafkaBeamUtils.scala:40) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.KafkaBeamUtils.createTranquilizer(KafkaBeamUtils.scala) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.<init>(TranquilityEventWriter.java:64) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.writer.WriterController.createWriter(WriterController.java:171) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.writer.WriterController.getWriter(WriterController.java:98) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_67]
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]

and my kafka.json is:

{
  "dataSources" : {
     "stock-index-topic" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "stock-index-topic",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : ["code","name","acronym","market","tradeVolume","totalValueTraded","preClosePx","openPrice","highPrice","lowPrice","tradePrice","closePx","timestamp"],
                "dimensionExclusions" : [
                  "timestamp",
                  "value"
                ]
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "DAY",
            "queryGranularity" : "none",
            "intervals":"no"
          },
          "metricsSpec" : [
            {
              "name" : "firstPrice",
              "type" : "doubleFirst",
              "fieldName" : "tradePrice"
            },{
              "name" : "lastPrice",
              "type" : "doubleLast",
              "fieldName" : "tradePrice"
            }, {
              "name" : "minPrice",
              "type" : "doubleMin",
              "fieldName" : "tradePrice"
            }, {
              "name" : "maxPrice",
              "type" : "doubleMax",
              "fieldName" : "tradePrice"
            }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "stock-index-topic"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "localhost:2181",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "localhost:2181",
    "kafka.group.id" : "tranquility-kafka"
  }
}

I use kafka-console-consumer to check the data; a message looks like:

{"code": "399982", "name": "500等权", "acronym": "500DQ", "market": "102", "tradeVolume": 0, "totalValueTraded": 0.0, "preClosePx": 0.0, "openPrice": 0.0, "highPrice": 0.0, "lowPrice": 0.0, "tradePrice": 7184.7142, "closePx": 0.0, "timestamp": "2017-05-16T09:06:39.000+08:00"}

Any idea why? Thanks.


Yuezhi Liu · 16.05.2017


Answers (1)


"metricsSpec" : [
            {
              "name" : "firstPrice",
              "type" : "doubleFirst",
              "fieldName" : "tradePrice"
            },{
              "name" : "lastPrice",
              "type" : "doubleLast",
              "fieldName" : "tradePrice"
            }, {
              "name" : "minPrice",
              "type" : "doubleMin",
              "fieldName" : "tradePrice"
            }, {
              "name" : "maxPrice",
              "type" : "doubleMax",
              "fieldName" : "tradePrice"
            }
          ]
        },

The documentation says that first and last aggregators cannot be used in an ingestion spec and should only be specified as part of queries. So the question is solved.
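A working setup would therefore keep only the min/max aggregators at ingestion time and compute first/last prices at query time instead. A minimal sketch, reusing the field and datasource names from the spec above (the query interval is illustrative):

```json
"metricsSpec" : [
  { "name" : "minPrice", "type" : "doubleMin", "fieldName" : "tradePrice" },
  { "name" : "maxPrice", "type" : "doubleMax", "fieldName" : "tradePrice" }
]
```

and a query-side use of the first/last aggregators:

```json
{
  "queryType" : "timeseries",
  "dataSource" : "stock-index-topic",
  "granularity" : "day",
  "aggregations" : [
    { "type" : "doubleFirst", "name" : "firstPrice", "fieldName" : "tradePrice" },
    { "type" : "doubleLast",  "name" : "lastPrice",  "fieldName" : "tradePrice" }
  ],
  "intervals" : [ "2017-05-16/2017-05-17" ]
}
```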

Yuezhi Liu · 17.05.2017