Несколько длительных реализаций IHostedSerivces или BackgroundService

Кажется, я не могу найти много информации по этому вопросу. Проблема, с которой я столкнулся, заключается в том, что мне нужно запускать 2+ длительно работающих фоновых службы, но выполняется только первая зарегистрированная служба ExecuteAsync. Я попытался реализовать его через BackgroundService и поместить код в ExecuteAsync, и я попытался реализовать IHostedService напрямую и поместить длительный код в StartAsync.

Я думаю, проблема в возврате Task.CompletedTask; никогда не называется. Например, у меня есть два потребителя Kafka, реализованные как BackgroundServices. Код выглядит одинаково как в теме, так и в методах OnMessage.

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    var kafkaEndpoint = _kafkaConfig.Endpoint;

    var kafkaTopic = "PhoenixEventStore";

    var consumerConfig = new Dictionary<string, object>
    {
        { "group.id", "consumer1" },
        { "bootstrap.servers", kafkaEndpoint },
        { "auto.offset.reset", "earliest" }
    };

    using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
    {
        consumer.OnMessage += (obj, msg) =>
        {
            Log.Information($"Consumer1 Received {msg.Value}");
        };

        consumer.OnPartitionEOF += (_, end) =>
        {
            Log.Information($"Consumer1 Reached end of topic {end.Topic} partition {end.Partition}.");
        };

        consumer.OnError += (_, error) =>
        {
            Log.Error($"Consumer1 Error: {error}");
        };

        consumer.Subscribe(new List<string>() { kafkaTopic });

        while (!stoppingToken.IsCancellationRequested)
        {
            consumer.Poll(TimeSpan.FromSeconds(10));
        }

        //consider setting value that check whether the consumer has stopped polling.
    }
    return Task.CompletedTask;
}

Поскольку обе службы работают долго, Task.Complete никогда не выполняется. Однако, если я закомментирую цикл while, будут задействованы обе службы ExecuteAsync, а не только первая зарегистрированная.

Я нашел работу, которая, кажется, работает, но мне интересно, есть ли у кого-нибудь лучший подход.

В основном я реорганизую код, чтобы длительный код запускался в методе void под названием StartConsumer, а затем мой ExecuteAsync выглядел так

  protected override Task ExecuteAsync(CancellationToken stoppingToken)
  {
        Task.Run(() => StartConsumer(stoppingToken));
        return Task.CompletedTask;
  }

Оба сервиса зарегистрированы с использованием

services.AddHostedService<MyHostedService1>
services.AddHostedService<MyHostedService2>

person user2258403    schedule 17.10.2018    source источник
comment
У меня была такая же проблема, и ваше решение сработало. Спасибо.   -  person Rafał Ryszkowski    schedule 29.11.2018


Ответы (1)


У меня такой же чехол и он работает. Шаблон

 protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await Foo()
            }
            catch (Exception ex)
            {
                _log.Error(ex.Message, exception: ex);
            }

            await Task.Delay(timeInMlSeconds);
        }
    }
person Илья Филиппов    schedule 26.01.2021