Как вставить данные после очистки кеша в Cloud Memorystore с помощью Google Cloud Dataflow?

Я работаю над задачей очистки кеша хранилища памяти, если во входном файле, который будет обрабатываться потоком данных, есть данные. Это означает, что если во входном файле нет записей, хранилище памяти не будет очищено, но во входном файле есть хотя бы одна запись, хранилище памяти должно быть очищено, а затем должен быть обработан входной файл.

Мое приложение потока данных представляет собой многоконвейерное приложение, которое считывает, обрабатывает и затем сохраняет данные в хранилище памяти. Конвейер работает успешно. Однако сброс хранилища памяти работает, но после сброса вставка не происходит.

Я написал функцию, которая очищает хранилище памяти после проверки наличия записи во входном файле.

FlushingMemorystore.java

package com.click.example.functions;

import afu.org.checkerframework.checker.nullness.qual.Nullable;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class FlushingMemorystore {


    private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class);

    public static FlushingMemorystore.Read read() {
        return (new AutoValue_FlushingMemorystore_Read.Builder())
                .setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
    }

    @AutoValue
    public abstract static class Read extends PTransform<PCollection<Long>, PDone> {

        public Read() {
        }

        @Nullable
        abstract RedisConnectionConfiguration connectionConfiguration();

        @Nullable
        abstract Long expireTime();
        abstract FlushingMemorystore.Read.Builder toBuilder();

        public FlushingMemorystore.Read withEndpoint(String host, int port) {
            Preconditions.checkArgument(host != null, "host cannot be null");
            Preconditions.checkArgument(port > 0, "port cannot be negative or 0");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
        }

        public FlushingMemorystore.Read withAuth(String auth) {
            Preconditions.checkArgument(auth != null, "auth cannot be null");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
        }

        public FlushingMemorystore.Read withTimeout(int timeout) {
            Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
        }

        public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null");
            return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
        }

        public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
            Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null");
            Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0");
            return this.toBuilder().setExpireTime(expireTimeMillis).build();
        }

        public PDone expand(PCollection<Long> input) {
            Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
            input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
            return PDone.in(input.getPipeline());
        }

        private static class ReadFn extends DoFn<Long, String> {
            private static final int DEFAULT_BATCH_SIZE = 1000;
            private final FlushingMemorystore.Read spec;
            private transient Jedis jedis;
            private transient Pipeline pipeline;
            private int batchCount;

            public ReadFn(FlushingMemorystore.Read spec) {
                this.spec = spec;
            }

            @Setup
            public void setup() {
                this.jedis = this.spec.connectionConfiguration().connect();
            }

            @StartBundle
            public void startBundle() {
                this.pipeline = this.jedis.pipelined();
                this.pipeline.multi();
                this.batchCount = 0;
            }

            @ProcessElement
            public void processElement(DoFn<Long, String>.ProcessContext c) {
                Long count = c.element();
                batchCount++;

                if(count==null && count < 0) {
                    LOGGER.info("No Records are there in the input file");
                } else {
                    if (pipeline.isInMulti()) {
                        pipeline.exec();
                        pipeline.sync();
                        jedis.flushDB();
                    }
                    LOGGER.info("*****The memorystore is flushed*****");
                }
            }

            @FinishBundle
            public void finishBundle() {
                if (this.pipeline.isInMulti()) {
                    this.pipeline.exec();
                    this.pipeline.sync();
                }
                this.batchCount=0;
            }

            @Teardown
            public void teardown() {
                this.jedis.close();
            }

        }

        @AutoValue.Builder
        abstract static class Builder {

            Builder() {
            }

            abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis);

            abstract FlushingMemorystore.Read build();

            abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);

        }

    }

}

Я использую эту функцию в своем коде Starter Pipeline.

