Коллекция была изменена, ошибка при обработке данных из потока сокета

Я пытаюсь получить данные об уровне тиков из потока агрегированных данных Binance. Как только дата меняется, функция обрабатывает данные предыдущего дня. Я использую Task.Run для создания другого потока, так как новые данные за текущий день все еще поступают в потоковом режиме и должны быть захвачены. Однако я получаю сообщение «Коллекция была изменена, операция перечисления может не выполняться». в цикле foreach функции, обрабатывающей данные. Я не уверен, почему tradeData будет изменен после того, как он будет передан функции обработки? Не уверен, что это имеет значение, но я фиксирую около 40 различных символов в отдельных потоках.

private static void Start()
{
    foreach (SymbolData symbol in symbolData)
    {
         Task.Run(() => SubscribeToSymbol(symbol.symbol));
    }
}

private static void SubscribeToSymbol(string symbol)
{
    Dictionary<decimal, decimal> tradeData = new Dictionary<decimal, decimal>();
    DateTime lastTrade = DateTime.UtcNow;
    try
    {
         var socketClient = new BinanceSocketClient();
         socketClient.FuturesUsdt.SubscribeToAggregatedTradeUpdates(symbol, data =>
         {
              if (data.TradeTime.Date > lastTrade.Date)
              {
                   Task.Run(() => ProcessData(symbol, lastTrade.Date, tradeData));
                   tradeData.Clear();
              }
              lastTrade = data.TradeTime;

              if (tradeData.ContainsKey(data.Price))
              {
                   tradeData[data.Price] = tradeData[data.Price] + data.Quantity;
              }
              else
              {
                   tradeData[data.Price] = data.Quantity;
              }
         });
     }
     catch { }
}


private static void ProcessData(string symbol, DateTime date, Dictionary<decimal, decimal> tradeData)
{
     foreach (var price in tradeData)
     {
     //Error: Collection was modified enumeration operation may not execute.
     }
}

person sleepymatto    schedule 12.01.2021    source источник
comment
catch { } Не делайте этого. Как минимум запишите что-нибудь. Такое исключение только усложнит диагностику проблем с вашим кодом.   -  person    schedule 12.01.2021
comment
Ну, вы модифицируете tradeData во время его обработки: ProcessData может работать параллельно с tradeData[data.Price] = tradeData[data.Price] + data.Quantity; или tradeData[data.Price] = data.Quantity;, потому что вы начали его с Task.Run().   -  person Xaver    schedule 12.01.2021
comment
Расширение Xaver — вы очищаете торговые данные, как только запускаете их для асинхронного запуска. Таким образом, если время выбрано правильно, цикл foreach запускается, а затем, пока цикл foreach выполняется, tradeData очищается, поэтому foreach терпит неудачу, потому что элементы, над которыми он работал, исчезают. Вы могли бы лучше получить то, что ищете, передав копию tradeData в ProcessData вместо фактической передачи tradeData.   -  person Russ    schedule 12.01.2021
comment
Хорошо, интересно. Я думал, что шаг Task.Run по крайней мере завершит отправку текущей копии tradeData в ее состоянии в функцию ProcessData, прежде чем продолжить работу с остальной частью кода. Это хорошее предложение — я сделаю копию данных перед отправкой в ​​функцию ProcessData.   -  person sleepymatto    schedule 12.01.2021


Ответы (3)


Вы неоднократно вызываете Task.Run и в результате имеете более одного потока, работающего с вашим объектом tradeData, поэтому вы видите исключение, измененное коллекцией.

Я бы порекомендовал вам переосмыслить дизайн кода, чтобы у вас не было нескольких потоков, работающих с одними и теми же объектами. Если вам абсолютно необходимо, чтобы потоки работали с одними и теми же объектами, тогда следует использовать блокировку:

private static object _objlock = new object();

private static void ProcessData(string symbol, DateTime date, Dictionary<decimal, decimal> tradeData)
{
    lock (_objlock)
    {
        foreach (var price in tradeData)
        {
     
        }
     }
}

Просто будьте осторожны с взаимоблокировками при использовании блокировки. Я бы порекомендовал прочитать о блокировках и многопоточности в С#.

person Noobie3001    schedule 12.01.2021
comment
Хорошо, спасибо за предложение, я рассмотрю это - person sleepymatto; 12.01.2021

К тому времени, когда вы запускаете цикл foreach по словарю, снаружи он очищается. Вы должны сделать копию и отправить ProcessData.

Task.Run(() => ProcessData(symbol, lastTrade.Date, tradeData.ToArray()));

И получить копию:

private static void ProcessData(string symbol, DateTime date, KeyValuePair<decimal, decimal>[] tradeData)
{
    foreach (var price in tradeData)
    {
// ... do your thing here
    }
}

Или, что еще проще, просто воссоздайте новый словарь вместо его очистки:

//tradeData.Clear();
tradeData = new Dictionary<decimal, decimal>();
person user1344783    schedule 12.01.2021

Однажды у меня была похожая проблема, когда я удалял значение из списка в цикле foreach, я изменил его на простой цикл for, и это поможет вам попробовать.

person Hesh    schedule 12.01.2021