Доверие через одноранговую сеть

В первой части этой серии мы сделали единый блокчейн. Теперь мы сделаем набор из них и заставим их разговаривать друг с другом. Настоящая суть блокчейна - это распределенная система проверки. Вы можете добавлять блоки с любых узлов, и в конечном итоге они попадают на одноранговые узлы, так что все согласны с тем, как выглядит блокчейн. Это важно, потому что вам не нужен единственный источник истины в распределенной системе.

Сразу возникает одна проблема: каждый узел - это две службы, плюс MongoDB и шина сообщений Kafka, которые все должны общаться друг с другом. Но я хочу протестировать и продемонстрировать несколько узлов на одном хосте (моем ноутбуке). Я работал с Docker compose, поэтому мне нужно сделать один файл Docker compose для каждого узла, чтобы порты оставались прямыми.

Мы будем работать над сервисом узлов, который позволит узлам работать друг с другом. Это будет получать входные данные из двух мест: спокойный интерфейс, который позволяет добавлять и перечислять подключенные узлы, и шина сообщений, предоставляемая Kafka, которая уведомляет службу узлов об изменениях в локальной цепочке блоков, которые необходимо транслировать на одноранговые узлы.

Чтобы сервисы использовались в качестве изображений, я буду использовать плагин Google Jib maven. Это самый простой способ создать образ из сборки maven. Мы добавляем следующее в pom-файл каждого модуля, для которого нам нужен образ:

<build>
    <plugins>
        <plugin>
            <groupId>com.google.cloud.tools</groupId>
            <artifactId>jib-maven-plugin</artifactId>
            <version>2.7.1</version>
        </plugin>
    </plugins>
</build>

В нашем случае мы можем использовать значения по умолчанию для конфигурации. Затем вы можете запустить mvn clean install jib:build, и он создаст образ, который вы можете использовать в своем файле компоновки Docker.

Давайте возьмем то, что у нас есть на данный момент, и начнем с всеобъемлющего файла составления Docker под названием docker-compose-node1:

version: '3.1'
services:
  mongo:
    image: mongo
    restart: always
    ports:
      - 27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - 2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - 9092
      - 29092
    links:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  blockchain:
    image: rlkamradt/blockchain:1.0-SNAPSHOT
    ports:
      - 8080:8080
    environment:
      MONGO_HOST: mongo
      SPRING_KAFKA_BOOTSTRAP-SERVERS: kafka:29092

Вы заметите, что кое-что изменилось. Во-первых, я удалил внешний порт из Kafka, чтобы он был доступен только внутри сети compose. Затем я добавил приложение блокчейн с изображением, созданным сборкой. Наконец, я переопределил несколько свойств Spring с помощью переменной окружения, чтобы он имел доступ к mongo и Kafka изнутри сети compose. Compose создаст записи DNS, чтобы службы, запущенные из одного файла compose, могли обращаться друг к другу. Запустите его с помощью этой команды docker compose -f docker-compose-node1.yaml up -d и убедитесь, что вы все еще можете использовать базовый API блокчейна:

Теперь наше приложение запущено и создало генезисный блок.

В службе цепочки блоков уже есть код, который отправляет сообщение в Kafka всякий раз, когда мы добавляем блок или добавляем транзакцию. Нам нужно создать новый сервис, который будет читать эти события и транслировать их списку пиров. Давайте остановимся на том, что у нас запущено на данный момент, и добавим простую службу узлов, которая будет регистрировать сообщение, когда оно будет получено. Нам понадобится новый модуль в проекте - это будет еще одна служба Spring Boot, которая сможет общаться с внешними узлами.

Во-первых, нам нужно определить узел, который является просто URL-адресом. Вот Node:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@Document
public class Node {
    @Id
    private String id;
    private String url;
}

Мы храним его в коллекции в MongoDB, поэтому нам нужен интерфейс репозитория:

@Repository
public interface NodeRepository extends
                 ReactiveMongoRepository<Node, String> {
    Mono<Node> findByUrl(String url);
}

Затем нам нужен контроллер, чтобы иметь возможность видеть и добавлять Node объектов:

@Slf4j
@RestController
@RequestMapping("/node")
public class SimpleNodeController {
    private final Blockchain blockchain;
    private final SimpleNodeService simpleNodeService;

    public SimpleNodeController(Blockchain blockchain,
                     SimpleNodeService simpleNodeService) {
        this.simpleNodeService = simpleNodeService;
        this.blockchain = blockchain;
    }

