I am working on a task to clear the Memorystore cache if the input file to be processed by Dataflow has data. That is, if the input file has no records, Memorystore won't be flushed; but if the input file has even one record, Memorystore should be flushed and then the input file should be processed.
My Dataflow application is a multi-pipeline application that reads, processes, and then stores the data in Memorystore. The pipeline executes successfully, and the flushing of Memorystore works, but the insertion after the flush never happens.
I have written a function that flushes Memorystore after checking whether the input file has records.
FlushingMemorystore.java
package com.click.example.functions;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class FlushingMemorystore {

    private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class);

    public static FlushingMemorystore.Read read() {
        return (new AutoValue_FlushingMemorystore_Read.Builder())
                .setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
    }

    @AutoValue
    public abstract static class Read extends PTransform<PCollection<Long>, PDone> {

        public Read() {
        }

        @Nullable
        abstract RedisConnectionConfiguration connectionConfiguration();

        @Nullable
        abstract Long expireTime();

        abstract FlushingMemorystore.Read.Builder toBuilder();

        public FlushingMemorystore.Read withEndpoint(String host, int port) {
            Preconditions.checkArgument(host != null, "host cannot be null");
            Preconditions.checkArgument(port > 0, "port cannot be negative or 0");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
        }

        public FlushingMemorystore.Read withAuth(String auth) {
            Preconditions.checkArgument(auth != null, "auth cannot be null");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
        }

        public FlushingMemorystore.Read withTimeout(int timeout) {
            Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
        }

        public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null");
            return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
        }

        public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
            Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null");
            Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0");
            return this.toBuilder().setExpireTime(expireTimeMillis).build();
        }

        public PDone expand(PCollection<Long> input) {
            Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
            input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
            return PDone.in(input.getPipeline());
        }

        private static class ReadFn extends DoFn<Long, String> {

            private static final int DEFAULT_BATCH_SIZE = 1000;

            private final FlushingMemorystore.Read spec;
            private transient Jedis jedis;
            private transient Pipeline pipeline;
            private int batchCount;

            public ReadFn(FlushingMemorystore.Read spec) {
                this.spec = spec;
            }

            @Setup
            public void setup() {
                this.jedis = this.spec.connectionConfiguration().connect();
            }

            @StartBundle
            public void startBundle() {
                this.pipeline = this.jedis.pipelined();
                this.pipeline.multi();
                this.batchCount = 0;
            }

            @ProcessElement
            public void processElement(DoFn<Long, String>.ProcessContext c) {
                Long count = c.element();
                batchCount++;
                // The count is null or zero when the input file has no records.
                if (count == null || count == 0) {
                    LOGGER.info("No records are there in the input file");
                } else {
                    if (pipeline.isInMulti()) {
                        pipeline.exec();
                        pipeline.sync();
                        jedis.flushDB();
                    }
                    LOGGER.info("*****The memorystore is flushed*****");
                }
            }

            @FinishBundle
            public void finishBundle() {
                if (this.pipeline.isInMulti()) {
                    this.pipeline.exec();
                    this.pipeline.sync();
                }
                this.batchCount = 0;
            }

            @Teardown
            public void teardown() {
                this.jedis.close();
            }
        }

        @AutoValue.Builder
        abstract static class Builder {

            Builder() {
            }

            abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis);

            abstract FlushingMemorystore.Read build();

            abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);
        }
    }
}
I am using the function in my starter pipeline. Code snippet of the starter pipeline where the function is used:
StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(StorageToRedisOptions.class);

Pipeline p = Pipeline.create(options);

PCollection<String> lines = p.apply(
        "ReadLines", TextIO.read().from(options.getInputFile()));

/**
 * Flushing the Memorystore if there are records in the input file
 */
lines.apply("Checking Data in input file", Count.globally())
        .apply("Flushing the data store", FlushingMemorystore.read()
                .withConnectionConfiguration(RedisConnectionConfiguration
                        .create(options.getRedisHost(), options.getRedisPort())));
Code snippet for inserting the processed data after clearing the cache:
dataset.apply(SOME_DATASET_TRANSFORMATION, RedisIO.write()
        .withMethod(RedisIO.Write.Method.SADD)
        .withConnectionConfiguration(RedisConnectionConfiguration
                .create(options.getRedisHost(), options.getRedisPort())));
The Dataflow job executes fine and it flushes the Memorystore as well, but the insertion is not happening after that. Could you please point out where I am going wrong? Any solution for resolving the issue is truly appreciated. Thanks in advance!
Edit:
Providing additional information as requested in the comments.
The runtime is Java 11, and it uses Apache Beam SDK 2.24.0.
If the input file has records, the pipeline will process the data with some logic. For example, if the input file has data like:
abcabc|Bruce|Wayne|2000
abbabb|Tony|Stark|3423
The Dataflow job will count the number of records, which is 2 in this case, and will process the id, first name, etc. according to the logic, and then store them in Memorystore. A new input file arrives every day, hence Memorystore should be cleared (or flushed) whenever the input file has records.
Although the pipeline is not breaking, I think I am missing something.
I suspect the problem here is that you need to ensure the "Flush" step runs (and completes) before the RedisIO.write step happens. Beam has a Wait.on transform that you can use for this.
To accomplish this, we can use the output from the flushing PTransform as a signal that we've flushed the database - and we only write to the database after we are done flushing. The processElement method of your flushing DoFn would look like this:
@ProcessElement
public void processElement(DoFn<Long, String>.ProcessContext c) {
    Long count = c.element();
    // The count is null or zero when the input file has no records.
    if (count == null || count == 0) {
        LOGGER.info("No records are there in the input file");
    } else {
        if (pipeline.isInMulti()) {
            pipeline.exec();
            pipeline.sync();
            jedis.flushDB();
        }
        LOGGER.info("*****The memorystore is flushed*****");
    }
    // Emit a signal element so that downstream steps can wait on this transform.
    c.output("READY");
}
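Note that for flushedSignal below to be a PCollection<String>, the flushing transform also has to return the output of its ParDo instead of PDone. A minimal sketch of that change, with the rest of Read unchanged:

@AutoValue
public abstract static class Read extends PTransform<PCollection<Long>, PCollection<String>> {

    // ... the with* methods and the Builder stay the same ...

    @Override
    public PCollection<String> expand(PCollection<Long> input) {
        Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
        // Return the "READY" signal emitted by ReadFn instead of discarding it.
        return input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
    }
}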
Once we have a signal indicating that the database has been flushed, we can use it to wait before writing the new data:
Pipeline p = Pipeline.create(options);

PCollection<String> lines = p.apply(
        "ReadLines", TextIO.read().from(options.getInputFile()));

/**
 * Flushing the Memorystore if there are records in the input file
 */
PCollection<String> flushedSignal = lines
        .apply("Checking Data in input file", Count.globally())
        .apply("Flushing the data store", FlushingMemorystore.read()
                .withConnectionConfiguration(RedisConnectionConfiguration
                        .create(options.getRedisHost(), options.getRedisPort())));

// Then we use the flushing signal to start writing to Redis:
dataset
        .apply(Wait.on(flushedSignal))
        .apply(SOME_DATASET_TRANSFORMATION, RedisIO.write()
                .withMethod(RedisIO.Write.Method.SADD)
                .withConnectionConfiguration(RedisConnectionConfiguration
                        .create(options.getRedisHost(), options.getRedisPort())));
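Wait.on comes from org.apache.beam.sdk.transforms.Wait. It holds back each window of its main input (dataset here) until the corresponding window of the signal collection has been fully computed, so in this bounded pipeline the RedisIO.write step will only start once the flush has completed.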