Фрагмент кода начального конвейера, в котором используется функция:

 StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(StorageToRedisOptions.class);

        Pipeline p = Pipeline.create(options);

        PCollection<String> lines = p.apply(
                "ReadLines", TextIO.read().from(options.getInputFile()));

        /**
         * Flushing the Memorystore if there are records in the input file
         */
        lines.apply("Checking Data in input file", Count.globally())
                .apply("Flushing the data store", FlushingMemorystore.read()
                        .withConnectionConfiguration(RedisConnectionConfiguration
                        .create(options.getRedisHost(), options.getRedisPort())));

Фрагмент кода для вставки обработанных данных после очистки кеша:

 dataset.apply(SOME_DATASET_TRANSFORMATION, RedisIO.write()
                .withMethod(RedisIO.Write.Method.SADD)
                .withConnectionConfiguration(RedisConnectionConfiguration
                        .create(options.getRedisHost(), options.getRedisPort())));

Поток данных выполняется нормально, и он также очищает хранилище памяти, но после этого вставка не работает. Не могли бы вы указать, где я ошибаюсь? Любое решение для решения проблемы действительно ценится. Заранее спасибо!

Изменить:

Предоставление дополнительной информации по запросу в комментариях

Используемая среда выполнения — Java 11, и она использует Apache Beam SDK для 2.24.0.

Если во входном файле есть записи, он будет обрабатывать данные с некоторой логикой. Например, если во входном файле есть такие данные, как:

abcabc|Bruce|Wayne|2000
abbabb|Tony|Stark|3423

Поток данных подсчитает количество записей, которых в данном случае 2, и обработает идентификатор, имя и т. д. в соответствии с логикой, а затем сохранит в памяти. Этот входной файл будет поступать каждый день, поэтому хранилище памяти должно быть очищено (или сброшено), если во входном файле есть записи.

Хотя конвейер не ломается, но я думаю, что что-то упускаю.


comment
Не могли бы вы предоставить нам среду выполнения Java, которую вы используете? Пожалуйста, дайте нам пример вывода, который ваша программа может определить, есть ли запись во входном файле. Кроме того, не могли бы вы подтвердить, что можете установить соединение с Memorystore при использовании функции в начальном конвейере? Пожалуйста, обновите пост своим ответом.   -  person Dondi    schedule 03.11.2020
comment
я предоставил больше информации   -  person viveknaskar    schedule 03.11.2020
comment
Вы можете поделиться, откуда берется ваш набор данных? может быть, SOME_DATASET_TRANSFORMATION пишет в Redis параллельно, когда вы его очищаете? Итак, проблема в том, как дождаться сброса данных перед загрузкой набора данных? Это правильно?   -  person Pablo    schedule 05.11.2020


Ответы (2)


Я подозреваю, что проблема здесь в том, что вам нужно убедиться, что шаг Flush выполняется (и завершается) до того, как произойдет шаг RedisIO.write. Beam имеет Wait.on преобразование, которое вы можете использовать для этого.

Чтобы добиться этого, мы можем использовать выходные данные очищающего PTransform в качестве сигнала о том, что мы очистили базу данных, и мы записываем в базу данных только после того, как мы закончили очистку. Вызов process для сброса DoFn будет выглядеть так:

@ProcessElement
public void processElement(DoFn<Long, String>.ProcessContext c) {
    Long count = c.element();

    if(count==null && count < 0) {
       LOGGER.info("No Records are there in the input file");
    } else {
       if (pipeline.isInMulti()) {
           pipeline.exec();
           pipeline.sync();
           jedis.flushDB();
       }
       LOGGER.info("*****The memorystore is flushed*****");
   }
   c.output("READY");
}

Когда у нас есть сигнал, указывающий, что база данных очищена, мы можем использовать его для ожидания перед записью в нее новых данных:

Pipeline p = Pipeline.create(options);

PCollection<String> lines = p.apply(
        "ReadLines", TextIO.read().from(options.getInputFile()));

/**
 * Flushing the Memorystore if there are records in the input file
 */
PCollection<String> flushedSignal = lines
     .apply("Checking Data in input file", Count.globally())
     .apply("Flushing the data store", FlushingMemorystore.read()
                     .withConnectionConfiguration(RedisConnectionConfiguration
                     .create(options.getRedisHost(), options.getRedisPort())));

