SubscriptionClient с проблемой сеансов: java.lang.UnsupportedOperationException

Использование тем и подписок шины служебных сообщений Azure.

Создал тему aaaa и подписку zzzsubscription - с учетом сеанса.

Использование приведенного ниже кода для отправки сообщений в тему с идентификаторами сеанса:

String senderString="Endpoint=sb://xxxx.servicebus.windows.net/;SharedAccessKeyName=sampler-sender-only-policy;SharedAccessKey=mmmm;EntityPath=aaaa";
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(senderString,
                "aaaa");
TopicClient client=new TopicClient(connectionStringBuilder);
for(int session =1 ;session<=10;session++){
        for(int message =1 ;message<=10;message++){
                Message sendMessage=new Message("message "+message);
                sendMessage.setMessageId(UUID.randomUUID().toString());
                sendMessage.setSessionId("Session "+session );
                client.sendAsync(sendMessage);
        }
}

Используя приведенный ниже код для чтения сообщений из подписки:

String listenerString = "Endpoint=sb://xxxx.servicebus.windows.net/;SharedAccessKeyName=sample-listen-only-policy;SharedAccessKey=yyyy;EntityPath=aaaa";
ConnectionStringBuilder connectionStringBuilderListen = new ConnectionStringBuilder(listenerString,
                "aaaa/subscriptions/zzzsubscription");
SubscriptionClient subscriptionClient = new SubscriptionClient(connectionStringBuilderListen, ReceiveMode.PEEKLOCK);

subscriptionClient.registerSessionHandler(new ISessionHandler() {
        @Override
        public CompletableFuture<Void> onMessageAsync(IMessageSession session, IMessage message) {
                System.out.println(message.getSessionId()+" - "+ new String(message.getBody()));
                return subscriptionClient.completeAsync(message.getLockToken());
        }
        @Override
         public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                System.out.printf(exceptionPhase + "-" + throwable.getMessage());
            }
        @Override
        public CompletableFuture<Void> OnCloseSessionAsync(IMessageSession session) {
                return subscriptionClient.closeAsync();
        }
});

Получение исключения ниже в строке return subscriptionClient.completeAsync(message.getLockToken());:

2018-02-23 11:05:15,471 [onPool-worker-2] - ERROR MessageAndSessionPump          - onMessage with message containing sequence number '15481123719086103' threw exception
java.lang.UnsupportedOperationException: Receiver not created. Registering a MessageHandler creates a receiver.
    at com.microsoft.azure.servicebus.MessageAndSessionPump.checkInnerReceiveCreated(MessageAndSessionPump.java:712)
    at com.microsoft.azure.servicebus.MessageAndSessionPump.completeAsync(MessageAndSessionPump.java:636)
    at com.microsoft.azure.servicebus.SubscriptionClient.completeAsync(SubscriptionClient.java:208)
    at com.microsoft.azure.servicebus.samples.topicsgettingstarted.TopicsGettingStarted$1.onMessageAsync(TopicsGettingStarted.java:31)

Используя следующую версию пакета SDK для служебной шины Azure:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-servicebus</artifactId>
    <version>1.1.1</version>
</dependency>

Изменить 1:

К приведенному выше коду, если я добавлю следующее:

SessionHandlerOptions sessionHandlerOptions = new SessionHandlerOptions(1, true, Duration.ofMinutes(5));
subscriptionClient.registerSessionHandler(new ISessionHandler() {...}, sessionHandlerOptions);

и измените возвращение в onMessageAsync на следующее:

return CompletableFuture.completedFuture(null);

Затем я получаю следующие ошибки. Получены некоторые сообщения, но я получаю следующую ошибку, и программа зависает:

2018-02-23 11:37:17,814 [24-7360c77df5c2] - ERROR RequestResponseLink            - Opening internal send link of requestresponselink to sample-order-status/subscriptions/sample-hybris-order-status-subscription/$management failed.
com.microsoft.azure.servicebus.primitives.AuthorizationFailedException: Unauthorized access. 'Send' claim(s) are required to perform this operation. Resource: 'sb://ashokgoli.servicebus.windows.net/sample-order-status/subscriptions/sample-hybris-order-status-subscription/$management'. TrackingId:cbe7e9aa831a455abdaa17aacb814497_G27, SystemTracker:gateway6, Timestamp:2/23/2018 5:37:17 PM
    at com.microsoft.azure.servicebus.primitives.ExceptionUtil.toException(ExceptionUtil.java:50)
    at com.microsoft.azure.servicebus.primitives.RequestResponseLink$InternalSender.onClose(RequestResponseLink.java:743)
    at com.microsoft.azure.servicebus.amqp.BaseLinkHandler.processOnClose(BaseLinkHandler.java:68)
    at com.microsoft.azure.servicebus.amqp.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:42)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
    at com.microsoft.azure.servicebus.primitives.MessagingFactory$RunReactor.run(MessagingFactory.java:451)
    at java.base/java.lang.Thread.run(Thread.java:844)

Решили проблему, указанную выше, добавив Отправить заявку в политику только для прослушивания следующим образом: https://github.com/Azure/azure-service-bus-java/issues/110

а также

2018-02-23 11:37:47,541 [nPool-worker-15] - ERROR RequestResponseLinkcache       - Creating requestresponselink to 'sample-order-status/subscriptions/sample-hybris-order-status-subscription/$management' failed.
com.microsoft.azure.servicebus.primitives.TimeoutException: Open operation on RequestResponseLink(05a91c-RequestResponse) on Entity(sample-order-status/subscriptions/sample-hybris-order-status-subscription/$management) timed out at 2018-02-23T11:37:47.540528-06:00[America/Chicago].
    at com.microsoft.azure.servicebus.primitives.RequestResponseLink$1.run(RequestResponseLink.java:77)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.base/java.lang.Thread.run(Thread.java:844)



Ответы (1)


вернуть будущее на OnCloseSessionAsync. Вы можете обратиться к ссылке ниже для справки.

https://github.com/Azure/azure-service-bus-java/issues/187

person Abhishek Kumar    schedule 02.03.2018