f# mailboxprocessor — отвечает, не дожидаясь доставки

Я использую агент (MailboxProcessor) для выполнения некоторой обработки с отслеживанием состояния, когда требуется ответ.

  • Звонящий отправляет сообщение, используя MailboxProcessor.PostAndAsyncReply
  • В агенте ответ дается с AsyncReplyChannel.Reply

Однако, покопавшись в исходном коде f#, я обнаружил, что агент не будет обрабатывать следующее сообщение, пока не будет доставлен ответ. В целом это хорошо. Но в моем случае агенту предпочтительнее продолжать обрабатывать сообщения, чем ждать доставки ответа.

Это проблематично сделать что-то подобное, чтобы доставить ответ? (Или есть лучшая альтернатива?)

async { replyChannel.Reply response } |> Async.Start

Я понимаю, что этот метод не гарантирует, что ответы будут доставлены по порядку. Я не против.

Справочный пример

// agent code
let doWork data =
    async { ... ; return response }

let rec loop ( inbox : MailboxProcessor<_> ) =
    async {
        let! msg = inbox.Receive()
        match msg with
        | None ->
            return ()

        | Some ( data, replyChannel ) ->
            let! response = doWork data
            replyChannel.Reply response (* waits for delivery, vs below *)
            // async { replyChannel.Reply response } |> Async.Start
            return! loop inbox
    }

let agent =
    MailboxProcessor.Start(loop)

// caller code
async {
    let! response =
        agent.PostAndAsyncReply(fun replyChannel -> Some (data, replyChannel))
    ...
}

person Kasey Speakman    schedule 08.02.2017    source источник
comment
То, что вы пытаетесь сделать, противоречит сути PostAndAsyncReply, так зачем вообще это использовать? Попросите клиента передать, скажем, наблюдаемую тему и втолкнуть в нее ваши ответы.   -  person Fyodor Soikin    schedule 08.02.2017
comment
Стоимость синхронизации должна быть оплачена в любом случае (с помощью наблюдаемого или ответного канала) для доставки ответа. И метод, использованный в вопросе, не добавляет лишних строк кода и не вводит новых концепций. Но если вы сможете определить другие компромиссы и, возможно, привести пример, это может стать отличным ответом.   -  person Kasey Speakman    schedule 08.02.2017
comment
Вы сами определили компромисс: канал ответа не работает вне очереди.   -  person Fyodor Soikin    schedule 08.02.2017
comment
Позвольте мне посмотреть, правильно ли я понимаю вашу цель: 1. Вы хотите, чтобы агент не ждал доставки сообщения, чтобы он мог продолжать обрабатывать сообщения, пока предыдущее сообщение находится в пути 2. Вы хотите, чтобы вызывающий абонент обрабатывал сообщение синхронно ?   -  person N_A    schedule 08.02.2017
comment
@mydogisbox 1 Да. 2 Нет. Вызывающий объект также работает в асинхронном режиме, который возобновляется после получения ответа. Стоимость синхронизации, упомянутая в предыдущем комментарии, связана со стоимостью координации потоков под прикрытием. т.е. f# AsyncReplyChannel внутренне использует ManualResetEvent, чтобы сигнализировать о готовности результата. Поэтому, когда вызывается Reply, он не возвращается до тех пор, пока не будет доставлен ответ. Это действие заставляет агента ждать, пока вызывающая сторона не получит ответ, прежде чем обрабатывать следующее сообщение.   -  person Kasey Speakman    schedule 08.02.2017
comment
Понятно. Похоже, вам нужна функциональность, несовместимая с MailboxProcessor. Возможно, вы могли бы взять исходный код и написать свой собственный?   -  person N_A    schedule 08.02.2017
comment
Я бы не сказал, что это несовместимо. То, как я упомянул в вопросе, работает. Но я не знал, вызовет ли это другие негативные последствия. Я надеялся, что кто-нибудь, более знакомый с MailboxProcessor, знает ответ или лучшую альтернативу.   -  person Kasey Speakman    schedule 08.02.2017


Ответы (1)


FSharp.Control.AsyncSeq помещает более дружелюбное лицо поверх процессор почтового ящика. За асинхронными последовательностями немного проще следить, однако параллельное сопоставление реализации по умолчанию имеет ту же проблему, что и описанная, ожидая сопоставления предыдущего элемента в последовательности, чтобы сохранить порядок.

Итак, я сделал новую функцию, которая является просто исходной AsyncSeq.mapAsyncParallel, измененной так, что она больше не является истинной картой, поскольку она неупорядочена, но она отображает все, и ленивая последовательность прогрессирует по мере завершения работы.

Полный исходный код для AsyncSeq.mapAsyncParallelUnordered

let mapAsyncParallelUnordered (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
  use mb = MailboxProcessor.Start (fun _ -> async.Return())
  let! err =
    s 
    |> AsyncSeq.iterAsyncParallel (fun a -> async {
      let! b = f a
      mb.Post (Some b) })
    |> Async.map (fun _ -> mb.Post None)
    |> Async.StartChildAsTask
  yield! 
    AsyncSeq.replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
  }

Ниже приведен пример того, как я использую его в инструменте, использующем бесплатный и очень медленный API SSLlabs, который легко может быть перегружен. parallelProcessHost возвращает ленивый AsyncSeq, который генерируется запросами webapi, поэтому AsyncSeq.mapAsyncParallelUnordered AsyncSeq.toListAsync фактически запускает запросы и позволяет консоли распечатывать результаты по мере поступления, независимо от отправленного порядка.

Полный исходный код

let! es = 
    hosts
    |> Seq.indexed
    |> AsyncSeq.ofSeq
    |> AsyncSeq.map parallelProcessHost
    |> AsyncSeq.mapAsyncParallelUnordered AsyncSeq.toListAsync
    |> AsyncSeq.indexed
    |> AsyncSeq.map (fun (i, tail) -> (consoleN "-- %d of %i --- %O --" (i+1L) totalHosts (DateTime.UtcNow - startTime)) :: tail )
    |> AsyncSeq.collect AsyncSeq.ofSeq
    |> AsyncSeq.map stdoutOrStatus //Write out to console
    |> AsyncSeq.fold (|||) ErrorStatus.Okay
person jbtule    schedule 17.02.2020