обновить один столбец в таблице кассандры

У меня есть таблица cassandra person_master (personId: int, customerId: Int, firstName: String, lastName: String, mrids: Set) primaryKey (personId и customerID)

Предположим, у меня есть один входной RDD структуры [personId, customerId, firstName, lastname, messageType: String, source: String, sourceType: String]

предположим, что значение RDD: [1001,119, None, None, {abc.xyz} и строка cassandra имеет значение [1001,119, Vikash, Singh, {aaa.bbb}]

Я хочу получить строку cassandra на основе значения RDD и обновить столбец mrids таблицы cassandra и использовать все остальные столбцы из строки cassandra.

например в этом случае я хочу, чтобы окончательное значение RDD было [1001,119, Vikash, Singh, {aaa.bbb, abc.xyz}], которое я обновлю до cassandra позже.

Может ли кто-нибудь дать мне решение сделать это в Spark с помощью Cassandra Connector.


person vikash kumar    schedule 17.04.2016    source источник


Ответы (1)


Предполагая, что sc - это sparkContext, например,

val sparkConf = new SparkConf().setMaster(SPARK_MASTER)
                            .setAppName(SPARK_SCALA_APP_NAME)
                            .setJars(SPARK_SCALA_JAR)
sparkConf.set("spark.cassandra.connection.host", value)
sparkConf.set("spark.cassandra.auth.username", value)
sparkConf.set("spark.cassandra.auth.password", value)
val sc = new SparkContext(sparkConf)

Вы можете использовать или игнорировать предложение where (где может использоваться, только если его ключ раздела)

val selectedRow = sc.cassandraTable("keyspace", "tableName")
      .select("key", "column2", "column3")
      .where("key IN ?", keys)
      .as((key: String, column2: String, column3: Integer)
          =>(key, column2, column3))

Выполните фильтрацию и модификацию на своем rdd. Затем сохраните его как

selectedRow.saveToCassandra("keyspace",
                           "tableName",
                           SomeColumns("key", "column2", "column3"))
person Abhishek Anand    schedule 18.04.2016