Пример публикации / подписки с помощью Azure Cosmos DB на Java
Home
Вопросы и ответы
Пример публикации / подписки с помощью Azure Cosmos DB на Java
Мне нужна система сообщений о событиях pub / sub с Azure Cosmos DB. Я использую пакет SDK для Java для Azure Cosmos DB v4.
Я пробую использовать ChangeFeedProcessor на основе этого образца https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples/blob/main/src/main/java/com/azure/cosmos/examples/changefeed/SampleChangeFeedProcessor.java , но он работает не так, как ожидалось.
Мои проблемы:
Сбор корма / рост контейнера продолжаются. Как я могу удалить событие после того, как все активные узлы получили событие?
Задержка событий кажется относительно большой. Около минуты.
Только один узел получает события. Это кажется интересным для балансировки нагрузки, но это не мой вариант использования.
person
Horcrux7
schedule
04.03.2021
source
источник
Ответы (1)
С версией 4.12.0 Java SDK у меня работает следующий фрагмент кода. Но он использует бета-код драйвера. Это может измениться в будущем.
private static final String CHANNEL = "events";
private CosmosContainer collection;
private boolean stopped;
void start( String clientID ) {
CosmosContainerProperties props = new CosmosContainerProperties( CHANNEL, "/type" );
// delete all events after 60 seconds. All nodes should receive it in the meantime.
props.setDefaultTimeToLiveInSeconds( 60 );
collection = getOrCreateContainer( props );
Thread thread = new Thread( () -> {
String[] continuation = new String[1];
try {
while( !stopped ) {
// sample code: https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java
CosmosChangeFeedRequestOptions options = continuation[0] == null ? //
CosmosChangeFeedRequestOptions.createForProcessingFromNow( FeedRange.forFullRange() ) : // initial value
CosmosChangeFeedRequestOptions.createForProcessingFromContinuation( continuation[0] ); // continue value
Iterator<EventPOJO> it = collection //
.queryChangeFeed( options, EventPOJO.class ) //
.handle( ( response ) -> continuation[0] = response.getContinuationToken() ) //
.iterator();
while( it.hasNext() ) {
EventPOJO event = it.next();
if( event.client != clientID ) {
// filter the own events
onMessage( event );
}
}
// poll interval
Thread.sleep( 1000 );
}
} catch( Throwable th ) {
if( !stopped ) {
PersistenceLogger.LOGGER.error( th );
}
}
}, CHANNEL );
thread.setDaemon( true );
thread.start();
}
<T> void send( T event, String clientID ) {
EventPOJO evt = new EventPOJO();
evt.id = ...
evt.client = clientID;
evt.type = event.getClass().getName();
evt.message = ...
collection.createItem( evt );
}
person
Horcrux7
schedule
06.03.2021