Spring Integration DSL для исходящего SFTP с удалением

я использую

  • Интеграция Sprint (файл, SFTP и т. Д.) 4.3.6
  • Spring загрузки 1.4.3
  • Интеграция Spring Java DSL 1.1.4

и я пытаюсь настроить исходящий адаптер SFTP, который позволил бы мне переместить файл в каталог в удаленной системе, а также удалить или переименовать файл в моей локальной системе.

Так, например, я хотел бы поместить файл a.txt в локальный каталог и привязать его по SFTP к удаленному серверу в каталоге inbound. После завершения передачи я хотел бы удалить или переименовать локальную копию a.txt.

Я подбирал для этого несколько способов. Итак, вот мой общий SessionFactory для теста.

protected SessionFactory<ChannelSftp.LsEntry> buildSftpSessionFactory() {
    DefaultSftpSessionFactory sessionFactory = new DefaultSftpSessionFactory();
    sessionFactory.setHost("localhost");
    sessionFactory.setUser("user");
    sessionFactory.setAllowUnknownKeys(true);
    sessionFactory.setPassword("pass");
    CachingSessionFactory<ChannelSftp.LsEntry> cachingSessionFactory = new CachingSessionFactory<>(sessionFactory, 1);
    return cachingSessionFactory;
}

Это преобразователь, который мне нужно добавить в сообщение

@Override
public Message<File> transform(Message<File> source) {
    System.out.println("here is the thing : "+source);
    File file = (File)source.getPayload();
    Message<File> transformedMessage = MessageBuilder.withPayload(file)
            .copyHeaders(source.getHeaders())
            .setHeaderIfAbsent(FileHeaders.ORIGINAL_FILE, file)
            .setHeaderIfAbsent(FileHeaders.FILENAME, file.getName())
            .build();
    return transformedMessage;
}

Затем у меня есть поток интеграции, который использует опросчик для просмотра локального каталога и вызывает это:

@Bean
public IntegrationFlow pushTheFile(){
    return IntegrationFlows
            .from(s -> s.file(new File(DIR_TO_WATCH))
                            .patternFilter("*.txt").preventDuplicates(),
                    e -> e.poller(Pollers.fixedDelay(100)))
            .transform(outboundTransformer)
            .handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
                    .remoteFileSeparator("/")
                    .useTemporaryFileName(false)
                    .remoteDirectory("inbound/")
            )
            .get();
}

Это нормально работает, но оставляет локальный файл. Любые идеи о том, как удалить этот локальный файл после завершения загрузки? Должен ли я вместо этого смотреть на SftpOutboundGateway?

Заранее спасибо!

Ответ Артема сработал отлично! Вот быстрый пример, который удаляет локальный файл после его отправки.

@Bean
public IntegrationFlow pushTheFile(){
    return IntegrationFlows
            .from(s -> s.file(new File(DIR_TO_WATCH))
                            .patternFilter("*.txt").preventDuplicates(),
                    e -> e.poller(Pollers.fixedDelay(100)))
            .transform(outboundTransformer)
            .handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
                    .remoteFileSeparator("/")
                    .useTemporaryFileName(false)
                    .remoteDirectory("inbound/"), c -> c.advice(expressionAdvice(c))
            )
            .get();
}

@Bean
public Advice expressionAdvice(GenericEndpointSpec<FileTransferringMessageHandler<ChannelSftp.LsEntry>> c) {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnSuccessExpression("payload.delete()");
    advice.setOnFailureExpression("payload + ' failed to upload'");
    advice.setTrapException(true);
    return advice;
}

person Tristan    schedule 02.02.2017    source источник
comment
Может быть, это поможет вам - stackoverflow.com/questions/36247467/   -  person Amit Bhati    schedule 02.02.2017


Ответы (1)


Для этого можно использовать несколько подходов.

Все они основаны на том, что вы делаете что-то еще с исходным сообщением запроса для Sftp.outboundAdapter().

  1. .publishSubscribeChannel() позволяет отправить одно и то же сообщение нескольким подписчикам, когда его получит второй, только первый завершит свою работу. По умолчанию, если вы не укажете Executor

  2. routeToRecipients() позволяет получить тот же результат, но с помощью другого компонента.

  3. ExpressionEvaluatingRequestHandlerAdvice - вы добавляете его в .advice() определения Sftp.outboundAdapter() конечной точки - второй аргумент .handle() и выполняете file.delete() через onSuccessExpression:

    .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
    
person Artem Bilan    schedule 02.02.2017
comment
Спасибо Артему за ответ. Я пытаюсь использовать третий вариант, о котором вы упомянули, но я немного запутался. Sftp.outboundAdapter(), похоже, не имеет метода advice(). Я уверен, что мне просто не хватает чего-то простого, но не могли бы вы помочь мне указать направление? - person Tristan; 03.02.2017
comment
Правда, вам нужно взглянуть на Consumer arg _2 _: github.com/spring-projects/spring-integration-java-dsl/wiki/ - person Artem Bilan; 03.02.2017
comment
Я понимаю, о чем вы сейчас говорите. Я отправлю пример, как только его почищу. Спасибо еще раз!! - person Tristan; 03.02.2017
comment
Добавление примера в пост - person Tristan; 03.02.2017
comment
@ArtemBilan что делать, если Sftp.outboundAdapter () не удалось загрузить файл. вторая подписка все еще получает сообщение для обработки. есть способ остановить это? - person Umair; 19.08.2020
comment
Извините, похоже, что он заслуживает отдельной темы SO с гораздо большей информацией. - person Artem Bilan; 19.08.2020
comment
@ArtemBilan stackoverflow.com/questions/63489690/ - person Umair; 19.08.2020