Пример публикации / подписки с помощью Azure Cosmos DB на Java

Мне нужна система сообщений о событиях pub / sub с Azure Cosmos DB. Я использую пакет SDK для Java для Azure Cosmos DB v4.

Я пробую использовать ChangeFeedProcessor на основе этого образца https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples/blob/main/src/main/java/com/azure/cosmos/examples/changefeed/SampleChangeFeedProcessor.java, но он работает не так, как ожидалось.

Мои проблемы:

  • Сбор корма / рост контейнера продолжаются. Как я могу удалить событие после того, как все активные узлы получили событие?
  • Задержка событий кажется относительно большой. Около минуты.
  • Только один узел получает события. Это кажется интересным для балансировки нагрузки, но это не мой вариант использования.

comment
Почему в ленте изменений требуется Pub / Subs? Более того, вам нужно несколько процессоров в качестве подписчиков.   -  person singhh-msft    schedule 04.03.2021
comment
Я не понимаю вопроса. Мне нужен pub / sub, который отправляет событие всем подключенным узлам / хостам. Например, изменение конфигурации. Мне не нужна лента изменений. ChangeFeedProcessor - это API, который я попробовал, потому что он был предложен в некоторых комментариях и выглядит как решение.   -  person Horcrux7    schedule 04.03.2021
comment
Я не понимаю вашего вопроса. Ожидаете ли вы, что данные будут удалены из контейнера после чтения через канал изменений?   -  person Mark Brown    schedule 04.03.2021
comment
Да, после того, как событие отправлено на все узлы, это событие больше не имеет смысла. en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern   -  person Horcrux7    schedule 04.03.2021


Ответы (1)


С версией 4.12.0 Java SDK у меня работает следующий фрагмент кода. Но он использует бета-код драйвера. Это может измениться в будущем.

private static final String                CHANNEL = "events";

private CosmosContainer                    collection;

private boolean                            stopped;

void start( String clientID ) {
    CosmosContainerProperties props = new CosmosContainerProperties( CHANNEL, "/type" );
    // delete all events after 60 seconds. All nodes should receive it in the meantime.
    props.setDefaultTimeToLiveInSeconds( 60 );
    collection = getOrCreateContainer( props );
    Thread thread = new Thread( () -> {
        String[] continuation = new String[1];
        try {
            while( !stopped ) {
                // sample code: https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java
                CosmosChangeFeedRequestOptions options = continuation[0] == null ? //
                CosmosChangeFeedRequestOptions.createForProcessingFromNow( FeedRange.forFullRange() ) : // initial value
                CosmosChangeFeedRequestOptions.createForProcessingFromContinuation( continuation[0] ); // continue value
                Iterator<EventPOJO> it = collection //
                                .queryChangeFeed( options, EventPOJO.class ) //
                                .handle( ( response ) -> continuation[0] = response.getContinuationToken() ) //
                                .iterator();
                while( it.hasNext() ) {
                    EventPOJO event = it.next();
                    if( event.client != clientID ) {
                        // filter the own events
                        onMessage( event );
                    }
                }
                // poll interval
                Thread.sleep( 1000 );
            }
        } catch( Throwable th ) {
            if( !stopped ) {
                PersistenceLogger.LOGGER.error( th );
            }
        }
    }, CHANNEL );
    thread.setDaemon( true );
    thread.start();
}

<T> void send( T event, String clientID ) {
    EventPOJO evt = new EventPOJO();
    evt.id = ...
    evt.client = clientID;
    evt.type = event.getClass().getName();
    evt.message = ...

    collection.createItem( evt );
}
person Horcrux7    schedule 06.03.2021