Rx: количество сгруппированных событий в движущемся окне

Я начал рассматривать использование реактивных расширений с EventStore. В качестве доказательства концепции я хотел бы посмотреть, смогу ли я заставить Rx потреблять поток событий и выводить количество событий, сгруппированных по типу, в течение одной секунды.

Итак, скажем, я потребляю поток с именем «заказы», ​​я бы хотел, чтобы в консоли появилось что-то вроде следующего:

OrderCreated 201
OrderUpdated 111

(проходит секунда..)

OrderCreated 123
OrderUpdated 132

И так далее.

До сих пор мне удавалось получить количество всех событий в секунду. Но не могу сгруппировать их по типу события.

Код, который я использую, основан на сутье Джеймса Ньюджента:

internal class EventStoreRxSubscription
{
    public Subject<ResolvedEvent> ResolvedEvents { get; }

    public Subject<SubscriptionDropReason>  DroppedReasons { get; }
    public EventStoreSubscription Subscription { get; }

    public EventStoreRxSubscription(EventStoreSubscription subscription, Subject<ResolvedEvent> resolvedEvent, Subject<SubscriptionDropReason> droppedReasons)
    {
        Subscription = subscription;
        ResolvedEvents = resolvedEvent;
        DroppedReasons = droppedReasons;
    }
}

static class EventStoreConnectionExtensions
{
    public static Task<EventStoreRxSubscription> SubscribeTo(this IEventStoreConnection connection, string streamName, bool resolveLinkTos)
    {
        return Task<EventStoreRxSubscription>.Factory.StartNew(() => {

            var resolvedEvents = new Subject<ResolvedEvent>();
            var droppedReasons = new Subject<SubscriptionDropReason>();

            var subscriptionTask = connection.SubscribeToStreamAsync(streamName, resolveLinkTos, 
                                                                    (subscription, @event) => resolvedEvents.OnNext(@event), 
                                                                    (subscription, dropReason, arg3) => droppedReasons.OnNext(dropReason));
            subscriptionTask.Wait();

            return new EventStoreRxSubscription(subscriptionTask.Result, resolvedEvents, droppedReasons);
        });
    }
}

class Program
{
    static void Main(string[] args)
    {
         var connection = EventStoreConnection.Create(new IPEndPoint(IPAddress.Loopback, 1113));
         connection.ConnectAsync();

         var subscriptionTask = connection.SubscribeTo("orders", true);
         subscriptionTask.Wait();

         var events = subscriptionTask.Result.ResolvedEvents;

         var query = events.Timestamp()
                .Buffer(TimeSpan.FromSeconds(1))
                .Select(e => e.Count);

         query.Subscribe(Console.WriteLine);

         Console.ReadLine();
    }
 }

person David Brower    schedule 29.02.2016    source источник
comment
мой обновленный ответ помог вам с этим?   -  person Oliver    schedule 03.03.2016
comment
Да. Я только что получил шанс попробовать это, и это выглядит хорошо. Большое спасибо!   -  person David Brower    schedule 03.03.2016


Ответы (1)


Я делал что-то подобное раньше, я использовал Throttle для группировки всех событий в пределах заданной частоты, однако вы можете использовать Buffer для получения количества/сбора за каждый период.

Пример ниже представляет собой абстрактный пример того, как я этого добился, где AggregateType и AggregateFunction будут заменены вашим собственным типом и агрегацией.

GroupByUntil позволяет группировать по типу в течение заданного периода.

subscription = observable
    .GroupByUntil(e => e.Key, e => e.Buffer(TimeSpan.FromSeconds(1)))
    .SelectMany(e => e.Aggregate(new AggregateType(), (a, e) => AggregateFunction(a, e))
    .Subscribe(onNext, onError, onCompleted);

ИЗМЕНИТЬ

Ниже приведен краткий пример, который я придумал, чтобы показать, как это можно сделать.

public class EventType
{
    public string Type { get; set; }
}

public class AggregatedType
{
    public string EventType { get; set; }
    public int Count { get; set; }
}

class Program
{
    public delegate void ExampleEventHandler(EventType e);

    public static event ExampleEventHandler Event;

    static void Main(string[] args)
    {
        var subscription = Observable.FromEvent<ExampleEventHandler, EventType>(e => Event += e, e => Event -= e)
            .GroupByUntil(e => e.Type, e => e.Buffer(TimeSpan.FromSeconds(1)))
            .SelectMany(e => e
                .Select(ev => new AggregatedType {  EventType = ev.Type })
                .Aggregate(new AggregatedType(), (a, ev) => new AggregatedType { EventType = ev.EventType, Count = a.Count + 1 }))
            .Subscribe(OnAggregaredEvent, OnException, OnCompleted);

        Event(new EventType { Type = "A" });
        Event(new EventType { Type = "A" });
        Event(new EventType { Type = "B" });
        Event(new EventType { Type = "B" });

        SpinWait.SpinUntil(()=> false, TimeSpan.FromSeconds(2));

        Event(new EventType { Type = "A" });
        Event(new EventType { Type = "A" });
        Event(new EventType { Type = "B" });
        Event(new EventType { Type = "B" });

        Console.ReadLine();
    }

    static void OnAggregaredEvent(AggregatedType aggregated)
    {
        Console.WriteLine("Type: {0}, Count: {1}", aggregated.EventType, aggregated.Count);
    }

    static void OnException(Exception ex)
    {
    }

    static void OnCompleted()
    {
    }
}
person Oliver    schedule 29.02.2016
comment
Спасибо. Я попытался превратить ваш абстрактный пример в код, но я действительно изо всех сил пытаюсь получить что-то работоспособное. AggregateFunction, кажется, принимает каждого члена последовательности, тогда как я хотел бы получить количество всех членов для каждого типа события. - person David Brower; 29.02.2016