Пользовательская оболочка журнала

Я пытаюсь создать пользовательскую оболочку для регистрации времени выполнения, затрачиваемого на обработку каждого сообщения, выполняемого каждым вершинным процессором.

Используя DiagnosticProcessors.peekInputP/peekOutputP и процессор.PeekWrappedP в качестве вдохновения, я получил следующие коды:

public final class LogEngine {

    private LogEngine() {
    }

    @Nonnull
    public static <T> ProcessorMetaSupplier logP(@Nonnull ProcessorMetaSupplier wrapped) {
        return new WrappingProcessorMetaSupplier(wrapped, p ->
                new LogWrappedP<>(p));
    }

    @Nonnull
    public static <T> ProcessorSupplier logP(@Nonnull ProcessorSupplier wrapped) {
        return new WrappingProcessorSupplier(wrapped, p ->
                new LogWrappedP<>(p));
    }

    @Nonnull
    public static <T> DistributedSupplier<Processor> logP(@Nonnull DistributedSupplier<Processor> wrapped) {
        return () -> new LogWrappedP<>(wrapped.get());
    }    

}

И

public final class LogWrappedP<T> implements Processor {

    private final Processor wrappedProcessor;
    private static Logger logger;

    private ProcCtx ctx;

    public LogWrappedP(@Nonnull Processor wrappedProcessor) {
        checkNotNull(wrappedProcessor, "wrappedProcessor");

        this.wrappedProcessor = wrappedProcessor;
    }

    @Override
    public void init(@Nonnull Outbox outbox, @Nonnull Context context) {

        // Fix issue #595: pass a logger with real class name to processor
        // We do this only if context is ProcCtx (that is, not for tests where TestProcessorContext can be used
        // and also other objects could be mocked or null, such as jetInstance())
        if (context instanceof ProcCtx) {
            ProcCtx c = (ProcCtx) context;
            NodeEngine nodeEngine = ((HazelcastInstanceImpl) c.jetInstance().getHazelcastInstance()).node.nodeEngine;
            ILogger newLogger = nodeEngine.getLogger(
                    createLoggerName(wrappedProcessor.getClass().getName(), c.vertexName(), c.globalProcessorIndex()));
            ctx = new ProcCtx(c.jetInstance(), c.getSerializationService(), newLogger, c.vertexName(),
                    c.globalProcessorIndex(), c.processingGuarantee());
        }

        logger = LogManager.getLogger(wrappedProcessor.getClass().getName());

        wrappedProcessor.init(outbox, ctx);
    }

    @Override
    public boolean isCooperative() {
        return wrappedProcessor.isCooperative();
    }

    @Override
    public void process(int ordinal, @Nonnull Inbox inbox) {
        KafkaMessage msg = (KafkaMessage) inbox.peek();

        logger.info("START {} {} {} {} {}", ctx.vertexName(), getProcessorId(), getInstanceHost(), msg.getUuid(), Instant.now().toEpochMilli());

        wrappedProcessor.process(ordinal, inbox);

        logger.info("END {} {} {} {} {}", ctx.vertexName(), getProcessorId(), getInstanceHost(), msg.getUuid(), Instant.now().toEpochMilli());
    }

    @Override
    public boolean tryProcess() {
        return wrappedProcessor.tryProcess();
    }

    @Override
    public boolean complete() {
        return wrappedProcessor.complete();
    }

    @Override
    public boolean completeEdge(int ordinal) {
        return wrappedProcessor.completeEdge(ordinal);
    }

    @Override
    public boolean saveToSnapshot() {
        return wrappedProcessor.saveToSnapshot();
    }

    @Override
    public void restoreFromSnapshot(@Nonnull Inbox inbox) {
        wrappedProcessor.restoreFromSnapshot(inbox);
    }

    @Override
    public boolean finishSnapshotRestore() {
        return wrappedProcessor.finishSnapshotRestore();
    }

    protected int getProcessorId() {
        return ctx.globalProcessorIndex();
    }

    protected String getInstanceUUID() {
        return ctx.jetInstance().getCluster().getLocalMember().getUuid();
    }

    protected String getInstanceHost() {
        return ctx.jetInstance().getCluster().getLocalMember().getAddress().getHost();
    }    

}

Теперь я могу использовать свою оболочку в своей вершине app:

Vertex kafkaSource = dag.newVertex("kafkaSource", streamKafkaP(properties, decodeKafkaMessage, topic))
        .localParallelism(2);

Vertex app = dag.newVertex("app", LogEngine.logP(ProcessFrameP::new))
        .localParallelism(2);

