Работают ли Testcontainers / LocalStack с DynamoDb Streams KCL 1.x?

Постановка задачи

Я написал программу, которая использует DynamoDb Streams для получения уведомлений об обновлении, этот код отлично работает при использовании сервисов AWS, но, похоже, не работает с использованием Testcontainers / Localstack для моих интеграционных тестов.

Что было сделано

Пытался вернуться к более старой версии testcontainer / localstack

Прочтите эту статью, в конце которого есть комментарий, который вроде подразумевает, что в KCL 1.x отсутствуют некоторые API, которые не позволяют ему работать с Localstack

Однако клиентская библиотека Kinesis-1.x не предоставляет возможности указывать URL-адрес конечной точки сервиса AWS CloudWatch в качестве параметра конфигурации.

Я считаю, что адаптер DynamoDb Kinesis внутренне использует KCL 1.x, поэтому не думаю, что смогу переключиться на использование KCL 2.x. Кстати, похоже, что DynamoDb Kinesis Adapter был заархивирован, но Amazon Docs по-прежнему ссылается на него, и в этом репозитории git нет ничего, что указывало бы, почему он был заархивирован или что использовать вместо этого.

Что происходит?

Моя программа работает нормально, я не вижу ошибок, но я также не получаю никакой информации об обновлениях ни от каких шардов.

Дизайн приложений

Обычно запускаются тесты интеграции, он создает необходимые таблицы в LocalStack (подтверждено, что это делается через интерфейс командной строки AWS), а затем помещает 3 элемента в DynamoDb. Приложение Spring Boot запускается и считывает данные из DynamoDb в список. Затем тест интеграции вызывает конечную точку удаления, конечная точка удаления просто вызывает функцию CrudRepository.delete (реализованную через Spring Data DynamoDb). Я подтвердил, что фактический DynamoDb увеличился с 3 элементов до 2, при этом удаленный элемент был удален. Однако кеш приложений должен обновляться, когда мое приложение получает обновленные записи из KCL; этого никогда не происходит с TreatContainers / localStack.

Приложение использует InitialPositionInStream.LATEST для чтения осколков.

Вывод приложения

Вы можете увидеть, что приложение запускается и готово в 13:42:17.468, элемент удаляется в 13:42:24.768. Затем тест вызывает Thread.sleep (1000), чтобы дать KCL время обработать любые изменения, а затем в 13:42:25.793 тест вызывает сервер, чтобы проверить, присутствует ли элемент все еще в кэше, что так и есть.

Думая, что это может быть проблема времени, когда LocalStack обрабатывается медленно, я добавил в свои тесты функцию @AfterAll, которая ожидает в течение 2 минут. Я использовал curl для вызова конечной точки get на 13:44:38.723, и элемент все еще существует, и у Localstack должно было быть достаточно времени для обработки обновлений для шардов.

