Мой коннектор раковины Kafka для Neo4j не загружается

Введение:

Позвольте мне начать с извинений за неопределенность в моем вопросе. Я постараюсь предоставить как можно больше информации по этой теме (надеюсь, не слишком много), и, пожалуйста, дайте мне знать, если я должен предоставить больше. Кроме того, я новичок в Kafka и, вероятно, наткнусь на терминологию.

Итак, исходя из моего понимания того, как работают приемник и источник, я могу использовать FileStreamSourceConnector, предоставленный руководством Kafka Quickstart, для записи данных (команд Neo4j) в тему, хранящуюся в кластере Kafka. Затем я могу написать свой собственный коннектор приемника Neo4j и задачу для чтения этих команд и отправки их на один или несколько серверов Neo4j. Чтобы сделать проект максимально простым, на данный момент я основал коннектор и задачу приемника на основе FileStreamSinkConnector и FileStreamSinkTask из руководства Kafka Quickstart.

FileStream Кафки:

FileStreamSourceConnector

FileStreamSourceTask

FileStreamSinkConnector

FileStreamSinkTask

Коннектор мойки Neo4j:

package neo4k.sink;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Neo4jSinkConnector extends SinkConnector {

    public enum Keys {
        ;
        static final String URI = "uri";
        static final String USER = "user";
        static final String PASS = "pass";
        static final String LOG = "log";
    }

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(Keys.URI, Type.STRING, "", Importance.HIGH, "Neo4j URI")
            .define(Keys.USER, Type.STRING, "", Importance.MEDIUM, "User Auth")
            .define(Keys.PASS, Type.STRING, "", Importance.MEDIUM, "Pass Auth")
            .define(Keys.LOG, Type.STRING, "./neoj4sinkconnecterlog.txt", Importance.LOW, "Log File");

    private String uri;
    private String user;
    private String pass;
    private String logFile;

    @Override
    public String version() {
        return AppInfoParser.getVersion();
    }

    @Override
    public void start(Map<String, String> props) {
        uri = props.get(Keys.URI);
        user = props.get(Keys.USER);
        pass = props.get(Keys.PASS);
        logFile = props.get(Keys.LOG);
    }

    @Override
    public Class<? extends Task> taskClass() {
        return Neo4jSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        ArrayList<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<>();
            if (uri != null)
                config.put(Keys.URI, uri);
            if (user != null)
                config.put(Keys.USER, user);
            if (pass != null)
                config.put(Keys.PASS, pass);
            if (logFile != null)
                config.put(Keys.LOG, logFile);
            configs.add(config);
        }
        return configs;
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}

Моя задача раковины Neo4j:

package neo4k.sink;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Map;

public class Neo4jSinkTask extends SinkTask {

    private static final Logger log = LoggerFactory.getLogger(Neo4jSinkTask.class);

    private String uri;
    private String user;
    private String pass;
    private String logFile;

    private Driver driver;
    private Session session;

    public Neo4jSinkTask() {
    }

    @Override
    public String version() {
        return new Neo4jSinkConnector().version();
    }

    @Override
    public void start(Map<String, String> props) {
        uri = props.get(Neo4jSinkConnector.Keys.URI);
        user = props.get(Neo4jSinkConnector.Keys.USER);
        pass = props.get(Neo4jSinkConnector.Keys.PASS);
        logFile = props.get(Neo4jSinkConnector.Keys.LOG);

        driver = null;
        session = null;

        try {
            driver = GraphDatabase.driver(uri, AuthTokens.basic(user, pass));
            session = driver.session();
        } catch (Neo4jException ex) {
            log.trace(ex.getMessage(), logFilename());
        }
    }

    @Override
    public void put(Collection<SinkRecord> sinkRecords) {
        StatementResult result;
        for (SinkRecord record : sinkRecords) {
            result = session.run(record.value().toString());
            log.trace(result.toString(), logFilename());
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void stop() {
        if (session != null)
            session.close();
        if (driver != null)
            driver.close();
    }

    private String logFilename() {
        return logFile == null ? "stdout" : logFile;
    }
}

Проблема:

После написания этого я собрал его, включая все зависимости, которые у него были, за исключением любых зависимостей Kafka, в банку (или Uber Jar? Это был один файл). Затем я отредактировал пути к плагинам в connect-standalone.properties, чтобы включить этот артефакт, и написал файл свойств для моего коннектора приемника Neo4j. Я сделал все это, пытаясь следовать этим руководящим принципам.

Мой файл свойств коннектора приемника Neo4j:

name=neo4k-sink

connector.class=neo4k.sink.Neo4jSinkConnector

tasks.max=1

uri=bolt://localhost:7687

user=neo4j

pass=Hunter2

topics=connect-test

Но после запуска автономного я получаю эту ошибку в выводе, которая закрывает поток (ошибка в строке 5):

[2017-08-14 12:59:00,150] INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-08-14 12:59:00,150] INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-08-14 12:59:00,153] INFO Source task WorkerSourceTask{id=local-file-source-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
[2017-08-14 12:59:00,153] INFO Created connector local-file-source (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-08-14 12:59:00,153] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
java.lang.IllegalArgumentException: Malformed \uxxxx encoding.
    at java.util.Properties.loadConvert(Properties.java:574)
    at java.util.Properties.load0(Properties.java:390)
    at java.util.Properties.load(Properties.java:341)
    at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:429)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:84)
[2017-08-14 12:59:00,156] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2017-08-14 12:59:00,156] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2017-08-14 12:59:00,168] INFO Stopped ServerConnector@540accf4{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2017-08-14 12:59:00,173] INFO Stopped o.e.j.s.ServletContextHandler@6d548d27{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)

Изменить: я должен упомянуть, что во время части загрузки соединителя, когда в выходных данных объявляется, какие плагины были добавлены, я не вижу никаких упоминаний о банке, которую я создал ранее и создал путь для в connect-standalone.properties. Вот фрагмент контекста:

[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,970] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)

Вывод:

Я в растерянности, я провел тестирование и исследование около пары часов, и я не думаю, что точно знаю, какой вопрос задать. Так что я скажу спасибо за чтение, если вы зашли так далеко. Если вы заметили что-то вопиющее, что я мог сделать неправильно в коде или методе (например, упаковывая банку), или подумайте, что я должен предоставить больше контекстных или консольных журналов или чего-то еще, дайте мне знать. Еще раз спасибо.


person C. Ommen    schedule 14.08.2017    source источник
comment
Kafka Connect не смог прочитать файл свойств, в котором вы определили конфигурацию коннектора, что является вторым аргументом команды запуска connect-standalone. Я бы посоветовал поискать скрытые символы в файле (ни одного не было в содержимом этого сообщения), попытаться без вашего JAR (-ов), присутствующего в пути к классам, и даже попробовать другой пустой файл, чтобы убедиться, что он, по крайней мере, может быть прочитан в правильно, а затем постепенно добавляйте строки по одной, пока не определите все свойства конфигурации.   -  person Randall Hauch    schedule 14.08.2017
comment
Или попробуйте указать файл конфигурации как полный путь в команде запуска.   -  person Randall Hauch    schedule 14.08.2017
comment
Вот и все! Спасибо.   -  person C. Ommen    schedule 15.08.2017


Ответы (1)


Как указал @Randall Hauch, в моем файле свойств были скрытые символы, потому что это был документ с форматированным текстом. Я исправил это, продублировав файл connect-file-sink.properties, поставляемый с Kafka, который, как мне кажется, является обычным текстовым документом. Затем переименовываем и редактируем этот дубликат для моих свойств приемника neo4j.

person C. Ommen    schedule 15.08.2017