Consumer#seek(long timestamp)
позволяет сбросить подписку на заданную метку времени. После поиска потребитель начнет получать сообщения со временем публикации, равным или большим, чем временная метка, переданная методу seek
.
В приведенном ниже примере показано, как сбросить потребителя на предыдущий час:
try (
// Create PulsarClient
PulsarClient client = PulsarClient
.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// Create Consumer subscription
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.subscribe()
) {
// Seek consumer to previous hour
consumer.seek(Instant.now().minus( Duration.ofHours(1)).toEpochMilli());
while (true) {
final Message<String> msg = consumer.receive();
System.out.printf(
"Message received: key=%s, value=%s, topic=%s, id=%s%n",
msg.getKey(),
msg.getValue(),
msg.getTopicName(),
msg.getMessageId().toString());
consumer.acknowledge(msg);
}
}
Обратите внимание, что если у вас есть несколько потребителей, принадлежащих к одному индексу (например, Key_Shared), то все потребители будут сброшены.
person
fhussonnois
schedule
29.10.2020