Присоединение схемы JSON к записям потока KSQL

Я использую KSQL, и пока он отлично работает. Но теперь я хотел бы передать вывод в BigQuery через Kafka Connect, и мне нужно прикрепить схему JSON. Мне сложно понять, как это сделать. Вот мой запрос:

CREATE STREAM tweets_original (
      CreatedAt BIGINT,
      Id BIGINT,
      Text VARCHAR,
      Source VARCHAR,
      GeoLocation VARCHAR,
      User STRUCT<Id BIGINT, Name VARCHAR, Description VARCHAR, ScreenName VARCHAR, URL VARCHAR, FollowersCount BIGINT, FriendsCount BIGINT>
    )
    WITH (kafka_topic='tweets', value_format='JSON');

    CREATE STREAM tweets_new
    WITH (kafka_topic='tweets-new') AS
    SELECT
      CreatedAt as created_at,
      Id as tweet_id,
      Text as tweet_text,
      Source as source,
      GeoLocation as geo_location,
      User->Id as user_id,
      User->Name as user_name,
      User->Description as user_description,
      User->ScreenName as user_screenname
    FROM tweets_original ;

Вот пример записи, которая была записана в тему вывода (tweets-new).

{
  "CREATED_AT": 1535036410000,
  "TWEET_ID": 1032643668614819800,
  "TWEET_TEXT": "Sample text",
  "SOURCE": "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
  "GEO_LOCATION": null,
  "USER_ID": 123,
  "USER_NAME": "John Smith",
  "USER_DESCRIPTION": "Developer in Chief",
  "USER_SCREENNAME": "newphonewhodis"
}

Однако, чтобы Kafka Connect передавал эти записи в BigQuery, мне нужно прикрепить схему, например:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "CREATED_AT"
      },
      {
        "type": "int64",
        "optional": false,
        "field": "TWEET_ID"
      },
      {
        "type": "string",
        "optional": false,
        "field": "TWEET_TEXT"
      }
      ...
    ],
    "optional": false,
    "name": "foobar"
  },
  "payload": {...}
}

В любом случае, я не вижу в документации ничего, что показывало бы, как я могу подойти к этому (возможно, я ищу не в том месте). Любая помощь будет принята с благодарностью!


person foxygen    schedule 23.08.2018    source источник
comment
Если вы изменили тему вывода на формат Avro, а затем использовали AvroConverter в конфигурации Kafka Connect, приемник, вероятно, будет работать ... Я не думаю, что вы можете заставить KSQL выводить запись, сформированную схемой / полезной нагрузкой. В противном случае вы можете попытаться установить schemas.enable в значение false в JsonConverter   -  person OneCricketeer    schedule 24.08.2018


Ответы (1)


Это простое решение для KSQL, просто обновите второй поток до AVRO.

CREATE STREAM tweets_new
    WITH (kafka_topic='tweets-new', value_format='AVRO') AS
    SELECT
      CreatedAt as created_at,
      Id as tweet_id,
      Text as tweet_text,
      Source as source,
      GeoLocation as geo_location,
      User->Id as user_id,
      User->Name as user_name,
      User->Description as user_description,
      User->ScreenName as user_screenname
    FROM tweets_original ;

Затем в конфигурации Kafka Connect вы можете использовать AvroConvertor и разрешить эволюцию / управление схемой в Google Big Query.

person Chris    schedule 05.06.2019