// Then we use the flushing signal to start writing to Redis:

dataset
    .apply(Wait.on(flushedSignal))
    .apply(SOME_DATASET_TRANSFORMATION, RedisIO.write()
                .withMethod(RedisIO.Write.Method.SADD)
                .withConnectionConfiguration(RedisConnectionConfiguration
                        .create(options.getRedisHost(), options.getRedisPort())));
person Pablo    schedule 04.11.2020
comment
Спасибо, Пабло. Преобразование Wait.on сделало свое дело. Однако мне пришлось немного изменить свой FlushingMemorystore.java, чтобы получить PCollection‹String›, которую я опубликую в отдельном ответе. Ваш ответ - тот, который решил мою проблему. - person viveknaskar; 05.11.2020

Проблема решена после того, как я применил преобразование Wait.on, поскольку ответ Пабло уже объяснил это. Однако мне пришлось немного переписать FlushingMemorystore.java в PCollection для флага flushSignal.

Вот функция:

package com.click.example.functions;

import afu.org.checkerframework.checker.nullness.qual.Nullable;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class FlushingMemorystore extends DoFn<Long, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class);

    public static FlushingMemorystore.Read read() {
        return (new AutoValue_FlushingMemorystore_Read.Builder())
                .setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
    }

    @AutoValue
    public abstract static class Read extends PTransform<PCollection<Long>, PCollection<String>> {

        public Read() {
        }

        @Nullable
        abstract RedisConnectionConfiguration connectionConfiguration();

        @Nullable
        abstract Long expireTime();
        abstract FlushingMemorystore.Read.Builder toBuilder();

        public FlushingMemorystore.Read withEndpoint(String host, int port) {
            Preconditions.checkArgument(host != null, "host cannot be null");
            Preconditions.checkArgument(port > 0, "port cannot be negative or 0");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
        }

        public FlushingMemorystore.Read withAuth(String auth) {
            Preconditions.checkArgument(auth != null, "auth cannot be null");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
        }

        public FlushingMemorystore.Read withTimeout(int timeout) {
            Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
        }

        public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null");
            return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
        }

        public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
            Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null");
            Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0");
            return this.toBuilder().setExpireTime(expireTimeMillis).build();
        }

        public PCollection<String> expand(PCollection<Long> input) {
            Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
           return input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
        }

        @Setup
        public Jedis setup() {
            return this.connectionConfiguration().connect();
        }

        private static class ReadFn extends DoFn<Long, String> {
            private static final int DEFAULT_BATCH_SIZE = 1000;
            private final FlushingMemorystore.Read spec;
            private transient Jedis jedis;
            private transient Pipeline pipeline;
            private int batchCount;

            public ReadFn(FlushingMemorystore.Read spec) {
                this.spec = spec;
            }

            @Setup
            public void setup() {
                this.jedis = this.spec.connectionConfiguration().connect();
            }

            @StartBundle
            public void startBundle() {
                this.pipeline = this.jedis.pipelined();
                this.pipeline.multi();
                this.batchCount = 0;
            }

            @ProcessElement
            public void processElement(@Element Long count, OutputReceiver<String> out) {
                batchCount++;

                if(count!=null && count > 0) {
                    if (pipeline.isInMulti()) {
                        pipeline.exec();
                        pipeline.sync();
                        jedis.flushDB();
                        LOGGER.info("*****The memorystore is flushed*****");
                    }
                    out.output("SUCCESS");
                } else {
                    LOGGER.info("No Records are there in the input file");
                    out.output("FAILURE");
                }

            }

            @FinishBundle
            public void finishBundle() {
                if (this.pipeline.isInMulti()) {
                    this.pipeline.exec();
                    this.pipeline.sync();
                }
                this.batchCount=0;
            }

            @Teardown
            public void teardown() {
                this.jedis.close();
            }

        }

        @AutoValue.Builder
        abstract static class Builder {

            Builder() {
            }

            abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis);

            abstract FlushingMemorystore.Read build();

          abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);

        }

    }

}
person viveknaskar    schedule 08.11.2020