Синхронизация потоков (блокировка), которая освобождает только последний поток

Каков правильный способ гарантировать, что только «последний» поток получит доступ к мьютексу/заблокированной области, в то время как промежуточные потоки не получат блокировку?

Пример последовательности:

A acquires lock
B waits
C waits
B fails to acquire lock*
A releases lock
C acquires lock

*B не должен получить блокировку либо через исключение (как в SemaphoreSlim.Wait(CancellationToken), либо через логическую конструкцию типа Monitor.TryEnter().

Я могу придумать несколько подобных схем для достижения этой цели (например, использование CancellationTokenSource и SemaphoreSlim), но ни одна из них не кажется особенно элегантной.

Есть ли обычная практика для этого сценария?


person Andrew Hanlon    schedule 02.09.2015    source источник
comment
Это интересный вопрос — так что же произойдет, если — между A снятием блокировки и C ее получением — D войдет? Мне просто любопытно - пытаюсь придумать сценарий, где потребуется такой подход.   -  person xxbbcc    schedule 02.09.2015
comment
Извините, я не понял, B не удается, что вы имеете в виду? Вы заморозили нить из окна треда?   -  person Kapoor    schedule 02.09.2015
comment
@xxbbcc Желаемый результат состоит в том, что последний входящий (D в вашем сценарии) успешно получает блокировку, а промежуточные потоки - нет (B и C не входят в заблокированный раздел).   -  person Andrew Hanlon    schedule 02.09.2015
comment
@Kapoor Я имею в виду, что B не может получить блокировку либо через исключение (как в SemaphoreSlim.Wait(CancellationToken), либо через конструкцию логического типа Monitor.TryEnter.   -  person Andrew Hanlon    schedule 02.09.2015
comment
@AndrewHanlon Да, я так и подозревал. Однако очень сложно определить, что такое последний вход. Я предполагаю, что как только начинается процесс предоставления блокировки потоку, любые новые потоки становятся первыми.   -  person xxbbcc    schedule 02.09.2015
comment
@xxbbcc По сути, как только новый поток входит в очередь ожидания, все остальные ожидающие потоки «отменяются».   -  person Andrew Hanlon    schedule 02.09.2015


Ответы (2)


Это должно работать так, как вы хотите, для управления им используется SemaphoreSlim с размером 1. Я также добавил поддержку передачи CancelationToken для преждевременной отмены ожидания блокировки, а также поддерживает WaitAsync возврат задачи вместо блокировки.

public sealed class LastInLocker : IDisposable
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);
    private CancellationTokenSource _cts = new CancellationTokenSource();
    private bool _disposed = false;

    public void Wait()
    {
        Wait(CancellationToken.None);
    }

    public void Wait(CancellationToken earlyCancellationToken)
    {
        if(_disposed)
            throw new ObjectDisposedException("LastInLocker");

        var token = ReplaceTokenSource(earlyCancellationToken);
        _semaphore.Wait(token);
    }

    public Task WaitAsync()
    {
        return WaitAsync(CancellationToken.None);
    }

    public async Task WaitAsync(CancellationToken earlyCancellationToken)
    {
        if (_disposed)
            throw new ObjectDisposedException("LastInLocker");

        var token = ReplaceTokenSource(earlyCancellationToken);

        //I await here because if ReplaceTokenSource thows a exception I want the 
        //observing of that exception to be deferred until the caller awaits my 
        //returned task.
        await _semaphore.WaitAsync(token).ConfigureAwait(false);
    }

    public void Release()
    {
        if (_disposed)
            throw new ObjectDisposedException("LastInLocker");

        _semaphore.Release();
    }

    private CancellationToken ReplaceTokenSource(CancellationToken earlyCancellationToken)
    {
        var newSource = CancellationTokenSource.CreateLinkedTokenSource(earlyCancellationToken);
        var oldSource = Interlocked.Exchange(ref _cts, newSource);
        oldSource.Cancel();
        oldSource.Dispose();

        return newSource.Token;
    }

    public void Dispose()
    {
        _disposed = true;

        _semaphore.Dispose();
        _cts.Dispose();
    }
}

Вот небольшая тестовая программа, которая воссоздает ваш тестовый пример

internal class Program
{
    static LastInLocker locker = new LastInLocker();
    private static void Main(string[] args)
    {
        Task.Run(() => Test("A"));
        Thread.Sleep(500);
        Task.Run(() => Test("B"));
        Thread.Sleep(500);
        Task.Run(() => Test("C"));
        Console.ReadLine();
    }

    private static void Test(string name)
    {
        Console.WriteLine("{0} waits for lock", name);
        try
        {
            locker.Wait();
            Console.WriteLine("{0} acquires lock", name);

            Thread.Sleep(4000);
            locker.Release();

            Console.WriteLine("{0} releases lock", name);
        }
        catch (Exception)
        {
            Console.WriteLine("{0} fails to acquire lock", name);
        }
    }
}

выходы

A waits for lock
A acquires lock
B waits for lock
C waits for lock
B fails to acquire lock
A releases lock
C acquires lock
C releases lock
person Scott Chamberlain    schedule 02.09.2015
comment
Привет, Скотт, спасибо за подробный ответ. Это очень близко к методу, который я использовал (просто заменив внутреннюю блокировку на Interlocked.Exchange). Но обернуть его в многоразовый класс — очень приятное прикосновение. - person Andrew Hanlon; 03.09.2015
comment
@AndrewHanlon Я хотел, чтобы .Cancel() и назначение нового токена были атомарной операцией, иначе я бы, вероятно, сделал то же самое. Немного переписав, я думаю, что мог бы сделать это с помощью InterlockedExchange, но эта блокировка, вероятно, будет очень маленькой и чаще всего не оспариваемой, поэтому накладные расходы не так уж велики. - person Scott Chamberlain; 03.09.2015
comment
Я надеялся/думал, что должен быть более простой примитив для выполнения этого сценария, но на самом деле это может быть правильный подход. - person Andrew Hanlon; 03.09.2015
comment
@AndrewHanlon это было достаточно просто, как только я попытался подумать об этом, я обновил ответ на Interlocked.Exchange - person Scott Chamberlain; 03.09.2015

Попробуй это:

public interface ILocker
{
    bool GetLock();

    void Release();
}

class Locker : ILocker
{
    private long m_NumberOfTimeGetLockWasCalled = 0;

    private readonly object m_LockingObject = new object();

    private readonly object m_LockingObject2 = new object();

    public bool GetLock()
    {

        long lock_count = 0;

        var lock_was_taken = false;

        lock(m_LockingObject)
        {
            lock_count = m_NumberOfTimeGetLockWasCalled++;

            lock_was_taken = Monitor.TryEnter(m_LockingObject2);

            if (lock_was_taken)
                return true;

        }

        while(!lock_was_taken)
        {

            Thread.Sleep(5);

            lock(m_LockingObject)
            {

                if (lock_count != m_NumberOfTimeGetLockWasCalled)
                    return false;

                lock_was_taken = Monitor.TryEnter(m_LockingObject2);

                if (lock_was_taken)
                    break;

            }


        }


        return true;
    }

    public void Release()
    {
        Monitor.Exit(m_LockingObject2);
    }
}
person Yacoub Massad    schedule 02.09.2015