Как обернуть многопоточный обратный вызов для async / await?

У меня есть код async / await, и я хочу использовать API, похожий на веб-сокет. Для получения новых сообщений требуется обратный вызов, который вызывается из другого потока.

Могу ли я выполнить этот обратный вызов в том же контексте async / await, что и инициация соединения, не прибегая к блокировке?

Я думаю, что это то, для чего нужен SynchronizationContext, но я не могу сказать, является ли он потокобезопасным. Если я регистрирую идентификатор потока, каждый обратный вызов будет в другом потоке. Если я зарегистрирую Task.CurrentId, он будет равен нулю. Я думаю, что один и тот же контекст синхронизации перемещается по разным потокам, так что это может быть нормально, но я не знаю, как это подтвердить.

// External api, the callbacks will be from multiple threads
public class Api
{
    public static Connect(
        Action<Connection> onConnect,
        Action<Connection> onMessage) 
    {}
}

async Task<Connection> ConnectAsync(Action<Message> callback)
{
    if (SynchronizationContext.Current == null)
    {
        SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
    }

    var syncContext = SynchronizationContext.Current;

    var tcs = new TaskCompletionSource<Connection>();

    // use post() to ensure callbacks from other threads are executed thread-safely

    Action<Connection> onConnect = conn => 
    {
        syncContext.Post(o => tcs.SetResult(conn), null);
    };
    Action<Message> onMsg = msg => 
    { 
        syncContext.Post(o => callback(msg), null);
    };

    // call the multi-threaded non async/await api supplying the callbacks

    Api.Connect(onConnect, onMsg);

    return await tcs.Task;
}

var connection = await ConnectAsync(
    msg => 
    { 
        /* is code here threadsafe with the send without extra locking? */ 
    });

await connection.Send("Hello world);

person daw    schedule 17.02.2018    source источник
comment
Я не понял, что вы пытаетесь сделать, но могу сказать, что отправка в SynchronizationContext (для этого конкретного, который вы используете в своем коде, если SynchronizationContext.Current имеет значение null) просто выполнит обратный вызов в потоке пула потоков, вот и все.   -  person Evk    schedule 17.02.2018
comment
Могут ли несколько вызовов Post () в одном и том же контексте выполняться параллельно или они гарантированно будут последовательными, даже если они не в одном потоке?   -  person daw    schedule 17.02.2018
comment
В этом конкретном контексте (базовый SynchronizationContext, который вы устанавливаете с помощью SynchronizationContext.SetSynchronizationContext(new SynchronizationContext())) - публикация просто отправляет обратный вызов потоку пула потоков, и все, больше ничего не делается. Так что да - обратные вызовы будут выполняться параллельно. В других контекстах все может быть иначе.   -  person Evk    schedule 17.02.2018
comment
Я понимаю. Это означает, что потоки пользовательского интерфейса инкапсулируют поток и реализуют насос сообщений в качестве своего контекста синхронизации, а этот - нет.   -  person daw    schedule 17.02.2018
comment
Да, это правильно. Я бы сказал, ваша библиотека не должна заботиться о таких вещах. Выполнение обратных вызовов в фоновых потоках - обычная практика, и вызывающий должен решать это сам.   -  person Evk    schedule 17.02.2018
comment
Спасибо Evk, это помогло мне найти решение. К вашему сведению, код моделировал однопоточную модель выполнения, и я не мог ввести обычные стратегии блокировки с несколькими записывающими устройствами.   -  person daw    schedule 18.02.2018


Ответы (1)


Спасибо @Evk, который указал, что SynchronizationContext по умолчанию на самом деле ничего не синхронизирует и не реализует отправку / публикацию так, как вы ожидаете.

https://github.com/StephenClearyArchive/AsyncEx.Context

Исправление заключается в использовании асинхронной библиотеки Стивена Клири, которая реализует SynchronizationContext как перекачку сообщений в одном потоке, чтобы вызовы post () вызывались в том же потоке, что и другие ожидаемые вызовы.

// External api, the callbacks will be from multiple threads
public class Api
{
    public static Connect(
        Action<Connection> onConnect,
        Action<Connection> onMessage) 
    {}
}

async Task<Connection> ConnectAsync(Action<Message> callback)
{
    var syncContext = SynchronizationContext.Current;

    var tcs = new TaskCompletionSource<Connection>();

    // use post() to ensure callbacks from other threads are executed thread-safely

    Action<Connection> onConnect = conn => 
    {
        syncContext.Post(o => tcs.SetResult(conn), null);
    };
    Action<Message> onMsg = msg => 
    { 
        syncContext.Post(o => callback(msg), null);
    };

    // call the multi-threaded non async/await api supplying the callbacks

    Api.Connect(onConnect, onMsg);

    return await tcs.Task;
}

//
// https://github.com/StephenClearyArchive/AsyncEx.Context
//
Nito.AsyncEx.AsyncContext.Run(async () =>
{
    var connection = await ConnectAsync(
        msg => 
        { 
            /* this will execute in same thread as ConnectAsync and Send */ 
        });

    await connection.Send("Hello world);

    ... more async/await code
});
person daw    schedule 17.02.2018