Как реализовать рабочий поток, который будет обрабатывать записи Kinesis и обновлять графический интерфейс в javaFx?

Я работаю над приложением для мониторинга микросервисов. Мое приложение предполагает соответствующее обновление графического интерфейса при получении новой потребляемой записи, что означает: Когда я получаю новую запись:

1) Я проверяю, является ли запрос, который он представляет, частью легального потока, и есть ли у этого потока уже представление в графическом интерфейсе. Под представлением я подразумеваю набор кругов, представляющих полный поток. Например, если я получаю транзакцию (полученный запрос MS1), допустимый поток номер 1: то есть от MS1 до MS2 до MS3, поэтому мой графический интерфейс добавит столбец таблицы с двумя серыми кругами: от MS1 до MS2 и от MS2 до MS3. Далее, когда запись: MS2, полученная от MS1, будет израсходована, я закрашу первый круг зеленым и так далее.

Моя проблема: я не понимаю, как «подключиться» к коду Amazon KCL (представленному здесь). это означает, что я не знаю, как сделать, чтобы потребляемая запись вызывала событие в моем графическом интерфейсе JavaFX, которое соответствующим образом обновляло графический интерфейс.

Помощь будет высоко оценена!

пакет com.kinesisdataconsumer;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.DATA_STATUS;
import com.DataBase;
import com.MonitoringLogicImpl;
import com.kinesisdataproducer.Producer;
import com.Transaction;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

public class Consumer implements IRecordProcessorFactory {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    public DataBase dataBase;
    public ArrayList<Transaction> transactionList;
    public MonitoringLogicImpl monitoringLogic;

    private final AtomicLong largestTimestamp = new AtomicLong(0);

    private final List<Long> sequenceNumbers = new ArrayList<>();

    private final Object lock = new Object();

    public Consumer(DataBase database, ArrayList<Transaction> transactions, MonitoringLogicImpl monitoringLogicImplementation){
        dataBase = database;
        transactionList = transactions;
        monitoringLogic = monitoringLogicImplementation;
    }

    private class RecordProcessor implements IRecordProcessor {
        @Override
        public void initialize(String shardId) {}

        @Override
        public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
            long timestamp = 0;
            List<Long> seqNos = new ArrayList<>();

            for (Record r : records) {

                timestamp = Math.max(timestamp, Long.parseLong(r.getPartitionKey()));

                try {
                    byte[] b = new byte[r.getData().remaining()];
                    r.getData().get(b);
                    seqNos.add(Long.parseLong(new String(b, "UTF-8").split("#")[0]));

                    //this thread adds the transaction to the DB
                    Thread addTransactionToDBThread = new Thread() {
                        public void run() {
                            try {
                                JSONObject jsonObj = new JSONObject(new String(b, "UTF-8").split("#")[1]);
                                Transaction transaction = Transaction.convertJsonToTransaction(jsonObj);
                                //add the transaction to the database
                                dataBase.addTransactionToDB(transaction);
                                //update the user-interface about the last transaction in the system
                                DATA_STATUS transactionStatus = monitoringLogic.getStatus(transaction);
                                monitoringLogic.updateUI(transaction.getUuid(), transaction.getSender(), transaction.getReceiver(), transactionStatus);
                                Thread.sleep(1000);
                            } catch(InterruptedException e) {
                                e.printStackTrace();
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }
                    };
                    addTransactionToDBThread.start();
                } catch (Exception e) {
                    log.error("Error parsing record", e);
                    System.exit(1);
                }
            }

            synchronized (lock) {
                if (largestTimestamp.get() < timestamp) {
                    log.info(String.format(
                            "Found new larger timestamp: %d (was %d), clearing state",
                            timestamp, largestTimestamp.get()));
                    largestTimestamp.set(timestamp);
                    sequenceNumbers.clear();
                }

                // Only add to the shared list if our data is from the latest run.
                if (largestTimestamp.get() == timestamp) {
                    sequenceNumbers.addAll(seqNos);
                    Collections.sort(sequenceNumbers);
                }
            }
            try {
                checkpointer.checkpoint();
            } catch (Exception e) {
                log.error("Error while trying to checkpoint during ProcessRecords", e);
            }
        }

        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
            log.info("Shutting down, reason: " + reason);
            try {
                checkpointer.checkpoint();
            } catch (Exception e) {
                log.error("Error while trying to checkpoint during Shutdown", e);
            }
        }
    }

    /**
     * Log a message indicating the current state.
     */
    public void logResults() {
        synchronized (lock) {
            if (largestTimestamp.get() == 0) {
                return;
            }

            if (sequenceNumbers.size() == 0) {
                log.info("No sequence numbers found for current run.");
                return;
            }

            // The producer assigns sequence numbers starting from 1, so we
            // start counting from one before that, i.e. 0.
            long last = 0;
            long gaps = 0;
            for (long sn : sequenceNumbers) {
                if (sn - last > 1) {
                    gaps++;
                }
                last = sn;
            }

            log.info(String.format(
                    "Found %d gaps in the sequence numbers. Lowest seen so far is %d, highest is %d",
                    gaps, sequenceNumbers.get(0), sequenceNumbers.get(sequenceNumbers.size() - 1)));
        }
    }

    @Override
    public IRecordProcessor createProcessor() {
        return this.new RecordProcessor();
    }

    public void consumeData() {
        KinesisClientLibConfiguration config =
                new KinesisClientLibConfiguration(
                        "KinesisProducerLibSampleConsumer",
                        Producer.STREAM_NAME,
                        new DefaultAWSCredentialsProviderChain(),
                        "KinesisProducerLibSampleConsumer")
                        .withRegionName(Producer.REGION)
                        .withInitialPositionInStream(InitialPositionInStream.LATEST);

        final Consumer consumer = new Consumer(dataBase, transactionList, monitoringLogic);

        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                consumer.logResults();
            }
        }, 10, 1, TimeUnit.SECONDS);

        new Worker.Builder()
                .recordProcessorFactory(consumer)
                .config(config)
                .build()
                .run();
    }
}

person Elon    schedule 10.06.2019    source источник
comment
Возможный дубликат: stackoverflow.com/ вопросы/56519703/   -  person Sedrick    schedule 11.06.2019
comment
Вы пытались просто создать что-то простое, чтобы убедиться, что информация перемещается? Затем предпринимайте шаги, чтобы добраться туда, куда вы пытаетесь попасть.   -  person Sedrick    schedule 11.06.2019