Debezium не предоставляет CDC для встроенной версии для mysql

Я использую зависимости ниже,

        <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-oracle</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.debezium/debezium-connector-mysql -->
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-mysql</artifactId>
        <version>${version.debezium}</version>
    </dependency>

<version.debezium>0.8.3.Final</version.debezium>

Ниже мой метод Java,

public void runMysqlParsser() {

    Configuration config = Configuration.create()
            /* begin engine properties */
            .with("connector.class",
                    "io.debezium.connector.mysql.MySqlConnector")
            .with("offset.storage",
                    "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename",
                    "/home/mohit/tmp/offset.dat")
            .with("offset.flush.interval.ms", 60000)
            /* begin connector properties */
            .with("name", "my-sql-connector")
            .with("database.hostname", "localhost")
            .with("database.port", 3306)
            .with("database.user", "root")
            .with("database.password", "root")
            .with("server.id", 1)
            .with("database.server.name", "my-app-connector")
            .with("database.history",
                    "io.debezium.relational.history.FileDatabaseHistory")
            .with("database.history.file.filename",
                    "/home/mohit/tmp/dbhistory.dat")
            .with("database.whitelist", "mysql")
            .with("table.whitelist", "mysql.customers")
            .build();
    EmbeddedEngine engine = EmbeddedEngine.create()
            .using(config)
            .notifying(this::handleEvent)
            .build();
    Executor executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);
}

    private void handleEvent(SourceRecord sourceRecord) {
    try {
        LOG.info("Got record :" + sourceRecord.toString());
    } catch (Exception ex) {
        LOG.info("exception in handle event:" + ex);
    }

Мои конфигурации sql,.

general_log_file = /var/log/mysql/mysql.log
general_log = 1
server-id               = 1
log_bin                 = /var/log/mysql/mysql-bin.log
expire_logs_days        = 10
max_binlog_size   = 100M
binlog_format     = row
binlog_row_image  = full
binlog_rows_query_log_events = on
gtid_mode        =  on
enforce_gtid_consistency   = on

Когда я запускаю этот код, я получаю смещение для журналов истории, также к файлу mysql.log добавляется смещение. Однако, когда я выполняю какой-либо оператор обновления для таблицы, он не дает мне никаких журналов, т.е. метод handleEvent не вызывается. Кто-нибудь может сказать мне, что не так с кодом или конфигурацией?

Ниже приведены журналы после запуска кода Java,

$$ java -jar debezium-gcp-1.0-SNAPSHOT-jar-with-dependencies.jar 

log4j: WARN Для регистратора (org.apache.kafka.connect.json.JsonConverterConfig) не найдены дополнения. log4j: ПРЕДУПРЕЖДЕНИЕ. Правильно инициализируйте систему log4j.

log4j: WARN См. http://logging.apache.org/log4j/1.2/faq.html#noconfig для получения дополнительной информации. 28 ноября 2018 г. 13:29:47 com.debezium.gcp.SampleMysqlEmbededDebezium handleEvent INFO: Получена запись: SourceRecord {sourcePartition = {server = my-app-connector}, sourceOffset = {file = mysql-bin.000002, pos = 980, gtids = 31b708c7-ee22-11e8-b8a3-080027fbf50e: 1-17, snapshot = true}} ConnectRecord {topic = 'my-app-connector', kafkaPartition = 0, key = Struct {databaseName =}, value = Struct {source = Struct {version = 0.8.3.Final, name = my-app-connector, server_id = 0, ts_sec = 0, file = mysql-bin.000002, pos = 980, row = 0, snapshot = true}, databaseName =, ddl = SET character_set_server = latin1, collation_server = latin1_swedish_ci;}, timestamp = null, headers = ConnectHeaders (headers =)} 28 ноября 2018 г., 13:29:47 com.github.shyiko.mysql.binlog.BinaryLogClient connect ИНФОРМАЦИЯ: подключен к localhost: 3306 по адресу 31b708c7-ee22-11e8-b8a3-080027fbf50e: 1-17 (sid: 6326, cid: 21)


person shiv    schedule 28.11.2018    source источник


Ответы (1)


Вы вносите в белый список правильную базу данных / таблицу?

Не могли бы вы взглянуть на эту демонстрацию - https://github.com/debezium/debezium-examples/tree/master/kinesis Просто оставьте код, связанный с Kinesis, и распечатайте его только на консоли. Также проверьте параметр конфигурации table.ignore.builtin. IMHO mysql база данных относится к встроенным и по умолчанию отфильтрована.

person Jiri Pechanec    schedule 28.11.2018
comment
Да, @Jiri Pechanec. Таблица - это mysql.customers. - person shiv; 28.11.2018