Доверие через одноранговую сеть
В первой части этой серии мы сделали единый блокчейн. Теперь мы сделаем набор из них и заставим их разговаривать друг с другом. Настоящая суть блокчейна - это распределенная система проверки. Вы можете добавлять блоки с любых узлов, и в конечном итоге они попадают на одноранговые узлы, так что все согласны с тем, как выглядит блокчейн. Это важно, потому что вам не нужен единственный источник истины в распределенной системе.
Сразу возникает одна проблема: каждый узел - это две службы, плюс MongoDB и шина сообщений Kafka, которые все должны общаться друг с другом. Но я хочу протестировать и продемонстрировать несколько узлов на одном хосте (моем ноутбуке). Я работал с Docker compose, поэтому мне нужно сделать один файл Docker compose для каждого узла, чтобы порты оставались прямыми.
Мы будем работать над сервисом узлов, который позволит узлам работать друг с другом. Это будет получать входные данные из двух мест: спокойный интерфейс, который позволяет добавлять и перечислять подключенные узлы, и шина сообщений, предоставляемая Kafka, которая уведомляет службу узлов об изменениях в локальной цепочке блоков, которые необходимо транслировать на одноранговые узлы.
Чтобы сервисы использовались в качестве изображений, я буду использовать плагин Google Jib maven. Это самый простой способ создать образ из сборки maven. Мы добавляем следующее в pom-файл каждого модуля, для которого нам нужен образ:
<build> <plugins> <plugin> <groupId>com.google.cloud.tools</groupId> <artifactId>jib-maven-plugin</artifactId> <version>2.7.1</version> </plugin> </plugins> </build>
В нашем случае мы можем использовать значения по умолчанию для конфигурации. Затем вы можете запустить mvn clean install jib:build
, и он создаст образ, который вы можете использовать в своем файле компоновки Docker.
Давайте возьмем то, что у нас есть на данный момент, и начнем с всеобъемлющего файла составления Docker под названием docker-compose-node1
:
version: '3.1' services: mongo: image: mongo restart: always ports: - 27017 environment: MONGO_INITDB_ROOT_USERNAME: root MONGO_INITDB_ROOT_PASSWORD: example mongo-express: image: mongo-express restart: always ports: - 8081:8081 environment: ME_CONFIG_MONGODB_ADMINUSERNAME: root ME_CONFIG_MONGODB_ADMINPASSWORD: example zookeeper: image: confluentinc/cp-zookeeper:latest ports: - 2181 environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-kafka:latest ports: - 9092 - 29092 links: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 blockchain: image: rlkamradt/blockchain:1.0-SNAPSHOT ports: - 8080:8080 environment: MONGO_HOST: mongo SPRING_KAFKA_BOOTSTRAP-SERVERS: kafka:29092
Вы заметите, что кое-что изменилось. Во-первых, я удалил внешний порт из Kafka, чтобы он был доступен только внутри сети compose. Затем я добавил приложение блокчейн с изображением, созданным сборкой. Наконец, я переопределил несколько свойств Spring с помощью переменной окружения, чтобы он имел доступ к mongo и Kafka изнутри сети compose. Compose создаст записи DNS, чтобы службы, запущенные из одного файла compose, могли обращаться друг к другу. Запустите его с помощью этой команды docker compose -f docker-compose-node1.yaml up -d
и убедитесь, что вы все еще можете использовать базовый API блокчейна:
Теперь наше приложение запущено и создало генезисный блок.
В службе цепочки блоков уже есть код, который отправляет сообщение в Kafka всякий раз, когда мы добавляем блок или добавляем транзакцию. Нам нужно создать новый сервис, который будет читать эти события и транслировать их списку пиров. Давайте остановимся на том, что у нас запущено на данный момент, и добавим простую службу узлов, которая будет регистрировать сообщение, когда оно будет получено. Нам понадобится новый модуль в проекте - это будет еще одна служба Spring Boot, которая сможет общаться с внешними узлами.
Во-первых, нам нужно определить узел, который является просто URL-адресом. Вот Node
:
@Data @Builder @AllArgsConstructor @NoArgsConstructor @JsonIgnoreProperties(ignoreUnknown = true) @Document public class Node { @Id private String id; private String url; }
Мы храним его в коллекции в MongoDB, поэтому нам нужен интерфейс репозитория:
@Repository public interface NodeRepository extends ReactiveMongoRepository<Node, String> { Mono<Node> findByUrl(String url); }
Затем нам нужен контроллер, чтобы иметь возможность видеть и добавлять Node
объектов:
@Slf4j @RestController @RequestMapping("/node") public class SimpleNodeController { private final Blockchain blockchain; private final SimpleNodeService simpleNodeService; public SimpleNodeController(Blockchain blockchain, SimpleNodeService simpleNodeService) { this.simpleNodeService = simpleNodeService; this.blockchain = blockchain; } @GetMapping(path = "peers", produces = MediaType.TEXT_EVENT_STREAM_VALUE) Flux<Node> getNodes() { return simpleNodeService.getPeers(); } @PostMapping(path = "peers", produces = MediaType.APPLICATION_JSON_VALUE) Mono<Node> addNode(@RequestBody Node node) { return simpleNodeService.connectToPeer(node); } }
Затем нам понадобится простой класс обслуживания, который будет взаимодействовать с MongoDB и Kafka:
@Component @Slf4j public class SimpleNodeService { private static Node myself; final private String host; final private int port; final private NodeRepository peers; final private Blockchain blockchain; final private ReactiveKafkaConsumerTemplate<String, Message> emitter; public SimpleNodeService(@Value("${server.host}") String host, @Value("${server.port}") String port, Blockchain blockchain, NodeRepository peers, ReactiveKafkaConsumerTemplate<String, Message> emitter) { this.host = host; this.port = Integer.parseInt(port); this.blockchain = blockchain; this.emitter = emitter; this.peers = peers; myself = Node.builder() .url("http://" + host + ":" + port) .build(); emitter .receiveAutoAck() .doOnNext(consumerRecord -> log.info( "received key={}, value={} from topic={}, offset={}", consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), consumerRecord.offset()) ) .map(ConsumerRecord::value) .subscribe( m -> log.info("received message {}", m), e -> log.error("error receiving Message", e)); } public Flux<Node> getPeers() { return peers.findAll(); } public Mono<Node> connectToPeer(Node node) { return peers.save(node); } }
Любое сообщение, полученное от Kafka, будет просто зарегистрировано, и мы фактически ничего не будем делать с узлами, кроме сохранения и перечисления их.
Наконец, нам нужно указать Spring Boot, где можно найти общие компоненты и репозитории. Мы можем аннотировать основной класс:
@Slf4j @SpringBootApplication @ComponentScan(basePackageClasses = { net.kamradtfamily.blockchain.api.Blockchain.class, net.kamradtfamily.blockchainnode.Application.class}) @EnableReactiveMongoRepositories(basePackageClasses = { net.kamradtfamily.blockchain.api.BlockRepository.class, net.kamradtfamily.blockchainnode.NodeRepository.class}) public class Application { public static void main(String [] args) { SpringApplication.run(Application.class, args); try { Properties gitProps = new Properties(); gitProps.load( Application .class .getResourceAsStream("/git.properties")); log.info("Git Properties:"); gitProps.entrySet().stream() .forEach(es -> log.info("{}: {}", es.getKey(), es.getValue())); } catch (Exception e) { log.error("Error reading Git Properties"); } } }
Необходимо указать Spring, где искать компоненты и репозитории. Обычно он смотрит в пакет, в котором находится основной класс, но в нашем случае мы хотели поделиться компонентами из net.kamradtfamily.blockchain.api
. Поэтому я добавил аннотации ComponentScan
и EnableReactiveMongoRepositories
. Я также добавил немного протоколирования, чтобы каждый раз, когда он запускался, мы знали, какой хэш коммита Git запускаем.
Чтобы запустить все это, нам нужно переместить некоторые порты. Чтобы иметь новую службу и существующую службу, нам нужно будет предоставить каждому из них уникальные внешние порты. Добавим это к нашему docker-compose-node1.yaml
:
blockchainnode: image: rlkamradt/blockchainnode:1.0-SNAPSHOT ports: - 8080:8082 environment: MONGO_HOST: mongo SPRING_KAFKA_BOOTSTRAP-SERVERS: kafka:29092
Служба MongoExpress уже использует порт 8081, поэтому мы представим его как 8082. Теперь создайте новые образы, извлеките их и запустите все:
mvn clean install jib:build docker compose -f docker-compose-node1.yaml pull docker compose -f docker-compose-node1.yaml up
Затем, когда вы создаете транзакцию с помощью службы блокчейна, вы увидите в журналах службы blockchainnode
, что сообщение было получено. Вы также сможете подключаться к конечным точкам http: // localhost: 8082 / node / peers, а также создавать и перечислять одноранговые узлы.
Здесь все усложняется. Нам нужно, чтобы работало более одного узла, и нам нужно, чтобы узлы отвечали на сообщения после добавления транзакции или блока. Нам также нужно, чтобы узлы разговаривали друг с другом во время запуска или добавления узлов. Я собираюсь скопировать SimpleNodeService
и SimpleNodeController
в NodeService
и NodeController
. Я собираюсь оставить старые классы на случай, если вы просматриваете код на GitHub и хотите следить за ним, но я собираюсь закомментировать аннотации Component
и RestController
, чтобы они не запускались во время выполнения.
Я собираюсь добавить дополнительную конечную точку к NodeController
, чтобы подтвердить, что транзакция превратилась в блок во всех узлах:
@GetMapping(path = "transactions/:transactionId/confirmations", produces = MediaType.ALL_VALUE) Mono<String> getTransactionFromNode( @RequestParam("transactionId") String transactionId) { return nodeService .getConfirmations(Long.valueOf(transactionId)) .map(b -> b.toString()); }
Это означает, что мне нужен новый набор методов в NodeService
для получения подтверждений от всех узлов:
public Mono<Block> getConfirmation(Node peer, long transactionId) { String URL = peer.getUrl() + "/block/blocks/transactions/" + transactionId; log.info("Getting transactions from: {}", URL); return client .get() .uri(URL) .retrieve().bodyToMono(Block.class); .onErrorContinue((t, o) -> Mono.empty()); } Mono<Long> getConfirmations(long transactionId) { // Get count of peers with confirmations that the transaction exists return blockchain .findTransactionInChain(transactionId, blockchain .getAllBlocks()) .zipWith(peers.findAll() .flatMap(peer -> getConfirmation(peer, transactionId))) .count(); }
Это вернет количество узлов, у которых есть эта транзакция в блоке. Но сначала мне нужно создать новую конечную точку в BlockController
, которая будет сообщать, находится ли транзакция в блоке в цепочке блоков.
@GetMapping(path = "blocks/transaction/{transactionId}", produces = MediaType.APPLICATION_JSON_VALUE) public Mono<Block> getTransaction( @PathVariable("transactionId") Long transactionId) { return blockchain .findTransactionInChain(transactionId, blockchain.getAllBlocks()) .last() // assume there's only one .switchIfEmpty(Mono.error(new ResponseStatusException( HttpStatus.NOT_FOUND, "Transaction Not Found in Blockchain")));
К счастью, у нас уже есть метод findTransactionInChain
, который вернет блок, в котором обнаружена транзакция.
Далее нам нужно ответить на сообщения от Кафки. Мы добавим messageHandler
метод, который будет передавать сообщения всем одноранговым узлам:
public Mono<? extends Object> messageHandler(Message m) { if("addedBlock".equals(m.getMessage())) { return peers.findAll() .flatMap(p -> sendLatestBlock(p, m.getBlock())) .switchIfEmpty(Mono.just(m.getBlock())) .last(); } else if("addedTransaction".equals(m.getMessage())) { return peers.findAll() .flatMap(p -> sendTransaction(p, m.getTransaction())) .switchIfEmpty(Mono.just(m.getTransaction())) .last(); } else if("getBlocks".equals(m.getMessage())) { return peers.findAll() .flatMap(p -> getBlocks(p)) .switchIfEmpty(blockchain.getLastBlock()) .last(); } else { log.error("unknown message {}", m); return Mono.empty(); } }
Для этого требуются два новых метода для выполнения запросов к другим узлам:
public Mono<ClientResponse> sendLatestBlock(Node peer, Block block) { String URL = peer.getUrl() + "/block/blocks/latest"; log.info("Posting latest block to: {}", URL); return client .put() .uri(URL) .body(block, Block.class) .exchange(); } public Mono<ClientResponse> sendTransaction(Node peer, Transaction transaction) { String URL = peer.getUrl() + "/transaction"; log.info("Sending transaction '{}' to: {}", transaction, URL); return client .post() .uri(URL) .body(transaction, Transaction.class) .exchange(); }
У нас уже есть POST для конечной точки /transaction
, но нам нужно добавить PUT в конечную точку /block/blocks/latest
.
@PutMapping(path = "/block/blocks/latest", produces = MediaType.APPLICATION_JSON_VALUE) public Mono<Block> checkReceivedBlock( @RequestBody Block receivedBlock) { return blockchain.checkReceivedBlock(receivedBlock); }
Для этого требуется новый метод в службе Blockchain
.
public Mono<Block> checkReceivedBlock(Block receivedBlock) { return getLastBlock() .filter(b -> b.getIndex() < receivedBlock.getIndex()) .flatMap(b -> { log.info( "Blockchain possibly behind. We got: {}, Peer got: {}", b.getIndex(), receivedBlock.getIndex()); if (b.getHash().equals(receivedBlock.getPreviousHash())) { log.info("Appending received block to our chain"); return addBlock(receivedBlock, true); } else { log.info("Querying chain from our peers"); emitter.send(TOPIC, Message.builder() .type("empty") .message("getBlocks") .build()) .subscribe(); return Mono.empty(); } }); }
Вы можете увидеть, как уровень узлов общается с другими узлами через общедоступный API. Но есть одна проблема. Каждый узел представлен одним URL-адресом, но мы должны поговорить с двумя отдельными службами: службой блоков и службой узлов. Я собираюсь сделать простой вход с экземпляром Nginx. Таким образом, мы можем общаться с обеими службами (а позже и с другими) с помощью одного URL. Вы можете посмотреть код на GitHub, чтобы получить подробную информацию о конфигурации Nginx и добавлении всех сервисов в docker-compose-node1.yaml
.
При запуске все конечные точки по-прежнему работают, и я вижу в журналах связь между службой цепочки блоков и службой узлов на шине Kafka. Пришло время сделать второй узел. Скопируйте docker-compose-node1.yaml
на docker-compose-node2.yaml
и переключите внешний порт службы Nginx с 8080 на 8081, чтобы достичь узла 1 на порту 8080 и узла 2 на порту 8081. Я также собираюсь создать небольшой скрипт с именем startnode1
для запуска каждой службы. по порядку и вывести логи из службы узла:
docker compose -p node1 -f docker-compose-node1.yaml up -d mongo docker compose -p node1 -f docker-compose-node1.yaml up -d zookeeper docker compose -p node1 -f docker-compose-node1.yaml up -d mongo-express docker compose -p node1 -f docker-compose-node1.yaml up -d kafka docker compose -p node1 -f docker-compose-node1.yaml up -d blockchain docker compose -p node1 -f docker-compose-node1.yaml up blockchainnode
Поскольку в последней строке нет флага -d
, она отображает журналы, пока Ctrl-C не остановит его. Я использую флаг -p node1
, чтобы создавать отдельные экземпляры служб. Затем скопируйте файл с именем startnode2
, но замените файл компоновки Docker, чтобы запустить узел 2, а флаг -p
- на узел2. Не забудьте установить флаг исполняемого файла для каждого:
chmod +x startnode1 chmod +x startnode2
Есть одно последнее изменение. Я, член службы узла, должен иметь URL, видимый другими службами, поэтому использование локального хоста не подходит. Я установил свойство Spring в application.properties:
server.myself: http://localhost:8080
Затем я переопределяю его в docker-compose-node1.yaml, который теперь выглядит так:
version: '3.1' services: mongo: image: mongo restart: always ports: - 27017 environment: MONGO_INITDB_ROOT_USERNAME: root MONGO_INITDB_ROOT_PASSWORD: example zookeeper: image: confluentinc/cp-zookeeper:latest ports: - 2181 environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-kafka:latest ports: - 9092 - 29092 links: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 blockchain: image: rlkamradt/blockchain:1.0-SNAPSHOT ports: - 8080 environment: SERVER_MYSELF: http://192.168.0.174:8080 MONGO_HOST: mongo SPRING_KAFKA_BOOTSTRAP-SERVERS: kafka:29092 blockchainnode: image: rlkamradt/blockchainnode:1.0-SNAPSHOT ports: - 8080 environment: SERVER_MYSELF: http://192.168.0.174:8080 MONGO_HOST: mongo SPRING_KAFKA_BOOTSTRAP-SERVERS: kafka:29092 nginx: image: nginx:latest volumes: - ./nginx.conf:/etc/nginx/nginx.conf ports: - 8080:80
docker-compose-node2.yaml
также изменяет порт на 8081 для значения SERVER_MYSELF
и nginx.ports
.
Запустите оба экземпляра. Когда они оба работают, вы можете подключиться друг к другу:
Теперь вы можете создавать транзакции и майнить блоки, как показано в предыдущей статье, но вы можете перечислить транзакции и блоки в обоих узлах. Одноранговый протокол гарантирует, что оба узла имеют одинаковые данные.
Я не скажу, что на данный момент это идеально. Существует множество различных последовательностей, которые необходимо протестировать, чтобы убедиться, что независимо от того, как все делается, цепочка блоков в разных экземплярах остается неизменной. Но эта статья уже достаточно длинная, и я уверен, что вы не хотите читать обо мне, отлаживаю эту сеть!
Спасибо, что прочитали эту довольно длинную статью, я постарался сделать ее как можно более сжатой, но это очень сложная тема.
Думаю, следующая статья из этой серии будет немного проще. Мы обсудим финальную часть головоломки: майнеры и пользователи.
Весь код для этой статьи можно найти здесь:
Предыдущая статья из этой серии: