В контексте микросервисов оркестратор — это мозг системы, который распределяет задачу между каждым микросервисом по некоторому правилу. Когда должна произойти транзакция, оркестратор должен определить рабочий процесс для микрослужб. В этой статье мы создадим простой оркестратор на Rust.

Что мы создаем?

Здесь у нас есть ряд микросервисов, которые могут быть на разных компьютерах (каждый с другой базой данных, разными данными и разными технологиями), которые позволят клиентам выполнять запросы по определенному параметру, а затем возвращать клиенту соответствующую запрошенную информацию.

Каждая микрослужба имеет n портов, открытых для запросов, эти порты будут исследоваться с использованием алгоритма планирования циклического перебора для равномерного распределения вызовов между портами.

Цель также состоит в том, чтобы сделать оркестратор как можно более параметрическим, чтобы при необходимости можно было добавлять дополнительные микросервисы.

Процесс строительства

Первые шаги

Во-первых, нам нужно выяснить, как мы можем удаленно включить микросервисы, чтобы позволить оркестратору активировать порты, если они не работают. Эта возможность известна как отказоустойчивость и может быть реализована в виде отдельной программы. У нас есть два варианта: SSH-подключение или создание HTTP-сервиса, который позволяет активировать микросервисы по запросу. Для упрощения настройки, особенно в клиентских операционных системах, таких как Windows, я выберу последний вариант. С этого момента я буду называть эту систему воспламенителем, потому что это звучит броско, не так ли?

Я буду использовать Rust с крейтом Rocket для создания службы HTTP, которая позволит мне вызывать конечную точку /‹port›, которая включит микрослужбу на этом конкретном порту. Таким образом, мы будем использовать файл YAML для настройки воспламенителя, например:

start_command: "START COMMAND WITH THE ARG OF PORT AT THE END"
path: "PATH TO THE APPLICATION"

Нам потребуются следующие зависимости в Cargo.toml (я объясню их использование позже)

[dependencies]
serde = {version="1.0.163", features=["derive"]}
serde_yaml = "0.9.21"
rocket = "=0.5.0-rc.3"
lazy_static = "1.4.0"

Теперь, когда у нас есть зависимости, мы создадим структуру Config, которая будет содержать все конфигурации, сохраненные в файле YAML, как таковые:

#[derive(Debug, Deserialize, Serialize)]
struct Config {
    start_command: String,
    path: String,
}

/** Reads YAML config file from a given path and returns the config as struct*/
fn read_config(path: &str) -> Config {
    // Open the file
    let mut file = File::open(path).expect("Failed to open file"); 
    // Read file contents as a String 
    let mut contents = String::new();
    file.read_to_string(&mut contents)
        .expect("Failed to read file");
    // Use serde_yaml to parse the String into a Config Struct
    let config_result: Result<Config, serde_yaml::Error> = serde_yaml::from_str(&contents);
    
    // If the config struct couldn't be parsed panic, else declare the config
    let config: Config = match config_result {
        Ok(config) => config,
        Err(err) => panic!("Failed to parse YAML: {}", err),
    };

    // return the parsed config 
    config
}

// Declares a global variable for the config
lazy_static! {
    static ref CONFIG: Config = read_config("config.yaml");
}

Приведенный выше код делает следующее:

  • Определяет структуру Config с полями конфигурации воспламенителя.
  • Создайте функцию, которая, учитывая путь к файлу конфигурации, анализирует его, используя serde_yaml, в нашу структуру Config.
  • Объявляет глобальную переменную CONFIG, в которой хранится конфигурация воспламенителя.

После этого мы должны настроить Rocket, чтобы наш код был веб-службой, а также определить конечную точку /‹port›. Я также изменил порт воспламенителя по умолчанию на порт 4444, используя $Env:ROCKET_PORT=4444 в терминале.

// This endpoint turns the application on in the port passed in the request
#[get("/<port>")]
fn index(port: i32) -> String {
    // the function that instantiates the application in the port
    run_server(port)
}

// Default Rocket launch
#[launch]
fn rocket() -> _ {
    rocket::build().mount("/", routes![index])
}

