Повторное использование hyper :: client и tokio_core в Iron и Hyper

Я делаю запрос клиента внутри обработчика Iron. Как я могу повторно использовать Tokio Core и Hyper Client? Я использую hyper 0.11.0 и tokio-core 0.1.

fn get_result(req: &mut Request) -> IronResult<Response> {
    let mut payload = String::new();
    req.body.read_to_string(&mut payload).unwrap();

    // can we re-use core and client somehow. Making then global with lazy_static!() does not work.
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let client = Client::new(&core.handle());

    let uri = "http://host:port/getResult".parse().unwrap();
    let mut req: hyper::Request = hyper::Request::new(hyper::Method::Post, uri);
    req.headers_mut().set(ContentType::json());
    req.headers_mut().set(ContentLength(payload.len() as u64));
    req.set_body(payload);

    let mut results: Vec<RequestFormat> = Vec::new();
    let work = client.request(req).and_then(|res| {
        res.body().for_each(|chunk| {
            let re: ResultFormat = serde_json::from_slice(&chunk).unwrap();
            results.push(re);
            Ok(())
        })
    });

    Ok(Response::with(
        (iron::status::Ok, serde_json::to_string(&results).unwrap()),
    ))
}

person Vishal Kumar    schedule 14.07.2017    source источник


Ответы (1)


Я создал класс Downloader, который обертывает клиент и ядро. Ниже приведен фрагмент.

use hyper;
use tokio_core;
use std::sync::{mpsc};
use std::thread;
use futures::Future;
use futures::stream::Stream;
use std::time::Duration;
use std::io::{self, Write};
use time::precise_time_ns;
use hyper::Client;

pub struct Downloader {
    sender : mpsc::Sender<(hyper::Request, mpsc::Sender<hyper::Chunk>)>,
    #[allow(dead_code)]
    tr : thread::JoinHandle<hyper::Request>,
}
impl Downloader {
    pub fn new() -> Downloader {
        let (sender, receiver) = mpsc::channel::<(hyper::Request,mpsc::Sender<hyper::Chunk>)>();
        let tr = thread::spawn(move||{
            let mut core = tokio_core::reactor::Core::new().unwrap();
            let client =  Client::new(&core.handle());
            loop {
                let (req , sender) = receiver.recv().unwrap();
                let begin = precise_time_ns();
                let work = client.request(req)   
                .and_then(|res| {
                    res.body().for_each(|chunk| {

                        sender.send(chunk)
                        .map_err(|e|{
                            //io::sink().write(&chunk).unwrap();
                            io::Error::new(io::ErrorKind::Other, e)
                        })?;
                        Ok(())
                    })
                    //sender.close();
                //res.body().concat2()
                });
            core.run(work).map_err(|e|{println!("Error Is {:?}", e);});
           //This time prints same as all request processing time. 
            debug!("Time taken In Download {:?} ms", (precise_time_ns() - begin) / 1000000);
            }
        });
        Downloader{sender,
                tr,
        }
    }

    pub fn download(&self, req : hyper::Request, results:  mpsc::Sender<Vec<u8>>){
        self.sender.send((req, results)).unwrap();
    }
}

Теперь у клиента этого класса может быть статическая переменная.

lazy_static!{
    static ref DOWNLOADER : Mutex<downloader::Downloader> = 
Mutex::new(downloader::Downloader::new());
}
let (sender, receiver) = mpsc::channel();
DOWNLOADER.lock().unwrap().download(payload, sender);

а затем прочитайте канал приема. Может потребоваться закрыть канал отправителя с помощью sender.drop ()

person Vishal Kumar    schedule 17.07.2017