    @GetMapping(path = "peers", produces =
                    MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Node> getNodes() {
        return simpleNodeService.getPeers();
    }

    @PostMapping(path = "peers", produces =
                   MediaType.APPLICATION_JSON_VALUE)
    Mono<Node> addNode(@RequestBody Node node) {
        return simpleNodeService.connectToPeer(node);
    }

}

Затем нам понадобится простой класс обслуживания, который будет взаимодействовать с MongoDB и Kafka:

@Component
@Slf4j
public class SimpleNodeService {
    private static Node myself;
    final private String host;
    final private int port;
    final private NodeRepository peers;
    final private Blockchain blockchain;
    final private ReactiveKafkaConsumerTemplate<String, 
                                      Message> emitter;

    public SimpleNodeService(@Value("${server.host}") String host,
           @Value("${server.port}") String port,
           Blockchain blockchain,
           NodeRepository peers,
           ReactiveKafkaConsumerTemplate<String, Message> emitter) {
        this.host = host;
        this.port = Integer.parseInt(port);
        this.blockchain = blockchain;
        this.emitter = emitter;
        this.peers = peers;
        myself = Node.builder()
                .url("http://" + host + ":" + port)
                .build();
        emitter
           .receiveAutoAck()
           .doOnNext(consumerRecord -> log.info(
               "received key={}, value={} from topic={}, offset={}",
               consumerRecord.key(),
               consumerRecord.value(),
               consumerRecord.topic(),
               consumerRecord.offset())
                )
                .map(ConsumerRecord::value)
                .subscribe(
                    m -> log.info("received message {}", m), 
                    e -> log.error("error receiving Message", e));
    }

    public Flux<Node> getPeers() {
        return peers.findAll();
    }

