Я пытаюсь реализовать следующий сценарий:
У меня есть файл триггера и файл данных, которые хранятся в разных каталогах. Только если я получу файл триггера, я смогу получить доступ к файлу данных, а затем выполнить логику разделения и дальнейшей обработки. Кроме того, дело в том, что будет один файл триггера, но несколько файлов данных. Таким образом, после извлечения файла триггера я смогу обработать все файлы данных.
Ниже приведен код, который я использовал, но он извлекает только из одного каталога.
private static final Logger LOGGER = LoggerFactory.getLogger(DatastreamApplication.class);
private static final String DATA_DIRECTORY_PATH = "dataDirectoryLocation";
@SuppressWarnings("deprecation")
public static void main(String[] args) {
new SpringApplicationBuilder(DatastreamApplication.class).web(false).run(args);
}
@Bean
@InboundChannelAdapter(channel = "fileInputChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(DATA_DIRECTORY_PATH));
source.setFilter(new AcceptOnceFileListFilter<>());
return source;
}
@Splitter(inputChannel = "fileInputChannel")
@Bean
public FileSplitter fileSplitter() {
FileSplitter fileSplitter = new FileSplitter();
fileSplitter.setOutputChannelName("chunkingChannel");
return fileSplitter;
}
@ServiceActivator(inputChannel = "chunkingChannel")
@Bean
public AggregatingMessageHandler chunker() {
AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor());
aggregator.setReleaseStrategy(new MessageCountReleaseStrategy(1000));
aggregator.setExpireGroupsUponCompletion(true);
aggregator.setGroupTimeoutExpression(new ValueExpression<>(100L));
aggregator.setSendPartialResultOnExpiry(true);
aggregator.setOutputChannelName("processFileChannel");
return aggregator;
}
@Bean
@ServiceActivator(inputChannel = "processFileChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
List<String> strings = (List<String>) message.getPayload();
System.out.println( "List Size : "+ strings.size() + " for List " + strings.toString());
}
};
}