Введение:
Позвольте мне начать с извинений за неопределенность в моем вопросе. Я постараюсь предоставить как можно больше информации по этой теме (надеюсь, не слишком много), и, пожалуйста, дайте мне знать, если я должен предоставить больше. Кроме того, я новичок в Kafka и, вероятно, наткнусь на терминологию.
Итак, исходя из моего понимания того, как работают приемник и источник, я могу использовать FileStreamSourceConnector, предоставленный руководством Kafka Quickstart, для записи данных (команд Neo4j) в тему, хранящуюся в кластере Kafka. Затем я могу написать свой собственный коннектор приемника Neo4j и задачу для чтения этих команд и отправки их на один или несколько серверов Neo4j. Чтобы сделать проект максимально простым, на данный момент я основал коннектор и задачу приемника на основе FileStreamSinkConnector и FileStreamSinkTask из руководства Kafka Quickstart.
FileStream Кафки:
Коннектор мойки Neo4j:
package neo4k.sink;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Neo4jSinkConnector extends SinkConnector {
public enum Keys {
;
static final String URI = "uri";
static final String USER = "user";
static final String PASS = "pass";
static final String LOG = "log";
}
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(Keys.URI, Type.STRING, "", Importance.HIGH, "Neo4j URI")
.define(Keys.USER, Type.STRING, "", Importance.MEDIUM, "User Auth")
.define(Keys.PASS, Type.STRING, "", Importance.MEDIUM, "Pass Auth")
.define(Keys.LOG, Type.STRING, "./neoj4sinkconnecterlog.txt", Importance.LOW, "Log File");
private String uri;
private String user;
private String pass;
private String logFile;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void start(Map<String, String> props) {
uri = props.get(Keys.URI);
user = props.get(Keys.USER);
pass = props.get(Keys.PASS);
logFile = props.get(Keys.LOG);
}
@Override
public Class<? extends Task> taskClass() {
return Neo4jSinkTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<>();
if (uri != null)
config.put(Keys.URI, uri);
if (user != null)
config.put(Keys.USER, user);
if (pass != null)
config.put(Keys.PASS, pass);
if (logFile != null)
config.put(Keys.LOG, logFile);
configs.add(config);
}
return configs;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
Моя задача раковины Neo4j:
package neo4k.sink;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
public class Neo4jSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(Neo4jSinkTask.class);
private String uri;
private String user;
private String pass;
private String logFile;
private Driver driver;
private Session session;
public Neo4jSinkTask() {
}
@Override
public String version() {
return new Neo4jSinkConnector().version();
}
@Override
public void start(Map<String, String> props) {
uri = props.get(Neo4jSinkConnector.Keys.URI);
user = props.get(Neo4jSinkConnector.Keys.USER);
pass = props.get(Neo4jSinkConnector.Keys.PASS);
logFile = props.get(Neo4jSinkConnector.Keys.LOG);
driver = null;
session = null;
try {
driver = GraphDatabase.driver(uri, AuthTokens.basic(user, pass));
session = driver.session();
} catch (Neo4jException ex) {
log.trace(ex.getMessage(), logFilename());
}
}
@Override
public void put(Collection<SinkRecord> sinkRecords) {
StatementResult result;
for (SinkRecord record : sinkRecords) {
result = session.run(record.value().toString());
log.trace(result.toString(), logFilename());
}
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void stop() {
if (session != null)
session.close();
if (driver != null)
driver.close();
}
private String logFilename() {
return logFile == null ? "stdout" : logFile;
}
}
Проблема:
После написания этого я собрал его, включая все зависимости, которые у него были, за исключением любых зависимостей Kafka, в банку (или Uber Jar? Это был один файл). Затем я отредактировал пути к плагинам в connect-standalone.properties, чтобы включить этот артефакт, и написал файл свойств для моего коннектора приемника Neo4j. Я сделал все это, пытаясь следовать этим руководящим принципам.
Мой файл свойств коннектора приемника Neo4j:
name=neo4k-sink
connector.class=neo4k.sink.Neo4jSinkConnector
tasks.max=1
uri=bolt://localhost:7687
user=neo4j
pass=Hunter2
topics=connect-test
Но после запуска автономного я получаю эту ошибку в выводе, которая закрывает поток (ошибка в строке 5):
[2017-08-14 12:59:00,150] INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-08-14 12:59:00,150] INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-08-14 12:59:00,153] INFO Source task WorkerSourceTask{id=local-file-source-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
[2017-08-14 12:59:00,153] INFO Created connector local-file-source (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-08-14 12:59:00,153] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
java.lang.IllegalArgumentException: Malformed \uxxxx encoding.
at java.util.Properties.loadConvert(Properties.java:574)
at java.util.Properties.load0(Properties.java:390)
at java.util.Properties.load(Properties.java:341)
at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:429)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:84)
[2017-08-14 12:59:00,156] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2017-08-14 12:59:00,156] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2017-08-14 12:59:00,168] INFO Stopped ServerConnector@540accf4{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2017-08-14 12:59:00,173] INFO Stopped o.e.j.s.ServletContextHandler@6d548d27{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
Изменить: я должен упомянуть, что во время части загрузки соединителя, когда в выходных данных объявляется, какие плагины были добавлены, я не вижу никаких упоминаний о банке, которую я создал ранее и создал путь для в connect-standalone.properties. Вот фрагмент контекста:
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,969] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-08-14 12:58:58,970] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
Вывод:
Я в растерянности, я провел тестирование и исследование около пары часов, и я не думаю, что точно знаю, какой вопрос задать. Так что я скажу спасибо за чтение, если вы зашли так далеко. Если вы заметили что-то вопиющее, что я мог сделать неправильно в коде или методе (например, упаковывая банку), или подумайте, что я должен предоставить больше контекстных или консольных журналов или чего-то еще, дайте мне знать. Еще раз спасибо.
connect-standalone
. Я бы посоветовал поискать скрытые символы в файле (ни одного не было в содержимом этого сообщения), попытаться без вашего JAR (-ов), присутствующего в пути к классам, и даже попробовать другой пустой файл, чтобы убедиться, что он, по крайней мере, может быть прочитан в правильно, а затем постепенно добавляйте строки по одной, пока не определите все свойства конфигурации. - person Randall Hauch   schedule 14.08.2017