Соединитель Debezium mongodb kafka не создает некоторые записи в теме, как в mongodb

В моем mongodb у меня есть эти данные

mongo01:PRIMARY> db.col.find({"_id" : ObjectId("5d8777f188fef5555b")})
{ "_id" : ObjectId("5d8777f188fef5555b"), "attachments" : [ { "name" : "Je", "src" : "https://google.co", "type" : "image/png" } ], "tags" : [ 51, 52 ], "last_comment" : [ ], "hashtags" : [ "Je" ], "badges" : [ ], "feed_id" : "1", "company_id" : 1, "message" : "aJsm9LtK", "group_id" : "106", "feed_type" : "post", "thumbnail" : "", "group_tag" : false, "like_count" : 0, "clap_count" : 0, "comment_count" : 0, "created_by" : 520, "created_at" : "1469577278628", "updated_at" : "1469577278628", "status" : 1, "__v" : 0 }

mongo01:PRIMARY> db.col.find({"_id" : ObjectId("5d285b4554e3b584bf97759")})
{ "_id" : ObjectId("5d285b4554e3b584bf97759"), "attachments" : [ ], "tags" : [ ], "last_comment" : [ ], "company_id" : 1, "group_id" : "00e35289", "feed_type" : "post", "group_tag" : false, "status" : 1, "feed_id" : "3dc44", "thumbnail" : "{}", "message" : "s2np1HYrPuFF", "created_by" : 1, "html_content" : "", "created_at" : "144687057949", "updated_at" : "144687057949", "like_count" : 0, "clap_count" : 0, "comment_count" : 0, "__v" : 0, "badges" : [ ], "hashtags" : [ ] }

Я использую этот соединитель debezium mongodb, чтобы получить данные mongodb в теме kafka.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 
   http://localhost:8083/connectors/ -d '{
   "name": "mongo_connector-4",
    "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "mongo01/localhost:27017",
    "mongodb.name": "mongo_1",
    "collection.whitelist": "data.col",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms" : "unwrap",
    "transforms.unwrap.type" : "io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
    "transforms.unwrap.drop.tombstones" : "false",
    "transforms.unwrap.delete.handling.mode" : "drop",
    "transforms.unwrap.operation.header" : "true",
    "errors.tolerance" : "all",
    "snapshot.delay.ms":"120000",
    "poll.interval.ms":"3000",
    "heartbeat.interval.ms":"90000"
  }
}'

теперь, печатая тему в ksql, я получаю, что для некоторых записей данные пришли со всеми столбцами (как это было в mongodb), а для некоторых записей некоторые столбцы отсутствуют.

ksql> print 'mongo_1.data.col' from beginning;
Format:JSON
{"ROWTIME":1571148520736,"ROWKEY":"{\"id\":\"5d8777f188fef5555b\"}","attachments":[{"name":"Je","src":"https://google.co","type":"image/png"}],"tags":[51,52],"last_comment":[],"hashtags":[],"badges":[],"feed_id":"1","company_id":1,"message":"aJsm9LtK","group_id":"106","feed_type":"post","thumbnail":"","group_tag":false,"like_count":0,"clap_count":0,"comment_count":0,"created_by":520,"created_at":"1469577278628","updated_at":"1469577278628","status":1,"__v":0,"id":"5d8777f188fef5555b"}
{"ROWTIME":1571148520736,"ROWKEY":"{\"id\":\"5d285b4554e3b584bf97759\"}","badges":[],"hashtags":[],"id":"5d285b4554e3b584bf97759"}

Почему это происходит и как решить эту проблему?

PS: единственная разница, которую я обнаружил, заключается в том, что обе записи имеют разный порядок столбцов.

При поиске по этой проблеме я нашел только близкую вещь здесь https://github.com/hpgrahsl/kafka-connect-mongodb что-то, что они говорят о постобработке и редактировании полей, содержащих конфиденциальные данные. Но, как видите, обе мои записи похожи и не содержат конфиденциальных данных (под конфиденциальными данными я подразумеваю зашифрованные данные, возможно, они имели в виду что-то другое).


person MAYANK BHARTI    schedule 16.10.2019    source источник


Ответы (1)


Не пропадают ли значения после обновлений? Не забывайте, что коннектор MongoDB предоставляет patch для обновлений, а не afterhttps://debezium.io/documentation/reference/0.10/connectors/mongodb.html#change-events-value

Если вам нужно создать полный формат after в случае MongoDB, вам нужно ввести конвейер Kafka Streams, который будет сохранять событие после вставки в постоянное хранилище, а затем объединять исправление с исходной вставкой для создания окончательного события.

person Jiri Pechanec    schedule 17.10.2019