Приведенный выше код позволяет или код быть веб-службой Rust, но нам нужно закодировать, как на самом деле включить приложение.

/** Starts the server given a port, the command and path of a given application */
fn start_server( port: i32) -> Result<(), Box<dyn Error>> {
    // creates a new cmd instance
    let mut cmd = Command::new("cmd"); 
    // tells the cmd to run the command in a shell and close the shell 
    // when done
    cmd.arg("/C") 
        .arg(format!("cd {}", CONFIG.path)) // change directory to the specified path
        .arg(format!("& {} {}", CONFIG.start_command, port)) // run start command with port
        .stdout(Stdio::piped()) // capture the stdout of the program
        .stderr(Stdio::piped()); // capture the stderr of the program

    // Start the command and get a handle to its standard output and error streams
    let mut child = cmd.spawn().expect("failed to start");

    // Wait for 1 seg
    thread::sleep(Duration::from_millis(1000));

    // Try to wait for the child process to end
    match child.try_wait() {
        //If program exited means it probably failed.
        Ok(Some(status)) => {
            return Err(Box::new(IoError::new(
                ErrorKind::Other,
                format!("process exited with status code: {}", status),
            )));
        }
        // If it has not ended then asume it starts
        Ok(_) => {
            println!("SERVER SUCCESFULLY STARTED AT PORT {}", port);
            return Ok(());
        }
        // If any other error happens return it
        Err(e) => {
            return Err(Box::new(IoError::new(ErrorKind::Other, format!("{}", e))));
        }
    }
}

Я знаю, что это много кода, это может быть не самая чистая реализация, учитывая, что она предполагает, что если процесс не завершается, он выполняется, но работает. Опишем шаги, которые есть в коде:

  • Создает экземпляр cmd, переходит в каталог приложения и выполняет команду запуска.
  • Создает экземпляр cmd для фактического запуска в системе.
  • Ожидает одну секунду, чтобы позволить процессу начаться.
  • Если к этому времени процесс не завершился, значит, он запущен (надеюсь).

Этот код, который можно увидеть в репозитории Github, позволяет использовать зажигатель, который включает приложение на запрошенном порту.

Создание оркестратора

Пришло время создать фактический оркестратор, для этого нам нужно будет прочитать еще один YAML, чтобы настроить оркестратор, сделав его полностью настраиваемым. Наш YAML будет выглядеть так:

servers:
  - address: "localhost"
    ports: [8001, 8002, 8003, 8004]
    ignite_port: 4444 
    path: "/a/parts/search"
    company: "A"
  - address: "127.0.0.1"
    ports: [8001, 8002, 8003, 8004]
    ignite_port: 4444
    path: "/a/parts/search"
    company: "B"

Где:

  • Серверы — это приложения, которые должны работать.
  • Адрес — это место, где находится приложение.
  • Порты — это порты, которые используются для выполнения запросов.
  • Порт воспламенения - это порт, в котором расположен воспламенитель.
  • Путь — это место, где находится конечная точка запроса в микрослужбе.
  • А компания — это название микросервиса.

Этот YAML мы будем загружать аналогично зажигателю, просто изменив значение структуры.

use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::prelude::*;

