Постановка задачи
Я написал программу, которая использует DynamoDb Streams для получения уведомлений об обновлении, этот код отлично работает при использовании сервисов AWS, но, похоже, не работает с использованием Testcontainers / Localstack для моих интеграционных тестов.
Что было сделано
Пытался вернуться к более старой версии testcontainer / localstack
Прочтите эту статью, в конце которого есть комментарий, который вроде подразумевает, что в KCL 1.x отсутствуют некоторые API, которые не позволяют ему работать с Localstack
Однако клиентская библиотека Kinesis-1.x не предоставляет возможности указывать URL-адрес конечной точки сервиса AWS CloudWatch в качестве параметра конфигурации.
Я считаю, что адаптер DynamoDb Kinesis внутренне использует KCL 1.x, поэтому не думаю, что смогу переключиться на использование KCL 2.x. Кстати, похоже, что DynamoDb Kinesis Adapter был заархивирован, но Amazon Docs по-прежнему ссылается на него, и в этом репозитории git нет ничего, что указывало бы, почему он был заархивирован или что использовать вместо этого.
Что происходит?
Моя программа работает нормально, я не вижу ошибок, но я также не получаю никакой информации об обновлениях ни от каких шардов.
Дизайн приложений
Обычно запускаются тесты интеграции, он создает необходимые таблицы в LocalStack (подтверждено, что это делается через интерфейс командной строки AWS), а затем помещает 3 элемента в DynamoDb. Приложение Spring Boot запускается и считывает данные из DynamoDb в список. Затем тест интеграции вызывает конечную точку удаления, конечная точка удаления просто вызывает функцию CrudRepository.delete (реализованную через Spring Data DynamoDb). Я подтвердил, что фактический DynamoDb увеличился с 3 элементов до 2, при этом удаленный элемент был удален. Однако кеш приложений должен обновляться, когда мое приложение получает обновленные записи из KCL; этого никогда не происходит с TreatContainers / localStack.
Приложение использует InitialPositionInStream.LATEST
для чтения осколков.
Вывод приложения
Вы можете увидеть, что приложение запускается и готово в 13:42:17.468
, элемент удаляется в 13:42:24.768
. Затем тест вызывает Thread.sleep (1000), чтобы дать KCL время обработать любые изменения, а затем в 13:42:25.793
тест вызывает сервер, чтобы проверить, присутствует ли элемент все еще в кэше, что так и есть.
Думая, что это может быть проблема времени, когда LocalStack обрабатывается медленно, я добавил в свои тесты функцию @AfterAll, которая ожидает в течение 2 минут. Я использовал curl
для вызова конечной точки get на 13:44:38.723
, и элемент все еще существует, и у Localstack должно было быть достаточно времени для обработки обновлений для шардов.
2021-04-05 13:42:07.712 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : KCL shard lease table, test-table, exists and is ACTIVE
2021-04-05 13:42:07.770 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : KCL shard lease table, kcl-shard-lease-lock, exists and is ACTIVE
2021-04-05 13:42:07.772 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : Describing table=test-table
2021-04-05 13:42:07.821 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : Got description for table=test-table
2021-04-05 13:42:07.822 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : Got stream arn (arn:aws:dynamodb:us-east-1:000000000000:table/test-table/stream/2021-04-05T17:41:21.860) for table=test-table with tableArn=arn:aws:dynamodb:us-east-1:000000000000:table/test-table
2021-04-05 13:42:07.904 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : Creating KCL worker
2021-04-05 13:42:07.938 INFO 11414 --- [ Test worker] c.a.s.k.leases.impl.LeaseCoordinator : With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
2021-04-05 13:42:07.941 INFO 11414 --- [ Test worker] c.a.s.k.clientlibrary.lib.worker.Worker : Shard sync strategy determined as SHARD_END.
2021-04-05 13:42:07.941 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : KCL Worker created!
2021-04-05 13:42:07.944 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Initialization attempt 1
2021-04-05 13:42:07.945 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Initializing LeaseCoordinator
2021-04-05 13:42:14.525 INFO 11414 --- [ Test worker] c.c.u.d.s.controller.TestItemController : *** TestItemController Started ***
2021-04-05 13:42:14.650 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Syncing Kinesis shard info
2021-04-05 13:42:14.656 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : syncShardLeases: begin
2021-04-05 13:42:14.656 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : getShardList: begin
2021-04-05 13:42:14.757 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : getShardList: done
2021-04-05 13:42:14.779 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : determineNewLeasesToCreate: begin
2021-04-05 13:42:14.781 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : determineNewLeasesToCreate: done
2021-04-05 13:42:14.863 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : cleanupGarbageLeases: begin
2021-04-05 13:42:14.863 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : cleanupGarbageLeases: done
2021-04-05 13:42:14.863 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : cleanupLeasesOfFinishedShards: begin
2021-04-05 13:42:14.863 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : cleanupLeasesOfFinishedShards: done
2021-04-05 13:42:14.863 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : syncShardLeases: done
2021-04-05 13:42:14.866 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Starting LeaseCoordinator
2021-04-05 13:42:14.900 INFO 11414 --- [cessingThread-0] c.a.s.kinesis.leases.impl.LeaseRenewer : Worker Test-application-lMtQuWkZeFmq+ found lease {
"leaseKey" : "shardId-00000001617600000000-000000000000",
"leaseOwner" : "Test-application-lMtQuWkZeFmq+",
"leaseCounter" : 0,
"concurrencyToken" : null,
"lastCounterIncrementNanos" : null,
"checkpoint" : {
"sequenceNumber" : "LATEST",
"subSequenceNumber" : 0
},
"pendingCheckpoint" : null,
"ownerSwitchesSinceCheckpoint" : 0,
"parentShardIds" : [ ]
}
2021-04-05 13:42:14.949 WARN 11414 --- [cessingThread-0] c.a.s.k.metrics.impl.MetricsHelper : No metrics scope set in thread KCLProcessingThread-0, getMetricsScope returning NullMetricsScope.
2021-04-05 13:42:15.011 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:15.011 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:42:16.709 INFO 11414 --- [ Test worker] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2021-04-05 13:42:17.309 INFO 11414 --- [ Test worker] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2021-04-05 13:42:17.451 INFO 11414 --- [ Test worker] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 50376 (http) with context path ''
2021-04-05 13:42:17.464 INFO 11414 --- [ Test worker] d.d.r.u.Entity2DynamoDBTableSynchronizer : Checking repository classes with DynamoDB tables test-table for ContextRefreshedEvent
2021-04-05 13:42:17.468 INFO 11414 --- [ Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp : Started DynamoDbStreamsTestApp in 54.763 seconds (JVM running for 77.176)
2021-04-05 13:42:17.872 INFO 11414 --- [ Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp : *** ASK SERVER TO DELETE ITEM 23456 ***
2021-04-05 13:42:17.996 INFO 11414 --- [o-auto-1-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-04-05 13:42:17.996 INFO 11414 --- [o-auto-1-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-04-05 13:42:18.019 INFO 11414 --- [o-auto-1-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 23 ms
2021-04-05 13:42:18.068 INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController : Deleting itemNumber=23456
2021-04-05 13:42:21.670 INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController : Found inventory item to delete
2021-04-05 13:42:24.768 INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController : Item deleted
2021-04-05 13:42:24.789 INFO 11414 --- [ Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp : *** SERVER SAYS ITEM 23456 DELETED ***
2021-04-05 13:42:24.954 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Initialization complete. Starting worker loop.
2021-04-05 13:42:24.970 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Created new shardConsumer for : ShardInfo [shardId=shardId-00000001617600000000-000000000000, concurrencyToken=b773dd9e-d385-44cd-8189-3cf330b94351, parentShardIds=[], checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}]
2021-04-05 13:42:24.972 INFO 11414 --- [dProcessor-0000] c.a.s.k.c.l.w.BlockOnParentShardTask : No need to block on parents [] of shard shardId-00000001617600000000-000000000000
2021-04-05 13:42:25.793 INFO 11414 --- [ Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp : *** VERIFY ITEM 23456 WAS DELETED OR NOT ***
2021-04-05 13:42:31.304 INFO 11414 --- [o-auto-1-exec-2] c.c.u.d.s.controller.TestItemController : Getting itemNumber=23456
2021-04-05 13:42:34.586 INFO 11414 --- [o-auto-1-exec-2] c.c.u.d.s.controller.TestItemController : Item=TestItem{itemNumber='23456', description='A doo', price=10.99}
2021-04-05 13:42:34.589 INFO 11414 --- [dProcessor-0000] c.a.s.k.c.lib.worker.KinesisDataFetcher : Initializing shard shardId-00000001617600000000-000000000000 with LATEST
2021-04-05 13:42:34.611 ERROR 11414 --- [ Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp : *** SERVER STILL HAS ITEM 23456 ***
2021-04-05 13:42:34.663 WARN 11414 --- [ Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp : Giving system 2 MINUTES before shut down
2021-04-05 13:42:35.088 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:35.088 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:42:55.171 INFO 11414 --- [oordinator-0001] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:55.171 INFO 11414 --- [oordinator-0001] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:43:08.731 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Current stream shard assignments: shardId-00000001617600000000-000000000000
2021-04-05 13:43:08.732 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Sleeping ...
2021-04-05 13:43:15.248 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:15.248 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:43:35.322 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:35.323 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:43:55.399 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:55.399 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:44:09.937 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Current stream shard assignments: shardId-00000001617600000000-000000000000
2021-04-05 13:44:09.937 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Sleeping ...
2021-04-05 13:44:15.470 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:44:15.470 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:44:35.550 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:44:35.551 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:44:38.723 INFO 11414 --- [o-auto-1-exec-5] c.c.u.d.s.controller.TestItemController : Getting itemNumber=23456
2021-04-05 13:44:41.374 INFO 11414 --- [o-auto-1-exec-5] c.c.u.d.s.controller.TestItemController : Item=TestItem{itemNumber='23456', description='A doo', price=10.99}
Информация о конфигурации
- динамодб-потоки-кинезис-адаптер: 1.5.2
- амазонка-кинезис-клиент: 1.13.3
- junit-jupiter-api: 5.6.0
- юнит-юпитер: 1.15.2
- локальный стек: 1.15.2
- локальный стек: 0.12.9
- локальные утилиты: 0.2.10
- AWS-Java-SDK-Dynamodb: 1.11.858
- Spring-Boot-Starter-Web: 2.3.3.RELEASE
- Spring data Dynamodb (из форка boostchicken): 5.2.5
- MacOS Catalina: 10.15.7
- Java: 15.0.2
- Докер:
Client: Docker Engine - Community
Cloud integration: 1.0.9
Version: 20.10.5
API version: 1.41
Go version: go1.13.15
Git commit: 55c4c88
Built: Tue Mar 2 20:13:00 2021
OS/Arch: darwin/amd64
Context: default
Experimental: true
Server: Docker Engine - Community
Engine:
Version: 20.10.5
API version: 1.41 (minimum version 1.12)
Go version: go1.13.15
Git commit: 363e9a8
Built: Tue Mar 2 20:15:47 2021
OS/Arch: linux/amd64
Experimental: true
containerd:
Version: 1.4.3
GitCommit: 269548fa27e0089a8b8278fc4fc781d7f65a939b
runc:
Version: 1.0.0-rc92
GitCommit: ff819c7e9184c13b7c2607fe6c30ae19403a7aff
docker-init:
Version: 0.19.0
GitCommit: de40ad0
Kubernetes:
Version: Unknown
StackAPI: Unknown