StreamInsight, сопоставление событий

Я столкнулся с ситуацией со StreamInsight, когда у меня есть 1 источник ввода с разными типами событий, которые нужно обрабатывать по-разному, но в конечном итоге они сопоставляются с другими событиями из того же источника.

Ниже я создал (НАМНОГО) более простой сценарий, в котором источник ввода генерирует случайные числа (для простоты — 1 и 0). Если число четное, мы хотим сохранить его до дальнейшего уведомления (неизвестная продолжительность). Если число нечетное, мы хотим сопоставить его с n-1 из четного потока, а затем удалить n-1 из четного потока. Если совпадений нет, нечетное число просто обрабатывается как есть без каких-либо дополнительных вычислений. У меня все работает, как и ожидалось, вплоть до удаления соответствующего n-1 из четного потока. Совпадение установлено, и совпадение передается выходному адаптеру, но остается доступным для другого соединения, которое можно выполнить с данным событием. То, что я собрал за несколько дней экспериментов и исследований, так это то, что мне каким-то образом нужно обрезать продолжительность события четного потока (ClipEventDuration), предположительно как часть фильтра в GenerateEvenJoinQuery, однако все, что я пробовал, не дало никаких изменений или нежелательно Результаты. Я также попытался изменить даже Stream на форму Interval с еще меньшим успехом. Будем признательны за любую помощь или предложения.

Например, для упрощенного списка: [ 1, 0, 1, 1, 0, 0, 1, 1, 1 ]

Я ожидаю, что вывод будет выглядеть так: [ 1, 100, 1, 100, 100, 1 ]

Я бы также принял в качестве реального сценария, с которым я работаю, первый вывод на самом деле невозможен: обратите внимание, что 2-й и 3-й 0 соединены с одной 1. [ 1, 100, 1, 100, 1, 1 ]

...
    CepStream<int> inputStream = CepStream<int>.Create(
            app
            , "ATGInputStream"
            , typeof(InputAdapterFactory)
            , new InputConfig()
            , EventShape.Point);


    var everythingFilter = from e in inputStream select e;
    Query everythingQuery = GenerateEverythingQuery(app, inputStream);
    var everythingStream = everythingQuery.ToStream<int>("everythingStream");

    Query oddQuery = GenerateOddQuery(app, everythingStream);
    var oddAts = new AdvanceTimeSettings(new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), false), null, AdvanceTimePolicy.Drop);
    var oddStream = oddQuery.ToStream<int>("oddStream", oddAts);

    // only inject a cti in to even when we need it
    var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("oddStream"), AdvanceTimePolicy.Adjust);
    Query evenQuery = GenerateEvenQuery(app, everythingStream);
    var evenStream = evenQuery.ToStream<int>("evenStream", ats);

    Query joinQuery = GenerateOddEvenJoinQuery(app, evenStream, oddStream);
    var joinStream = joinQuery.ToStream<int>("joinStream");
...


private Query GenerateOddEvenJoinQuery(Application app, CepStream<int> evenStream, CepStream<int> oddStream) {
    // (o * e) + 100 is an easy way to tell we had a match
    var filter = (from o in oddStream
                  from e in evenStream
                  where e == (o - 1)
                  select (o * e) + 100);

    // LEFT ANTI SEMI JOIN
    var filter2 = from o in oddStream
                    where (from e in evenStream where e == o - 1 select e).IsEmpty()
                    select o;

    var joinFilter = filter.Union(filter2);

    return joinFilter.ToQuery(
            app
            , "Number Join Query"
            , "Joins number streams."
            , EventShape.Point
            , StreamEventOrder.FullyOrdered);
}

private Query GenerateEvenQuery(Application app, CepStream<int> stream) {
    var evenFilter = (from e in stream where e % 2 == 0 select e).AlterEventDuration(e => TimeSpan.MaxValue);

    return evenFilter.ToQuery(
            app
            , "EvenQuery"
            , ""
            , EventShape.Edge
            , StreamEventOrder.FullyOrdered);
}

private Query GenerateOddQuery(Application app, CepStream<int> stream) {
    var filter = (from f in stream where (f % 2) == 1 select f);

    return filter.ToQuery(
            app
            , "OddQuery"
            , "Queries for odd numbers in stream."
            , EventShape.Point
            , StreamEventOrder.FullyOrdered);
}