И получить ожидаемые результаты,

2018-02-18 08:47:04,024 INFO START app 1 172.21.0.1 bc407e15-e78e-4734-822d-1172485e6632 1518954424024
2018-02-18 08:47:04,108 INFO END app 1 172.21.0.1 bc407e15-e78e-4734-822d-1172485e6632 1518954424108
2018-02-18 08:47:04,681 INFO START app 1 172.21.0.1 82e38e7e-73b7-4729-8d28-4f7fc87700ad 1518954424681
2018-02-18 08:47:04,710 INFO END app 1 172.21.0.1 82e38e7e-73b7-4729-8d28-4f7fc87700ad 1518954424710
2018-02-18 08:47:05,524 INFO START app 1 172.21.0.1 16633f77-8af5-4ab1-b94a-6192022f904f 1518954425524
2018-02-18 08:47:05,551 INFO END app 1 172.21.0.1 16633f77-8af5-4ab1-b94a-6192022f904f 1518954425551
2018-02-18 08:47:06,518 INFO START app 1 172.21.0.1 29622922-4987-44d4-8def-186b415c8fa9 1518954426518
2018-02-18 08:47:06,533 INFO END app 1 172.21.0.1 29622922-4987-44d4-8def-186b415c8fa9 1518954426533
2018-02-18 08:47:07,457 INFO START app 1 172.21.0.1 ce016601-d7be-4382-bc81-1d6a75e8748b 1518954427457
2018-02-18 08:47:07,475 INFO END app 1 172.21.0.1 ce016601-d7be-4382-bc81-1d6a75e8748b 1518954427475
2018-02-18 08:47:08,358 INFO START app 1 172.21.0.1 6a0be934-3eb6-4e46-9f08-76c072304de6 1518954428358
2018-02-18 08:47:08,379 INFO END app 1 172.21.0.1 6a0be934-3eb6-4e46-9f08-76c072304de6 1518954428379

Проблема в том, что оболочка не работает для вершины kafkaSource. Я пытался понять логику peekOutputP, но не смог получить работающую версию исходной вершины.

Что нужно сделать, чтобы обертка работала и в исходной вершине?


person Kleyson Rios    schedule 18.02.2018    source источник


Ответы (1)


Источник Kafka не обрабатывает элементы — он их испускает. Вот почему метод process не вызывается.

Однако ваш метод process также неверен. Метод process не обязан обрабатывать только один элемент из папки «Входящие»: он может обрабатывать много или даже ноль элементов. Вы можете использовать код, аналогичный LoggingInbox, но вы будете знать только, что элемент был взят, но вы не знаете, когда началась и закончилась обработка.

Выполнимым методом может быть обертка AbstractProcessor, которая является базовым классом для большинства процессоров Jet. Он делает метод process final и позволяет только своим подклассам переопределять методы tryProcess(item). Этот метод обрабатывает только один элемент: но он может вернуть false, если он не закончил его обработку и хочет, чтобы его снова вызвали с тем же элементом.

Вот код:

/**
 * A processor that wraps another AbstractProcessor and measures 
 * time spent to process each item.
 */
public class MeasureTimeP extends AbstractProcessor {

    private final AbstractProcessor wrapped;
    private long spentTime;
    private TestInbox tmpInbox = new TestInbox();

