Cassandra 3.7 CDC/добавочная загрузка данных

Я новичок в мире ETL и хочу реализовать добавочную загрузку данных с помощью Cassandra 3.7 и Spark. Я знаю, что более поздние версии Cassandra поддерживают CDC, но я могу использовать только Cassandra 3.7. Есть ли метод, с помощью которого я могу отслеживать только измененные записи и использовать искру для их загрузки, тем самым выполняя добавочную загрузку данных?

Если это невозможно сделать на стороне cassandra, любые другие предложения также приветствуются на стороне Spark :)


person reznov    schedule 21.08.2020    source источник


Ответы (1)


Это довольно широкая тема, и эффективное решение будет зависеть от объема данных в ваших таблицах, структуры таблиц, способа вставки/обновления данных и т. д. Кроме того, конкретное решение может зависеть от доступной версии Spark. Одним из недостатков метода только для Spark является то, что вы не можете легко обнаружить удаление данных, не имея полной копии предыдущего состояния, поэтому вы можете создать разницу между двумя состояниями.

Во всех случаях вам нужно будет выполнить полное сканирование таблицы, чтобы найти измененные записи, но если ваша таблица организована специально для этой задачи, вы можете избежать чтения всех данных. Например, если у вас есть таблица со следующей структурой:

create table test.tbl (
  pk int,
  ts timestamp,
  v1 ...,
  v2 ...,
  primary key(pk, ts));

тогда, если вы выполните следующий запрос:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("tbl", "test").load()
val filtered = data.filter("""ts >= cast('2019-03-10T14:41:34.373+0000' as timestamp) 
                              AND ts <= cast('2019-03-10T19:01:56.316+0000' as timestamp)""")

затем Spark Cassandra Connector отправит этот запрос в Cassandra и будет считывать только данные, где ts находится в заданном временном диапазоне — вы можете проверить это, выполнив filtered.explain и проверив, что оба временных фильтра отмечены символом *.

Другой способ обнаружить изменения — получить время записи от Cassandra и отфильтровать изменения на основе этой информации. Получение времени записи поддерживается в RDD API для всех последних версий SCC и поддерживается в Dataframe API с выпуск SCC 2.5.0 (требуется как минимум Spark 2.4, хотя может работать и с 2.3). После получения этой информации вы можете применять фильтры к данным и извлекать изменения. Но нужно помнить о нескольких вещах:

  • нет способа обнаружить удаление с помощью этого метода
  • информация о времени записи существует только для обычных и статических столбцов, но не для столбцов первичного ключа
  • каждый столбец может иметь свое значение времени записи, в случае, если произошло частичное обновление строки после вставки
  • в большинстве версий Cassandra вызов функции writetime будет генерировать ошибку, когда это делается для столбца коллекции (список/карта/набор), и будет/может вернуть null для столбца с определенным пользователем типом

P.S. Даже если у вас включен CDC, его правильное использование не является тривиальной задачей:

  • вам нужно дедуплицировать изменения - у вас есть RF-копии изменений
  • некоторые изменения могли быть потеряны, например, когда узел был недоступен, а затем распространяться позже с помощью подсказок или исправлений.
  • TTL не прост в обращении
  • ...

Для CDC вы можете поискать презентации с конференции DataStax Accelerate 2019 — на эту тему было несколько докладов.

person Alex Ott    schedule 23.08.2020