Как зарегистрироваться и использовать схему AVRO для Kafka Connect и Spark?

Я не совсем понимаю, как я могу зарегистрировать схему, чтобы использовать ее в соединителях источника или приемника jdbc и читать данные в Spark.

это схема avro, которую я хотел бы использовать для извлечения записей из базы данных MS SQL.

{
 "type": "record",
 "name": "myrecord",
 "fields": [
   { "name": "int1", "type": "int" },
   { "name": "str1", "type": "string" },
   { "name": "str2", "type": "string" }
 ]
} 

я хочу использовать эту схему для этого соединителя источника

{"name": "mssql-source",
 "config": {
 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
 "key.converter": "io.confluent.connect.avro.AvroConverter",
 "key.converter.schema.registry.url": "http://localhost:8081",
 "value.converter": "io.confluent.connect.avro.AvroConverter",
 "value.converter.schema.registry.url": "http://localhost:8081",
 "incrementing.column.name": "int1",
 "tasks.max": "1",
 "table.whitelist": "Hello",
 "mode": "incrementing",
 "topic.prefix": "mssql-",
 "name": "mssql-source",
 "connection.url":
 "jdbc:sqlserver://XXX.XXX.X;databaseName=XXX;username=XX;password=XX"
 }

и это Spark Consumer, который я использую

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SparkAvroConsumer {


    private static Injection<GenericRecord, byte[]> recordInjection;

    private static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":["
            + "  { \"name\":\"int1\", \"type\":\"int\" },"
            + "  { \"name\":\"str1\", \"type\":\"string\" },"
            + "  { \"name\":\"str2\", \"type\":\"string\" }"
            + "]}";

    static {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        recordInjection = GenericAvroCodecs.toBinary(schema);
    }

    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Set<String> topics = Collections.singleton("mssql-Hello");
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("schema.registry.url", "http://localhost:8081");
        JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);

        directKafkaStream
                .map(message -> recordInjection.invert(message._2).get())
                .foreachRDD(rdd -> {
                    rdd.foreach(record -> {
                        System.out.println("int1= " + record.get("int1")
                                + ", str1= " + record.get("str1")
                                + ", str2=" + record.get("str2"));
                    });
                });

        ssc.start();
        ssc.awaitTermination();
    }

}

comment
Возможно, вы захотите попробовать это: схема реестра"> stackoverflow.com/questions/40728056/   -  person Explorer    schedule 09.04.2018
comment
@Explorer я сделал, но я до сих пор не понимаю, как я могу использовать его после регистрации.   -  person Mahmoud Elbably    schedule 09.04.2018


Ответы (1)


Каждая схема имеет schemaId, когда вы регистрируете схему в Confluent Schema Registry, она создает для нее Int Id. ID добавляется к сообщениям, отправленным исходной системой. (Проверьте эту ссылку). Вы можете использовать CachedSchemaRegistryClient, чтобы получить схему из SchemaRegistry, вы можете сделать что-то вроде этого (его код Scala):

var schemaRegistry: SchemaRegistryClient = null
val url = "http://schema.registry.url:8181"
schemaRegistry = new CachedSchemaRegistryClient(url, 10)
val schema = schemaRegistry.getByID(schemaId) // consult the Schema Registry if you know the `SchemaId` in advance (you get this while registering your Schema)
//CachedSchemaRegistryClient have getAllSubjects API that will return all the schemas in your registry.
println(schema)

Если вы хотите получить идентификатор схемы из входящих сообщений, сделайте что-то вроде:

def getSchema(buffer: Array[Byte]): String = { //buffer is your incoming binary Avro message
    val url = "http://schema.registry.url:8181"
    val schemaRegistry = new CachedSchemaRegistryClient(url, 10)
    val bb = ByteBuffer.wrap(buffer)
    bb.get() // consume MAGIC_BYTE
    val schemaId = bb.getInt // consume schemaId //println(schemaId.toString)
    //println(schemaId.toString)
    schemaRegistry.getByID(schemaId) // consult the Schema Registry
}

Надеюсь, это поможет.

person Explorer    schedule 09.04.2018
comment
В идеале вы должны определить реестр схемы в другом месте, чтобы кэшировать идентификаторы. В противном случае каждый запрос getSchema создает новый объект реестра. - person OneCricketeer; 10.04.2018