    private MeasureTimeP(Processor wrapped) {
        this.wrapped = (AbstractProcessor) wrapped;
    }

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        // if tmpInbox is not empty, we are continuing with an item that was processed before
        if (tmpInbox.isEmpty()) {
            tmpInbox.add(item);
        } else {
            assert tmpInbox.peek().equals(item);
        }
        spentTime -= System.nanoTime();
        // process the item
        wrapped.process(ordinal, tmpInbox);
        spentTime += System.nanoTime();
        // if tmpInbox is now empty, the item was processed
        if (tmpInbox.isEmpty()) {
            getLogger().info("Processed item in " + NANOSECONDS.toMillis(spentTime) + " ms: " + item);
            spentTime = 0;
            return true;
        }
        return false;
    }

    @Nonnull
    public static ProcessorMetaSupplier measureTimeP(@Nonnull ProcessorMetaSupplier wrapped) {
        return new WrappingProcessorMetaSupplier(wrapped, MeasureTimeP::new);
    }

    @Nonnull
    public static ProcessorSupplier measureTimeP(@Nonnull ProcessorSupplier wrapped) {
        return new WrappingProcessorSupplier(wrapped, MeasureTimeP::new);
    }

    @Nonnull
    public static DistributedSupplier<Processor> measureTimeP(@Nonnull DistributedSupplier<Processor> wrapped) {
        return () -> new MeasureTimeP(wrapped.get());
    }

    // ######
    // Here is the rest of AbstractProcessor's methods where we'll just delegate
    // to the wrapped processor. You can add time measurement to those too.

    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return wrapped.tryProcessWatermark(watermark);
    }

    @Override
    public boolean isCooperative() {
        return wrapped.isCooperative();
    }

    @Override
    protected void init(@Nonnull Context context) throws Exception {
        wrapped.init(getOutbox(), context);
    }

    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        assert tmpInbox.isEmpty();
        tmpInbox.add(entry(key, value));
        wrapped.restoreFromSnapshot(tmpInbox);
        assert tmpInbox.isEmpty();
    }

    @Override
    public boolean tryProcess() {
        return wrapped.tryProcess();
    }

    @Override
    public boolean completeEdge(int ordinal) {
        return wrapped.completeEdge(ordinal);
    }

    @Override
    public boolean complete() {
        return wrapped.complete();
    }

    @Override
    public boolean saveToSnapshot() {
        return wrapped.saveToSnapshot();
    }

    @Override
    public boolean finishSnapshotRestore() {
        return wrapped.finishSnapshotRestore();
    }
}

И тест:

public static void main(String[] args) {
    JetInstance instance = Jet.newJetInstance();

    IList<Integer> list = instance.getList("list");
    for (int i = 0; i < 50; i++) {
        list.add(i);
    }

    // we'll use this mapper to simulate slow operation. It will sleep according to the item value.
    DistributedFunction<Integer, Integer> mapFn = processTime -> {
        LockSupport.parkNanos(MILLISECONDS.toNanos(processTime));
        return processTime;
    };

    DAG dag = new DAG();
    Vertex source = dag.newVertex("source", readListP("list"));
    Vertex process = dag.newVertex("process", measureTimeP(mapP(mapFn)));
    dag.edge(between(source, process));

    instance.newJob(dag).join();

    instance.shutdown();
}
person Oliv    schedule 19.02.2018
comment
привет @Oliv, я видел LoggingOutbox, но мне не удалось получить рабочую версию для моего случая, я попробую еще раз. Для LogWrappedP.process() я попытался использовать свой регистратор в методе tryProcess(), но регистратор вызывался сотни раз, даже без каких-либо сообщений в папке «Входящие». Имея регистратор в методе process(), я получил результат, который ранее ожидал от tryProcess(). - person Kleyson Rios; 20.02.2018
comment
Я упустил момент в вашем вопросе: вы пытаетесь измерить время обработки элемента. Я заменил ответ. - person Oliv; 22.02.2018
comment
Привет @Oliv, похоже, этот код несовместим с Jet 0.5.1, который я использую. Ваш пример кода работает только с элементами почтового ящика, верно? Мне также нужно регистрировать информацию для KafkaSource, и, как вы объяснили ранее, источник ввода не будет запускать tryProcess(). Какие изменения я должен сделать, чтобы эта оболочка также регистрировала информацию для вершины KafkaSource? - person Kleyson Rios; 25.02.2018
comment
сейчас использую 0.6-SNAPSHOT для совместимости, но пока не знаю, как регистрировать информацию для каждого исходного элемента ввода. - person Kleyson Rios; 25.02.2018
comment
Как я уже сказал, мне непонятно, как измерить время обработки элемента для источника Kafka. Это продолжительность чего? Если вы просто хотите увидеть испускаемые предметы, используйте стандартные peekOutputP: Vertex source = dag.newVertex("source", peekOutputP(streamKafkaP(...))); - person Oliv; 26.02.2018
comment
Привет @Oliv, как видно из моего примера, я не рассчитываю время для обработки каждого элемента. Я просто записываю отметку времени начала и конца каждого выполнения, и это то, что я хотел бы иметь в исходном коде Kafka. Эти журналы будут отправлены во внешнюю систему для расчета многих показателей. И именно по этой причине я хотел бы регистрировать отметку времени, когда источник ввода передает элемент нижестоящим процессорам. Я попытаюсь просмотреть peekOutputP(). - person Kleyson Rios; 26.02.2018
comment
peekOutputP использует регистратор, он регистрирует время эмиссии, если настроено. Вы неправильно регистрируете начало и конец, вы не можете осмысленно использовать inbox.peek() в методе process(inbox). - person Oliv; 27.02.2018