private Query GenerateEverythingQuery(Application app, CepStream<int> stream) {
    var everythingFilter = from e in stream select e;

    return everythingFilter.ToQuery(
            app
            , "EverythingQuery"
            , "Queries everything from the input stream."
            , EventShape.Point
            , StreamEventOrder.FullyOrdered);
}

РЕШЕНИЕ:

Хотя я надеялся на что-то более сложное и потенциально более быстрое, отложенная обработка может помочь с производительностью.

public Program() {
    public Program() {
        ...
            var stream = CepStream<RandomNumber>.Create(
                    app
                    , "StaticInputStream"
                    , typeof(StaticInputAdapterFactory)
                    , new InputConfig()
                    , EventShape.Point);

            var processedStream = stream.Scan(new StreamMatcher());

            Query consoleQuery = GenerateConsoleOutputQuery(app, processedStream);
        ...
    }

    private Query GenerateConsoleOutputQuery(Application app, CepStream<int> stream) {
        var filter = from f in stream select f;

        return filter.ToQuery(
                app
                , "Console Output Query"
                , "Queries for messages to output to the console."
                , typeof(OutputAdapterFactory)
                , new OutputConfig()
                , EventShape.Point
                , StreamEventOrder.FullyOrdered);
    }

    public class StreamMatcher : CepPointStreamOperator<RandomNumber, int> {
        private List<int> unmatched = new List<int>();

        public override bool IsEmpty {
            get { return false; }
        }

        public override IEnumerable<int> ProcessEvent(PointEvent<RandomNumber> inputEvent) {
            if(inputEvent.Payload.value % 2 == 0) {
                unmatched.Add(inputEvent.Payload.value);
            } else {
                var result = inputEvent.Payload.value;

                int match = -1;

                try {
                    match = (from f in unmatched where f == result - 1 select f).Take(1).Single();
                    unmatched.Remove(match);
                } catch { }

                if(match > -1) {
                    result += match + 100;
                }

                yield return result;
            }
        }
    }
}

public class RandomNumber {
    public int value { get; set; }
    public DateTime timeStamp { get; set; }
}

person codeape    schedule 26.11.2012    source источник
comment
Добавлены дополнительные сведения и более полный код здесь: blog.techpire.com/   -  person codeape    schedule 23.12.2012


Ответы (1)


Вы можете рассмотреть возможность использования UDSO (пользовательский оператор потока), где вы можете сохранить состояние своих нулей.

void Main()
{
    var randomNumbers = new []
    {
        new RandomNumber(){ Value = 1, TimeStamp = DateTime.Parse("2012-01-01 10:01:00 AM")  },
        new RandomNumber(){ Value = 0, TimeStamp = DateTime.Parse("2012-01-01 10:02:00 AM")  },
        new RandomNumber(){ Value = 0, TimeStamp = DateTime.Parse("2012-01-01 10:02:00 AM")  },
        new RandomNumber(){ Value = 1, TimeStamp = DateTime.Parse("2012-01-01 10:03:00 AM")  },
        new RandomNumber(){ Value = 1, TimeStamp = DateTime.Parse("2012-01-01 10:04:00 AM")  },
        new RandomNumber(){ Value = 0, TimeStamp = DateTime.Parse("2012-01-01 10:05:00 AM")  },

    };

    var stream =  randomNumbers.ToPointStream(Application, 
            e=> PointEvent.CreateInsert(e.TimeStamp ,e), 
            AdvanceTimeSettings.IncreasingStartTime) ;

    var query = stream.Scan(new MyCalculation());

    query.Dump();   

}


public class MyCalculation : CepPointStreamOperator<RandomNumber,string>
{

    private Queue<int> _queue = new Queue<int>() ;

    public override bool IsEmpty
    {
        get { return false; }
    }

    public override IEnumerable<string> ProcessEvent(PointEvent<RandomNumber> inputEvent)
    {
        if (inputEvent.Payload.Value % 2 == 0)
        {
                _queue.Enqueue(inputEvent.Payload.Value);
        }
        else
        {
            var result= inputEvent.Payload.Value.ToString() ;
            var list = _queue.ToArray();
            for (int i = 0; i < list.Count(); i++)
            {
                result += list[i];
            }
             yield return result ;
        }
    }
}


public class RandomNumber 
{
    public int Value {get;set;}
    public DateTime TimeStamp {get;set;}
}
person Donaciano Rojas    schedule 27.11.2012
comment
Спасибо, я играю с этим прямо сейчас, чтобы увидеть, работает ли он для того, что мне действительно нужно. - person codeape; 27.11.2012