    public Mono<Node> connectToPeer(Node node) {
        return peers.save(node);
    }
}

Любое сообщение, полученное от Kafka, будет просто зарегистрировано, и мы фактически ничего не будем делать с узлами, кроме сохранения и перечисления их.

Наконец, нам нужно указать Spring Boot, где можно найти общие компоненты и репозитории. Мы можем аннотировать основной класс:

@Slf4j
@SpringBootApplication
@ComponentScan(basePackageClasses = {
        net.kamradtfamily.blockchain.api.Blockchain.class,
        net.kamradtfamily.blockchainnode.Application.class})
@EnableReactiveMongoRepositories(basePackageClasses = {
        net.kamradtfamily.blockchain.api.BlockRepository.class,
        net.kamradtfamily.blockchainnode.NodeRepository.class})
public class Application {
    public static void main(String [] args) {
        SpringApplication.run(Application.class, args);
        try {
            Properties gitProps = new Properties();
            gitProps.load(
                Application
                   .class
                   .getResourceAsStream("/git.properties"));
            log.info("Git Properties:");
            gitProps.entrySet().stream()
                    .forEach(es -> 
                          log.info("{}: {}",
                          es.getKey(),
                          es.getValue()));
        } catch (Exception e) {
            log.error("Error reading Git Properties");
        }
    }
}

Необходимо указать Spring, где искать компоненты и репозитории. Обычно он смотрит в пакет, в котором находится основной класс, но в нашем случае мы хотели поделиться компонентами из net.kamradtfamily.blockchain.api. Поэтому я добавил аннотации ComponentScan и EnableReactiveMongoRepositories. Я также добавил немного протоколирования, чтобы каждый раз, когда он запускался, мы знали, какой хэш коммита Git запускаем.

Чтобы запустить все это, нам нужно переместить некоторые порты. Чтобы иметь новую службу и существующую службу, нам нужно будет предоставить каждому из них уникальные внешние порты. Добавим это к нашему docker-compose-node1.yaml:

blockchainnode:
  image: rlkamradt/blockchainnode:1.0-SNAPSHOT
  ports:
    - 8080:8082
  environment:
    MONGO_HOST: mongo
    SPRING_KAFKA_BOOTSTRAP-SERVERS: kafka:29092

Служба MongoExpress уже использует порт 8081, поэтому мы представим его как 8082. Теперь создайте новые образы, извлеките их и запустите все:

mvn clean install jib:build
docker compose -f docker-compose-node1.yaml pull
docker compose -f docker-compose-node1.yaml up

Затем, когда вы создаете транзакцию с помощью службы блокчейна, вы увидите в журналах службы blockchainnode, что сообщение было получено. Вы также сможете подключаться к конечным точкам http: // localhost: 8082 / node / peers, а также создавать и перечислять одноранговые узлы.

Здесь все усложняется. Нам нужно, чтобы работало более одного узла, и нам нужно, чтобы узлы отвечали на сообщения после добавления транзакции или блока. Нам также нужно, чтобы узлы разговаривали друг с другом во время запуска или добавления узлов. Я собираюсь скопировать SimpleNodeService и SimpleNodeController в NodeService и NodeController. Я собираюсь оставить старые классы на случай, если вы просматриваете код на GitHub и хотите следить за ним, но я собираюсь закомментировать аннотации Component и RestController, чтобы они не запускались во время выполнения.

Я собираюсь добавить дополнительную конечную точку к NodeController, чтобы подтвердить, что транзакция превратилась в блок во всех узлах:

@GetMapping(path = "transactions/:transactionId/confirmations",
                        produces = MediaType.ALL_VALUE)
Mono<String> getTransactionFromNode(
             @RequestParam("transactionId") String transactionId) {
    return nodeService
            .getConfirmations(Long.valueOf(transactionId))
            .map(b -> b.toString());
}

Это означает, что мне нужен новый набор методов в NodeService для получения подтверждений от всех узлов:

public Mono<Block> getConfirmation(Node peer, long transactionId) {
    String URL = peer.getUrl() 
          + "/block/blocks/transactions/" 
          + transactionId;
    log.info("Getting transactions from: {}", URL);
    return client
            .get()
            .uri(URL)
            .retrieve().bodyToMono(Block.class);
            .onErrorContinue((t, o) -> Mono.empty());
}

Mono<Long> getConfirmations(long transactionId) {
// Get count of peers with confirmations that the transaction exists
    return blockchain
          .findTransactionInChain(transactionId, blockchain
               .getAllBlocks())
          .zipWith(peers.findAll()
               .flatMap(peer -> getConfirmation(peer,
                               transactionId)))
          .count();
}

Это вернет количество узлов, у которых есть эта транзакция в блоке. Но сначала мне нужно создать новую конечную точку в BlockController, которая будет сообщать, находится ли транзакция в блоке в цепочке блоков.

@GetMapping(path = "blocks/transaction/{transactionId}", 
                produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<Block> getTransaction(
            @PathVariable("transactionId") Long transactionId) {
    return blockchain
            .findTransactionInChain(transactionId,
                              blockchain.getAllBlocks())
            .last() // assume there's only one
            .switchIfEmpty(Mono.error(new ResponseStatusException(
                    HttpStatus.NOT_FOUND, 
                    "Transaction Not Found in Blockchain")));

К счастью, у нас уже есть метод findTransactionInChain, который вернет блок, в котором обнаружена транзакция.

Далее нам нужно ответить на сообщения от Кафки. Мы добавим messageHandler метод, который будет передавать сообщения всем одноранговым узлам:

public Mono<? extends Object> messageHandler(Message m) {
  if("addedBlock".equals(m.getMessage())) {
    return peers.findAll()
            .flatMap(p -> sendLatestBlock(p, m.getBlock()))
            .switchIfEmpty(Mono.just(m.getBlock()))
            .last();
  } else if("addedTransaction".equals(m.getMessage())) {
    return peers.findAll()
            .flatMap(p -> sendTransaction(p, m.getTransaction()))
            .switchIfEmpty(Mono.just(m.getTransaction()))
            .last();
  } else if("getBlocks".equals(m.getMessage())) {
    return peers.findAll()
            .flatMap(p -> getBlocks(p))
            .switchIfEmpty(blockchain.getLastBlock())
            .last();
  } else {
    log.error("unknown message {}", m);
    return Mono.empty();
  }
}

Для этого требуются два новых метода для выполнения запросов к другим узлам:

public Mono<ClientResponse> sendLatestBlock(Node peer, 
                             Block block) {
    String URL = peer.getUrl() + "/block/blocks/latest";
    log.info("Posting latest block to: {}", URL);
    return client
            .put()
            .uri(URL)
            .body(block, Block.class)
            .exchange();
}

public Mono<ClientResponse> sendTransaction(Node peer, 
                             Transaction transaction) {
    String URL = peer.getUrl() + "/transaction";
    log.info("Sending transaction '{}' to: {}", transaction, URL);
    return client
            .post()
            .uri(URL)
            .body(transaction, Transaction.class)
            .exchange();
}

У нас уже есть POST для конечной точки /transaction, но нам нужно добавить PUT в конечную точку /block/blocks/latest.

@PutMapping(path = "/block/blocks/latest", 
             produces =  MediaType.APPLICATION_JSON_VALUE)
public Mono<Block> checkReceivedBlock(
                    @RequestBody Block receivedBlock) {
    return blockchain.checkReceivedBlock(receivedBlock);
}

Для этого требуется новый метод в службе Blockchain.

public Mono<Block> checkReceivedBlock(Block receivedBlock) {
  return getLastBlock()
      .filter(b -> b.getIndex() < receivedBlock.getIndex())
      .flatMap(b -> {
         log.info(
            "Blockchain possibly behind. We got: {}, Peer got: {}",
            b.getIndex(),
            receivedBlock.getIndex());
         if (b.getHash().equals(receivedBlock.getPreviousHash())) { 
            log.info("Appending received block to our chain");
            return addBlock(receivedBlock, true);
         } else {
            log.info("Querying chain from our peers");
            emitter.send(TOPIC, Message.builder()
               .type("empty")
               .message("getBlocks")
               .build())
               .subscribe();
            return Mono.empty();
          }
        });
}

Вы можете увидеть, как уровень узлов общается с другими узлами через общедоступный API. Но есть одна проблема. Каждый узел представлен одним URL-адресом, но мы должны поговорить с двумя отдельными службами: службой блоков и службой узлов. Я собираюсь сделать простой вход с экземпляром Nginx. Таким образом, мы можем общаться с обеими службами (а позже и с другими) с помощью одного URL. Вы можете посмотреть код на GitHub, чтобы получить подробную информацию о конфигурации Nginx и добавлении всех сервисов в docker-compose-node1.yaml.

При запуске все конечные точки по-прежнему работают, и я вижу в журналах связь между службой цепочки блоков и службой узлов на шине Kafka. Пришло время сделать второй узел. Скопируйте docker-compose-node1.yaml на docker-compose-node2.yaml и переключите внешний порт службы Nginx с 8080 на 8081, чтобы достичь узла 1 на порту 8080 и узла 2 на порту 8081. Я также собираюсь создать небольшой скрипт с именем startnode1 для запуска каждой службы. по порядку и вывести логи из службы узла:

docker compose -p node1 -f docker-compose-node1.yaml up -d mongo
docker compose -p node1 -f docker-compose-node1.yaml up -d zookeeper
docker compose -p node1 -f docker-compose-node1.yaml up -d mongo-express
docker compose -p node1 -f docker-compose-node1.yaml up -d kafka
docker compose -p node1 -f docker-compose-node1.yaml up -d blockchain
docker compose -p node1 -f docker-compose-node1.yaml up blockchainnode

Поскольку в последней строке нет флага -d, она отображает журналы, пока Ctrl-C не остановит его. Я использую флаг -p node1, чтобы создавать отдельные экземпляры служб. Затем скопируйте файл с именем startnode2, но замените файл компоновки Docker, чтобы запустить узел 2, а флаг -p - на узел2. Не забудьте установить флаг исполняемого файла для каждого:

chmod +x startnode1
chmod +x startnode2

Есть одно последнее изменение. Я, член службы узла, должен иметь URL, видимый другими службами, поэтому использование локального хоста не подходит. Я установил свойство Spring в application.properties:

server.myself: http://localhost:8080

Затем я переопределяю его в docker-compose-node1.yaml, который теперь выглядит так:

version: '3.1'
services:
  mongo:
    image: mongo
    restart: always
    ports:
      - 27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - 2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - 9092
      - 29092
    links:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  blockchain:
    image: rlkamradt/blockchain:1.0-SNAPSHOT
    ports:
      - 8080
    environment:
      SERVER_MYSELF: http://192.168.0.174:8080
      MONGO_HOST: mongo
      SPRING_KAFKA_BOOTSTRAP-SERVERS: kafka:29092
  blockchainnode:
    image: rlkamradt/blockchainnode:1.0-SNAPSHOT
    ports:
      - 8080
    environment:
      SERVER_MYSELF: http://192.168.0.174:8080
      MONGO_HOST: mongo
      SPRING_KAFKA_BOOTSTRAP-SERVERS: kafka:29092
  nginx:
    image: nginx:latest
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    ports:
      - 8080:80

docker-compose-node2.yaml также изменяет порт на 8081 для значения SERVER_MYSELF и nginx.ports.

Запустите оба экземпляра. Когда они оба работают, вы можете подключиться друг к другу:

Теперь вы можете создавать транзакции и майнить блоки, как показано в предыдущей статье, но вы можете перечислить транзакции и блоки в обоих узлах. Одноранговый протокол гарантирует, что оба узла имеют одинаковые данные.

Я не скажу, что на данный момент это идеально. Существует множество различных последовательностей, которые необходимо протестировать, чтобы убедиться, что независимо от того, как все делается, цепочка блоков в разных экземплярах остается неизменной. Но эта статья уже достаточно длинная, и я уверен, что вы не хотите читать обо мне, отлаживаю эту сеть!

Спасибо, что прочитали эту довольно длинную статью, я постарался сделать ее как можно более сжатой, но это очень сложная тема.

Думаю, следующая статья из этой серии будет немного проще. Мы обсудим финальную часть головоломки: майнеры и пользователи.

Весь код для этой статьи можно найти здесь:



Предыдущая статья из этой серии: