Camel: как разделить, а затем объединить, когда количество элементов меньше размера партии

У меня есть маршрут Camel, который читает файл из S3 и обрабатывает входной файл следующим образом:

  1. Разберите каждую строку в POJO (студент) с помощью Bindy
  2. Разделить вывод по телу ()
  3. Агрегировать по атрибуту тела (.semester) и размеру пакета 2
  4. Вызвать службу сохранения для загрузки в БД заданными партиями

Проблема в том, что при размере пакета 2 и нечетном количестве записей всегда есть одна запись, которая не сохраняется.

Предоставляется код Kotlin, но он не должен сильно отличаться от эквивалентного кода Java (за исключением косой черты перед "\ $ { простое выражение} "или отсутствие точек с запятой в завершающих операторах.

Если я установил размер пакета равным 1, то каждая запись будет сохранена, в противном случае последняя запись никогда не будет сохранена.

Я несколько раз проверял документацию по message-processor, но это не так. Похоже, что это касается именно этого сценария.

Я также установил [completionTimeout | completionInterval] в дополнение к completionSize, но это не имеет никакого значения.

Кто-нибудь раньше сталкивался с этой проблемой?

val csvDataFormat = BindyCsvDataFormat(Student::class.java)

from("aws-s3://$student-12-bucket?amazonS3Client=#amazonS3&delay=5000")
    .log("A new Student input file has been received in S3: '\${header.CamelAwsS3BucketName}/\${header.CamelAwsS3Key}'")
    .to("direct:move-input-s3-object-to-in-progress")
    .to("direct:process-s3-file")
    .to("direct:move-input-s3-object-to-completed")
    .end()

from("direct:process-s3-file")
    .unmarshal(csvDataFormat)
    .split(body())
    .streaming()
    .parallelProcessing()
    .aggregate(simple("\${body.semester}"), GroupedBodyAggregationStrategy())
    .completionSize(2)
    .bean(persistenceService)
    .end()

С входным CSV-файлом, включающим семь (7) записей, это результат сгенерированного вывода (с некоторыми добавленными журналами отладки):

WARN 19540 --- [student-12-move] c.a.s.s.internal.S3AbortableInputStream  : Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
 INFO 19540 --- [student-12-move] student-workflow-main                    : A new Student input file has been received in S3: 'student-12-bucket/inbox/foo.csv'
 INFO 19540 --- [student-12-move] move-input-s3-object-to-in-progress      : Moving S3 file 'inbox/foo.csv' to 'in-progress' folder...
 INFO 19540 --- [student-12-move] student-workflow-main                    : Moved input S3 file 'in-progress/foo.csv' to 'in-progress' folder...
 INFO 19540 --- [student-12-move] pre-process-s3-file-records              : Start saving to database...
DEBUG 19540 --- [read #7 - Split] c.b.i.d.s.StudentPersistenceServiceImpl  : Saving record to database: Student(id=7, name=Student 7, semester=2nd, javaMarks=25)
DEBUG 19540 --- [read #7 - Split] c.b.i.d.s.StudentPersistenceServiceImpl  : Saving record to database: Student(id=5, name=Student 5, semester=2nd, javaMarks=81)
DEBUG 19540 --- [read #3 - Split] c.b.i.d.s.StudentPersistenceServiceImpl  : Saving record to database: Student(id=6, name=Student 6, semester=1st, javaMarks=15)
DEBUG 19540 --- [read #3 - Split] c.b.i.d.s.StudentPersistenceServiceImpl  : Saving record to database: Student(id=2, name=Student 2, semester=1st, javaMarks=62)
DEBUG 19540 --- [read #2 - Split] c.b.i.d.s.StudentPersistenceServiceImpl  : Saving record to database: Student(id=3, name=Student 3, semester=2nd, javaMarks=72)
DEBUG 19540 --- [read #2 - Split] c.b.i.d.s.StudentPersistenceServiceImpl  : Saving record to database: Student(id=1, name=Student 1, semester=2nd, javaMarks=87)
 INFO 19540 --- [student-12-move] device-group-workflow-main               : End pre-processing S3 CSV file records...
 INFO 19540 --- [student-12-move] move-input-s3-object-to-completed        : Moving S3 file 'in-progress/foo.csv' to 'completed' folder...
 INFO 19540 --- [student-12-move] device-group-workflow-main               : Moved S3 file 'in-progress/foo.csv' to 'completed' folder...

person Lex Luthor    schedule 03.01.2019    source источник
comment
ЗавершениеTimeout должно запускать последнюю строку по истечении времени ожидания. Было бы странно, если бы это не сработало.   -  person Claus Ibsen    schedule 03.01.2019
comment
Он действительно показывает правильное поведение, если я заменяю простой ($ {body.semester}) на постоянный (true). Наверное, это ошибка ...   -  person Lex Luthor    schedule 05.01.2019
comment
Какую версию Camel вы используете? Групповой ключ, будь то body.semester или константа, не должен влиять на тайм-аут.   -  person Claus Ibsen    schedule 05.01.2019
comment
Я использую следующие компоненты: + camel.version = 2.23.0 + spring-boot.version = 2.1.1.RELEASE + kotlin.version = 1.3.10 + aws-java-sdk.version = 1.11.461   -  person Lex Luthor    schedule 06.01.2019
comment
И вы уверены, что с вашим bean-компонентом все в порядке, что не работает только с 1 записью. Пробовали ли вы добавить журнал после агрегата, чтобы увидеть, что он что-то регистрирует, когда срабатывает тайм-аут завершения и т.д. И сделать возможным простую работу без учетной записи AWS и т. Д.   -  person Claus Ibsen    schedule 07.01.2019
comment
@LexLuthor, вы смогли это исправить? Я столкнулся с аналогичной проблемой.   -  person Sarang    schedule 02.12.2020
comment
В итоге я преобразовал свою логику обработки в реактивный поток с помощью библиотеки Project Reactor. Затем я подписываюсь на него из маршрута Camel верхнего уровня. чистый подход, очень доволен результатами.   -  person Lex Luthor    schedule 08.12.2020


Ответы (1)


Если вам нужно немедленно завершить свое сообщение, вы можете указать предикат завершения, который основан на свойствах обмена, установленных разделителем. Я не пробовал, но думаю

.completionPredicate( simple( "${exchangeProperty.CamelSplitComplete}" ) )

обработает последнее сообщение.

Еще меня беспокоит то, что вы установили parallelProcessing в своем сплиттере, что может означать, что сообщения обрабатываются не по порядку. Это действительно сплиттер, к которому вы хотите применить параллельную обработку, или агрегатор? Похоже, что вы ничего не делаете с разделенными записями, кроме их агрегирования, а затем их обработки, поэтому может быть лучше переместить инструкцию parallelProcessing в агрегатор.

person Screwtape    schedule 03.01.2019
comment
Это не решило проблему, но помогло мне понять причину проблемы. Проблема, по всей видимости, заключается в том, что при группировке по определенному атрибуту тела могут быть некоторые обмены переносом в конце, для которых ни одно из условий завершения никогда не будет истинным: * размер завершений обмена == 2 * exchangeProperty.CamelSplitComplete == правда Не знаю, как это решить .. - person Lex Luthor; 04.01.2019
comment
Возможно, отключите параллельную обработку, так как вы можете включить обработку вне очереди. - person Claus Ibsen; 06.01.2019
comment
Я отключил параллельную обработку во время разделения, как было предложено. хотя конечный результат был таким же .. - person Lex Luthor; 07.01.2019