2021-04-05 13:42:07.712  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : KCL shard lease table, test-table, exists and is ACTIVE
2021-04-05 13:42:07.770  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : KCL shard lease table, kcl-shard-lease-lock, exists and is ACTIVE
2021-04-05 13:42:07.772  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : Describing table=test-table
2021-04-05 13:42:07.821  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : Got description for table=test-table
2021-04-05 13:42:07.822  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : Got stream arn (arn:aws:dynamodb:us-east-1:000000000000:table/test-table/stream/2021-04-05T17:41:21.860) for table=test-table with tableArn=arn:aws:dynamodb:us-east-1:000000000000:table/test-table
2021-04-05 13:42:07.904  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : Creating KCL worker
2021-04-05 13:42:07.938  INFO 11414 --- [    Test worker] c.a.s.k.leases.impl.LeaseCoordinator     : With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
2021-04-05 13:42:07.941  INFO 11414 --- [    Test worker] c.a.s.k.clientlibrary.lib.worker.Worker  : Shard sync strategy determined as SHARD_END.
2021-04-05 13:42:07.941  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : KCL Worker created!
2021-04-05 13:42:07.944  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Initialization attempt 1
2021-04-05 13:42:07.945  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Initializing LeaseCoordinator
2021-04-05 13:42:14.525  INFO 11414 --- [    Test worker] c.c.u.d.s.controller.TestItemController  : *** TestItemController Started ***
2021-04-05 13:42:14.650  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Syncing Kinesis shard info
2021-04-05 13:42:14.656  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : syncShardLeases: begin
2021-04-05 13:42:14.656  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : getShardList: begin
2021-04-05 13:42:14.757  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : getShardList: done
2021-04-05 13:42:14.779  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : determineNewLeasesToCreate: begin
2021-04-05 13:42:14.781  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : determineNewLeasesToCreate: done
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : cleanupGarbageLeases: begin
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : cleanupGarbageLeases: done
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : cleanupLeasesOfFinishedShards: begin
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : cleanupLeasesOfFinishedShards: done
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : syncShardLeases: done
2021-04-05 13:42:14.866  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Starting LeaseCoordinator
2021-04-05 13:42:14.900  INFO 11414 --- [cessingThread-0] c.a.s.kinesis.leases.impl.LeaseRenewer   :  Worker Test-application-lMtQuWkZeFmq+ found lease {
  "leaseKey" : "shardId-00000001617600000000-000000000000",
  "leaseOwner" : "Test-application-lMtQuWkZeFmq+",
  "leaseCounter" : 0,
  "concurrencyToken" : null,
  "lastCounterIncrementNanos" : null,
  "checkpoint" : {
    "sequenceNumber" : "LATEST",
    "subSequenceNumber" : 0
  },
  "pendingCheckpoint" : null,
  "ownerSwitchesSinceCheckpoint" : 0,
  "parentShardIds" : [ ]
}
2021-04-05 13:42:14.949  WARN 11414 --- [cessingThread-0] c.a.s.k.metrics.impl.MetricsHelper       : No metrics scope set in thread KCLProcessingThread-0, getMetricsScope returning NullMetricsScope.
2021-04-05 13:42:15.011  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:15.011  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:42:16.709  INFO 11414 --- [    Test worker] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2021-04-05 13:42:17.309  INFO 11414 --- [    Test worker] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2021-04-05 13:42:17.451  INFO 11414 --- [    Test worker] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 50376 (http) with context path ''
2021-04-05 13:42:17.464  INFO 11414 --- [    Test worker] d.d.r.u.Entity2DynamoDBTableSynchronizer : Checking repository classes with DynamoDB tables test-table for ContextRefreshedEvent
2021-04-05 13:42:17.468  INFO 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : Started DynamoDbStreamsTestApp in 54.763 seconds (JVM running for 77.176)
2021-04-05 13:42:17.872  INFO 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : *** ASK SERVER TO DELETE ITEM 23456 ***
2021-04-05 13:42:17.996  INFO 11414 --- [o-auto-1-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-04-05 13:42:17.996  INFO 11414 --- [o-auto-1-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-04-05 13:42:18.019  INFO 11414 --- [o-auto-1-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 23 ms
2021-04-05 13:42:18.068  INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController  : Deleting itemNumber=23456
2021-04-05 13:42:21.670  INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController  : Found inventory item to delete
2021-04-05 13:42:24.768  INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController  : Item deleted
2021-04-05 13:42:24.789  INFO 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : *** SERVER SAYS ITEM 23456 DELETED ***
2021-04-05 13:42:24.954  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Initialization complete. Starting worker loop.
2021-04-05 13:42:24.970  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Created new shardConsumer for : ShardInfo [shardId=shardId-00000001617600000000-000000000000, concurrencyToken=b773dd9e-d385-44cd-8189-3cf330b94351, parentShardIds=[], checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}]
2021-04-05 13:42:24.972  INFO 11414 --- [dProcessor-0000] c.a.s.k.c.l.w.BlockOnParentShardTask     : No need to block on parents [] of shard shardId-00000001617600000000-000000000000
2021-04-05 13:42:25.793  INFO 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : *** VERIFY ITEM 23456 WAS DELETED OR NOT ***
2021-04-05 13:42:31.304  INFO 11414 --- [o-auto-1-exec-2] c.c.u.d.s.controller.TestItemController  : Getting itemNumber=23456
2021-04-05 13:42:34.586  INFO 11414 --- [o-auto-1-exec-2] c.c.u.d.s.controller.TestItemController  : Item=TestItem{itemNumber='23456', description='A doo', price=10.99}
2021-04-05 13:42:34.589  INFO 11414 --- [dProcessor-0000] c.a.s.k.c.lib.worker.KinesisDataFetcher  : Initializing shard shardId-00000001617600000000-000000000000 with LATEST
2021-04-05 13:42:34.611 ERROR 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : *** SERVER STILL HAS ITEM 23456 ***
2021-04-05 13:42:34.663  WARN 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : Giving system 2 MINUTES before shut down
2021-04-05 13:42:35.088  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:35.088  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:42:55.171  INFO 11414 --- [oordinator-0001] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:55.171  INFO 11414 --- [oordinator-0001] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:43:08.731  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Current stream shard assignments: shardId-00000001617600000000-000000000000
2021-04-05 13:43:08.732  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Sleeping ...
2021-04-05 13:43:15.248  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:15.248  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:43:35.322  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:35.323  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:43:55.399  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:55.399  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:44:09.937  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Current stream shard assignments: shardId-00000001617600000000-000000000000
2021-04-05 13:44:09.937  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Sleeping ...
2021-04-05 13:44:15.470  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:44:15.470  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:44:35.550  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:44:35.551  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:44:38.723  INFO 11414 --- [o-auto-1-exec-5] c.c.u.d.s.controller.TestItemController  : Getting itemNumber=23456
2021-04-05 13:44:41.374  INFO 11414 --- [o-auto-1-exec-5] c.c.u.d.s.controller.TestItemController  : Item=TestItem{itemNumber='23456', description='A doo', price=10.99}

Информация о конфигурации

  • динамодб-потоки-кинезис-адаптер: 1.5.2
  • амазонка-кинезис-клиент: 1.13.3
  • junit-jupiter-api: 5.6.0
  • юнит-юпитер: 1.15.2
  • локальный стек: 1.15.2
  • локальный стек: 0.12.9
  • локальные утилиты: 0.2.10
  • AWS-Java-SDK-Dynamodb: 1.11.858
  • Spring-Boot-Starter-Web: 2.3.3.RELEASE
  • Spring data Dynamodb (из форка boostchicken): 5.2.5
  • MacOS Catalina: 10.15.7
  • Java: 15.0.2
  • Докер:
Client: Docker Engine - Community
 Cloud integration: 1.0.9
 Version:           20.10.5
 API version:       1.41
 Go version:        go1.13.15
 Git commit:        55c4c88
 Built:             Tue Mar  2 20:13:00 2021
 OS/Arch:           darwin/amd64
 Context:           default
 Experimental:      true

Server: Docker Engine - Community
 Engine:
  Version:          20.10.5
  API version:      1.41 (minimum version 1.12)
  Go version:       go1.13.15
  Git commit:       363e9a8
  Built:            Tue Mar  2 20:15:47 2021
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.4.3
  GitCommit:        269548fa27e0089a8b8278fc4fc781d7f65a939b
 runc:
  Version:          1.0.0-rc92
  GitCommit:        ff819c7e9184c13b7c2607fe6c30ae19403a7aff
 docker-init:
  Version:          0.19.0
  GitCommit:        de40ad0
 Kubernetes:
  Version:          Unknown
  StackAPI:         Unknown



Ответы (1)


Решение

После многих часов игры с этим я наконец заметил это сообщение

2021-04-05 13: 42: 24.954 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker: Инициализация завершена. Запуск рабочего цикла.

Посмотрев на это некоторое время, стало очевидно, что при использовании Testcontainer / Localstack для инициализации и готовности рабочего процесса требуется 10 секунд. Это было достаточно легко решить, поскольку com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker имеет прослушиватель изменения состояния, поэтому я мог настроить прослушиватель, чтобы определить, когда локальный стек был готов, а затем позвольте коду продолжить.

Заключительные вопросы

К сожалению, это не решило проблему полностью, оказалось, что реализация потоков DynamoDb в Testcontainer / Localstack выполняется очень медленно. Под этим я подразумеваю создание / удаление или изменение записи, если процессору осколков требовалось более 7 секунд для получения обновлений. Итак, я закончил тем, что поставил 10-секундный сон между моментом, когда я удалил элемент, и до того, как я проверил, чтобы убедиться, что элемент действительно был удален.

Заключение

Так что это оказалось проблемой времени, Testconatiners / Localstack просто очень медленно отправляет данные в шарды (по крайней мере, по сравнению с тем же самым точным кодом, запускаемым для DynamoDb и DynamoDb Streams).

person Jim M.    schedule 07.04.2021