#[derive(Debug, Deserialize, Serialize)]
pub struct Server {
    pub address: String,
    pub ports: Vec<i32>,
    pub ignite_port: i32,
    pub path: String,
    pub company: String,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct Config {
    pub check_time: i32,
    pub servers: Vec<Server>,
}

impl Config {
    pub fn new(path: &str) -> Config {
        let mut file = File::open(path).expect("Failed to open file");
        let mut contents = String::new();
        file.read_to_string(&mut contents)
            .expect("Failed to read file");
        let config_result: Result<Config, serde_yaml::Error> = serde_yaml::from_str(&contents);
        let config: Config = match config_result {
            Ok(config) => config,
            Err(err) => panic!("Failed to parse YAML: {}", err),
        };

        config
    }
}

Примечание: функция read_config теперь находится внутри реализации структуры Config, что лучше, чем функция!

В оркестраторе используются следующие ящики:

[dependencies]
serde = {version="1.0.163", features=["derive"]}
serde_yaml = "0.9.21"
rocket = "=0.5.0-rc.3"
reqwest = "0.11.18"
serde_json = "1.0"
rocket_contrib = "0.4"
lazy_static = "1.4.0"

Мы объявим глобальную переменную конфигурации, но мы также создадим таблицу поиска, в которой у нас будут адреса серверов в качестве ключей и портов с их номерами вызовов, и запустим нашу ракетную веб-службу, чтобы позже позволить клиентам запросите наши микросервисы:

lazy_static! {
    static ref CONFIG: Config = Config::new("config.yaml");
}

/**
 * Gives ports in the following format
 * 
 * ports = {
 *  "address1": {
 *  20: 0,
 *  21: 0,
 *  22: 0,
 * }, 
 *  "address2": {
 *  50: 0,
 *  51: 0,
 *  52: 0,
 * }, 
 * }
 */
fn get_ports() -> HashMap<String, HashMap<i32, i32>> {
    // Read configuration to get the addresses and ports
    let config = Config::new("config.yaml");
    
    // Generates the hashmap in the format described above
    let ports: HashMap<String, HashMap<i32, i32>> = config.servers.into_iter().map(|ele| {
        let server_ports: HashMap<i32, i32> = ele.ports.into_iter().map(|port| (port, 0)).collect();
        (ele.address, server_ports)
    }).collect();
    
    ports
}

#[launch]
fn rocket() -> _ {
    rocket::build().mount("/", routes![index])
}

Необходимо иметь способ взаимодействия с другими HTTP-сервисами микросервисов, а также с зажигателем, который мы сделали. Таким образом, мы имеем следующую функцию:

/** Makes an HTTP request in the pattern http://address:port/uri/.../q */
async fn make_request(
    port: i32, 
    address: &str,
    q: &str,
    uri: &str,
) -> Result<Value, Box<dyn std::error::Error>> {
    // Generates the url as http://address:port/uri/q
    let request_url = format!("http://{}:{}{}/{}", address, port, uri, q);
    // Time that will be considered as a timeout in the request
    let timeout = Duration::new(1, 0);
    // Creates a HTTP Client
    let client = ClientBuilder::new().timeout(timeout).build()?;
    // Gets the response of the HTTP call to the url
    let response = client.get(&request_url).send().await?;
    
    // Checks if response is successful
    if response.status().is_success() {
        // Gets response text
        let body = response.text().await?;
        // Parses response text into a serde_json json
        let json: Value = serde_json::from_str(&body)?;
        Ok(json)
    } else {
        Ok(Value::Null)
    }
}

Приведенный выше код обеспечивает базовую реализацию запроса HTTP GET на указанный адрес, порт, URI и строку запроса. Он обрабатывает тайм-ауты, извлекает ответ и анализирует тело ответа как JSON, если ответ успешен.

Мы также хотим, чтобы клиент понял, откуда эта информация. Делаем со следующей функцией:

async fn make_request_data(
    port: i32,
    address: &str,
    q: &str,
    uri: &str,
    company: &str,
) -> content::RawJson<String> {
    // Makes an http request
    let res = make_request(port, address, q, uri).await;
    
    // Makes the parsed json into a string to allow the addition of the source
    let json_str = match res {
        Ok(json) => json.to_string(),
        Err(err) => {
            eprintln!("Error: {}", err);
            Value::Null.to_string()
        }
    };

    // Returns a RawJson of the data and the source of this data
    content::RawJson(String::from(format!(
        "{{\"company\": \"{}\", \"data\": {} }}",
        company, json_str
    )))
}

При этом мы объединяем функциональность функции make_request с дополнительными данными (имя источника) и возвращаем строку JSON, содержащую как данные, полученные из HTTP-запроса, так и название компании.

Реализация алгоритма циклического планирования

Для реализации алгоритма циклического перебора мы будем использовать нашу ports HashMap и перебирать каждый сервер, а затем каждый порт, чтобы получить порт, который использовался меньше всего.

fn find_smallest_port(server: &Server) -> Option<i32> {
    // Declare the smallest port and smallest usage 
    let smallest_port;
    let smallest_value;
    // Get the ports hashmap
    let mut data_lock = PORTS.lock().unwrap();
    // Get the ports of this specific server 
    let ports_usage = data_lock.get(&server.address).unwrap();
    // Iterate through the hashmap and an find the port with the least usage
    smallest_port = ports_usage
        .iter()
        .min_by_key(|(_, used)| *used)
        .map(|(port, _)| *port);
    
    // If we got a port, add one to it's usage, because it will be used
    if let Some(port) = smallest_port {
        if let Some(value) = smallest_value {
            *data_lock
                .get_mut(&server.address)
                .unwrap()
                .get_mut(&port)
                .unwrap() = value + 1;
        }
    }
    
    // Return the smallest port used
    smallest_port
}

Теперь мы собираем все это вместе и создаем новую конечную точку GET в нашем оркестраторе, которая позволит запрашивать наши микросервисы с помощью строки.

#[get("/<q>")]
async fn index(q: String) -> content::RawJson<String> {
     // Creates a vector of the data recovered from the servers
     let mut data_recovered = Vec::new();
     // Iterates through the servers and recovers their data, 
     // querying the least used port
    for server in &CONFIG.servers {
        let port;

        // Needed because async function
        {
            // Finds the smallest port
            port = find_smallest_port(server)
                .expect(format!("No port found for {}", &server.address).as_str());
        }
        
        // Request the data to the server in the least used port
        let res = make_request_data(port, &server.address, &q, &server.path, &server.company).await;

        // Retrives the json_string
        let content::RawJson(json_string) = res;
        // Check if the response indicates an error
        let json_value: Value = serde_json::from_str(&json_string).unwrap_or_else(|_| Value::Null);
        
        // If an error ocurred (couldn't connect to the server) ignite!
        if *json_value.get("data").unwrap() == Value::Null {
            // Pings the igniter to turn the application on on this port
            let _ping = make_request_data(
                server.ignite_port,
                &server.address,
                &format!("{}", port),
                "",
                &server.company,
            )
            .await;
            // Calls the server again and appends the data to our responses
            data_recovered.push(
                make_request_data(port, &server.address, &q, &server.path, &server.company).await,
            )
        } else {
            // If the call was successful just append it to our responses
            data_recovered.push(content::RawJson(json_string));
        }
    }

    // Parses and serializes all the data obtained from the server calls
    let values: Vec<Value> = data_recovered
        .into_iter()
        .map(|content::RawJson(json_string)| serde_json::from_str(&json_string))
        .collect::<Result<Vec<Value>, _>>()
        .expect("Failed to parse JSON");
    // Gets the list of json objects as a string
    let json_string = serde_json::to_string(&values).expect("Failed to serialize JSON");

    // Print the port usage
    print_ports();

    // Send the results as a the response
    content::RawJson(String::from(format!("{{\"responses\": {}}}", json_string)))
}

Таким образом, этот код может обрабатывать входящие запросы, перебирать несколько серверов, извлекать данные с каждого сервера и формировать ответ, содержащий все собранные данные. Он выполняет обработку ошибок для неудачных запросов, обрабатывает запуск сервера в случае, если порт отключен, и предоставляет комбинированный ответ данных JSON с нескольких серверов.

После всех этих шагов у нас есть полный оркестратор, который обрабатывает запросы и выполняет поиск на нескольких серверах, а также может включать серверы. Мы можем проверить это с помощью почтальона:

Выводы

Мы создали оркестратор с нуля, который может запрашивать несколько микросервисов. Однако этот оркестратор не идеален, его можно улучшить, чтобы он мог делать больше вещей, например:

  • Обработка постоянных отказов порта.
  • Использование SSH, когда он легко доступен в ОС сервера.
  • Защита воспламенителя, чтобы он не допускал случайных вызовов службы HTTP.
  • Улучшение структуры кода и уменьшение запахов кода.
  • Какими бы ни были ваши мечты и надежды!

Код будет доступен в моем Github. Если вам нужны какие-либо разъяснения, не сомневайтесь, свяжитесь со мной по адресу [email protected].