Я работаю над задачей очистки кеша хранилища памяти, если во входном файле, который будет обрабатываться потоком данных, есть данные. Это означает, что если во входном файле нет записей, хранилище памяти не будет очищено, но во входном файле есть хотя бы одна запись, хранилище памяти должно быть очищено, а затем должен быть обработан входной файл.
Мое приложение потока данных представляет собой многоконвейерное приложение, которое считывает, обрабатывает и затем сохраняет данные в хранилище памяти. Конвейер работает успешно. Однако сброс хранилища памяти работает, но после сброса вставка не происходит.
Я написал функцию, которая очищает хранилище памяти после проверки наличия записи во входном файле.
FlushingMemorystore.java
package com.click.example.functions;
import afu.org.checkerframework.checker.nullness.qual.Nullable;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class FlushingMemorystore {
private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class);
public static FlushingMemorystore.Read read() {
return (new AutoValue_FlushingMemorystore_Read.Builder())
.setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
}
@AutoValue
public abstract static class Read extends PTransform<PCollection<Long>, PDone> {
public Read() {
}
@Nullable
abstract RedisConnectionConfiguration connectionConfiguration();
@Nullable
abstract Long expireTime();
abstract FlushingMemorystore.Read.Builder toBuilder();
public FlushingMemorystore.Read withEndpoint(String host, int port) {
Preconditions.checkArgument(host != null, "host cannot be null");
Preconditions.checkArgument(port > 0, "port cannot be negative or 0");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
}
public FlushingMemorystore.Read withAuth(String auth) {
Preconditions.checkArgument(auth != null, "auth cannot be null");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
}
public FlushingMemorystore.Read withTimeout(int timeout) {
Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
}
public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null");
return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
}
public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null");
Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0");
return this.toBuilder().setExpireTime(expireTimeMillis).build();
}
public PDone expand(PCollection<Long> input) {
Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
return PDone.in(input.getPipeline());
}
private static class ReadFn extends DoFn<Long, String> {
private static final int DEFAULT_BATCH_SIZE = 1000;
private final FlushingMemorystore.Read spec;
private transient Jedis jedis;
private transient Pipeline pipeline;
private int batchCount;
public ReadFn(FlushingMemorystore.Read spec) {
this.spec = spec;
}
@Setup
public void setup() {
this.jedis = this.spec.connectionConfiguration().connect();
}
@StartBundle
public void startBundle() {
this.pipeline = this.jedis.pipelined();
this.pipeline.multi();
this.batchCount = 0;
}
@ProcessElement
public void processElement(DoFn<Long, String>.ProcessContext c) {
Long count = c.element();
batchCount++;
if(count==null && count < 0) {
LOGGER.info("No Records are there in the input file");
} else {
if (pipeline.isInMulti()) {
pipeline.exec();
pipeline.sync();
jedis.flushDB();
}
LOGGER.info("*****The memorystore is flushed*****");
}
}
@FinishBundle
public void finishBundle() {
if (this.pipeline.isInMulti()) {
this.pipeline.exec();
this.pipeline.sync();
}
this.batchCount=0;
}
@Teardown
public void teardown() {
this.jedis.close();
}
}
@AutoValue.Builder
abstract static class Builder {
Builder() {
}
abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis);
abstract FlushingMemorystore.Read build();
abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);
}
}
}
Я использую эту функцию в своем коде Starter Pipeline.
Фрагмент кода начального конвейера, в котором используется функция:
StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(StorageToRedisOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply(
"ReadLines", TextIO.read().from(options.getInputFile()));
/**
* Flushing the Memorystore if there are records in the input file
*/
lines.apply("Checking Data in input file", Count.globally())
.apply("Flushing the data store", FlushingMemorystore.read()
.withConnectionConfiguration(RedisConnectionConfiguration
.create(options.getRedisHost(), options.getRedisPort())));
Фрагмент кода для вставки обработанных данных после очистки кеша:
dataset.apply(SOME_DATASET_TRANSFORMATION, RedisIO.write()
.withMethod(RedisIO.Write.Method.SADD)
.withConnectionConfiguration(RedisConnectionConfiguration
.create(options.getRedisHost(), options.getRedisPort())));
Поток данных выполняется нормально, и он также очищает хранилище памяти, но после этого вставка не работает. Не могли бы вы указать, где я ошибаюсь? Любое решение для решения проблемы действительно ценится. Заранее спасибо!
Изменить:
Предоставление дополнительной информации по запросу в комментариях
Используемая среда выполнения — Java 11, и она использует Apache Beam SDK для 2.24.0.
Если во входном файле есть записи, он будет обрабатывать данные с некоторой логикой. Например, если во входном файле есть такие данные, как:
abcabc|Bruce|Wayne|2000
abbabb|Tony|Stark|3423
Поток данных подсчитает количество записей, которых в данном случае 2, и обработает идентификатор, имя и т. д. в соответствии с логикой, а затем сохранит в памяти. Этот входной файл будет поступать каждый день, поэтому хранилище памяти должно быть очищено (или сброшено), если во входном файле есть записи.
Хотя конвейер не ломается, но я думаю, что что-то упускаю.