I recently upgraded my Cloud Dataflow application from Java 11 to Java 17, along with its dependencies, and upgraded Apache Beam from 2.35.0 to 2.49.0. The application works fine, and so do the test cases.
However, one of the custom classes, RedisWriteIO, has changed, and the tests no longer cover all of its lines in the new code coverage report.
The RedisWriteIO class is as follows:
package com.example.dataflow.io.redis;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class RedisWriteIO {

    public static Write write() {
        return (new AutoValue_RedisWriteIO_Write.Builder())
                .setConnectionConfiguration(CustomRedisConfigurations.create()).build();
    }

    @AutoValue
    public abstract static class Write extends PTransform<PCollection<KV<String, String>>, PDone> {

        public Write() {
        }

        @Nullable
        abstract CustomRedisConfigurations connectionConfiguration();

        @Nullable
        abstract Long expireTime();

        abstract Builder toBuilder();

        public Write withEndpoint(String host, int port) {
            Preconditions.checkArgument(host != null, "host can not be null");
            Preconditions.checkArgument(port > 0, "port can not be negative or 0");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
        }

        public Write withAuth(String auth) {
            Preconditions.checkArgument(auth != null, "auth can not be null");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
        }

        public Write withTimeout(int timeout) {
            Preconditions.checkArgument(timeout >= 0, "timeout can not be negative");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
        }

        public Write withConnectionConfiguration(CustomRedisConfigurations connection) {
            Preconditions.checkArgument(connection != null, "connection can not be null");
            return this.toBuilder().setConnectionConfiguration(connection).build();
        }

        public Write withExpireTime(Long expireTimeMillis) {
            Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis can not be null");
            Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis can not be negative or 0");
            return this.toBuilder().setExpireTime(expireTimeMillis).build();
        }

        @Override
        public PDone expand(PCollection<KV<String, String>> input) {
            Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
            input.apply(ParDo.of(new WriteFn(this)));
            return PDone.in(input.getPipeline());
        }

        private static class WriteFn extends DoFn<KV<String, String>, Void> {
            private static final int DEFAULT_BATCH_SIZE = 1000;
            private final RedisWriteIO.Write spec;
            private transient Jedis jedis;
            private transient @Nullable Transaction transaction;
            // Number of elements added to the current transaction; reset at the start of every bundle.
            private int batchCount;

            public WriteFn(RedisWriteIO.Write spec) {
                this.spec = spec;
            }

            @Setup
            public void setup() {
                jedis = spec.connectionConfiguration().connect();
            }

            @StartBundle
            public void startBundle() {
                transaction = jedis.multi();
                batchCount = 0;
            }

            @ProcessElement
            public void processElement(DoFn<KV<String, String>, Void>.ProcessContext c) {
                KV<String, String> record = c.element();
                String fieldKey = record.getKey();
                String fieldValue = record.getValue();
                transaction.sadd(fieldKey, fieldValue);
                batchCount++;
                // Flush once DEFAULT_BATCH_SIZE elements have accumulated within the current bundle.
                if (batchCount >= DEFAULT_BATCH_SIZE) {
                    transaction.exec();
                    transaction.multi();
                    batchCount = 0;
                }
            }

            @FinishBundle
            public void finishBundle() {
                // Flush whatever is left over from the last, partially filled batch.
                if (batchCount > 0) {
                    transaction.exec();
                }
                if (transaction != null) {
                    transaction.close();
                }
                transaction = null;
                batchCount = 0;
            }

            @Teardown
            public void teardown() {
                jedis.close();
            }
        }

        @AutoValue.Builder
        abstract static class Builder {
            Builder() {
            }

            abstract Builder setConnectionConfiguration(CustomRedisConfigurations connectionConfiguration);

            abstract Builder setExpireTime(Long expireTimeMillis);

            abstract Write build();
        }
    }
}
The test class is as follows:
package com.example.dataflow.io.redis;

import com.github.fppt.jedismock.RedisServer;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.junit.*;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.*;

public class RedisWriteIOTest {
    private static final String REDIS_HOST = "localhost";
    private static final String[] INPUT_DATA = new String[]{
            "123456789",
            "Bruce",
            "Wayne"
    };

    @Mock
    static SSLSocketFactory socketFactory;
    private static RedisServer server;
    private static int port;
    @Mock
    private static Jedis jedis;
    @Mock
    private Transaction transaction;
    private int batchCount;

    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    @Mock
    CustomRedisConfigurations connection;
    @Mock
    DoFn.OutputReceiver<KV<String, String>> out;

    @Before
    public void setUp() {
        MockitoAnnotations.openMocks(this);
        when(connection.connect()).thenReturn(jedis);
        when(jedis.multi()).thenReturn(transaction);
        batchCount = 0;
    }

    @BeforeClass
    public static void beforeClass() throws Exception {
        server = RedisServer.newRedisServer(8000);
        server.start();
        port = server.getBindPort();
        jedis = new Jedis(server.getHost(), server.getBindPort());
    }

    @AfterClass
    public static void afterClass() throws IOException {
        jedis.close();
        server.stop();
    }

    @Test
    public void WriteMemoryStoreWithEmptyAuth() {
        RedisWriteIO.write()
                .withEndpoint(REDIS_HOST, port).withAuth("");
    }

    @Test
    public void WriteMemoryStoreWithAuth() {
        RedisWriteIO.write()
                .withAuth("AuthString");
    }

    @Test
    public void WriteTimeOut() {
        RedisWriteIO.write()
                .withTimeout(10);
    }

    @Test
    public void WriteMemoryStoreWithExpireTime() {
        RedisWriteIO.Write write = RedisWriteIO.write();
        write = write.withExpireTime(1000L);
        assertNotNull(write);
    }

    @Test(expected = IllegalArgumentException.class)
    public void WriteMemoryStoreWithoutExpireTime() {
        RedisWriteIO.write()
                .withExpireTime(0L);
    }

    @Test(expected = IllegalArgumentException.class)
    public void WriteMemoryStoreWithNegativeExpireTime() {
        RedisWriteIO.write()
                .withExpireTime(-10L);
    }

    @Test
    public void WriteMemoryStoryWithConnectionConfiguration() {
        connection = CustomRedisConfigurations.create().withHost(REDIS_HOST).withPort(port);
        RedisWriteIO.Write write = RedisWriteIO.write()
                .withConnectionConfiguration(connection);
        assertNotNull(write);
    }

    @Test(expected = IllegalArgumentException.class)
    public void WriteMemoryStoryWithNullConnectionConfiguration() {
        RedisWriteIO.Write write = RedisWriteIO.write()
                .withConnectionConfiguration(null);
    }

    @Test
    public void testBatchProcessingWithTransactionExecuted() {
        RedisWriteIO.Write spec = RedisWriteIO.write().withConnectionConfiguration(connection);
        PCollection<String> flushFlag = pipeline.apply("Read File", TextIO.read().from("files/fileHavingFiveThousandRecords.txt"));
        List<KV<String, String>> recordEntries = new ArrayList<>();
        for (int i = 0; i <= 10000; i++) {
            // adds one unique entry per iteration (10001 entries, i = 0..10000)
            recordEntries.add(KV.of("Bruce:Wayne" + i, "123456789" + i));
        }
        // outputData will be written to Redis (memorystore)
        PCollection<KV<String, String>> outputData = pipeline.apply(Create.of(recordEntries));
        outputData.apply("Waiting until clearing Redis database", Wait.on(flushFlag))
                .apply("Writing the data into Redis database", RedisWriteIO.write()
                        .withConnectionConfiguration(CustomRedisConfigurations
                                .create(REDIS_HOST, port)
                                .withTimeout(100)
                                .withAuth("credentials")
                                .enableSSL()));
        pipeline.run();
    }
}
RedisWriteIO is a utility class that writes data from files into a Redis database. It works as expected, and the existing test cases pass. However, the following block of code is not reported as covered by SonarQube:
if (batchCount >= DEFAULT_BATCH_SIZE) {
    transaction.exec();
    transaction.multi();
    batchCount = 0;
}
When the file has more than 1000 records, the block above should execute, but it does not in the test class. I tried to cover it in the testBatchProcessingWithTransactionExecuted() method with a test file containing 5000 records, but the block still does not execute.
I need help writing a test case that covers all the lines.
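I was able to write a test case that covers all the lines. I just increased the size of the list to 20000; with that many entries, the batching path in RedisWriteIO is exercised.
DEFAULT_BATCH_SIZE sets the batchCount threshold to 1000. When batchCount reaches it, the current transaction is executed (transaction.exec()) and a new transaction is started (transaction.multi()). Because batchCount is reset in startBundle(), the block only runs when a single bundle contains at least 1000 elements, and the runner decides how elements are split into bundles. A larger input presumably makes it more likely that at least one bundle reaches the threshold.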
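To make the threshold behaviour concrete, here is a minimal standalone sketch that mirrors only the counter logic of WriteFn, without Beam or Jedis. The class name BatchFlushSketch and the method midBundleFlushes are hypothetical names introduced for illustration; they are not part of RedisWriteIO.

```java
// Standalone sketch: mirrors the per-bundle counter in WriteFn, with no Beam or Jedis involved.
// BatchFlushSketch and midBundleFlushes are hypothetical names used only for this illustration.
public class BatchFlushSketch {
    public static final int DEFAULT_BATCH_SIZE = 1000;

    /** Counts how often the mid-bundle flush branch would run for one bundle of the given size. */
    public static int midBundleFlushes(int bundleSize) {
        int batchCount = 0; // corresponds to the reset in @StartBundle
        int flushes = 0;
        for (int i = 0; i < bundleSize; i++) { // one @ProcessElement call per element
            batchCount++;
            if (batchCount >= DEFAULT_BATCH_SIZE) {
                flushes++; // stands in for transaction.exec(); transaction.multi();
                batchCount = 0;
            }
        }
        return flushes;
    }

    public static void main(String[] args) {
        System.out.println(midBundleFlushes(999));  // prints 0: the branch is never reached
        System.out.println(midBundleFlushes(1000)); // prints 1
        System.out.println(midBundleFlushes(2500)); // prints 2
    }
}
```

A bundle of 999 elements never enters the branch, which is why the total number of records in the test matters less than the size of the largest single bundle.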
@Test
public void testBatchProcessingWithTransactionExecuted() {
    RedisWriteIO.Write spec = RedisWriteIO.write().withConnectionConfiguration(connection);
    PCollection<String> flushFlag = pipeline.apply("Read File", TextIO.read().from("files/fileHavingFiveThousandRecords.txt"));
    List<KV<String, String>> recordEntries = new ArrayList<>();
    for (int i = 0; i <= 20000; i++) {
        // adds one unique entry per iteration (20001 entries, i = 0..20000)
        recordEntries.add(KV.of("Bruce:Wayne" + i, "123456789" + i));
    }
    // outputData will be written to Redis (memorystore)
    PCollection<KV<String, String>> outputData = pipeline.apply(Create.of(recordEntries));
    outputData.apply("Waiting until clearing Redis database", Wait.on(flushFlag))
            .apply("Writing the data into Redis database", RedisWriteIO.write()
                    .withConnectionConfiguration(CustomRedisConfigurations
                            .create(REDIS_HOST, port)
                            .withTimeout(100)
                            .withAuth("credentials")
                            .enableSSL()));
    pipeline.run();
}
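One possible reading of why roughly 20000 entries triggered the block while roughly 10000 did not is that the runner split the input into bundles that were each smaller than 1000 elements in the first case. The sketch below only does the arithmetic for that reading. It assumes an even split, and the bundle count of 16 is a made-up figure for illustration, not something measured in the test run or documented by Beam. BundleSplitSketch and its methods are hypothetical names.

```java
// Arithmetic sketch under stated assumptions: elements are split evenly across a fixed
// number of bundles. Both the even split and the bundle count of 16 are assumptions.
public class BundleSplitSketch {
    public static final int DEFAULT_BATCH_SIZE = 1000;

    /** Size of the largest bundle when totalElements are split as evenly as possible. */
    public static int largestBundle(int totalElements, int bundleCount) {
        return (totalElements + bundleCount - 1) / bundleCount; // ceiling division
    }

    /** True if at least one bundle would reach the flush threshold. */
    public static boolean reachesThreshold(int totalElements, int bundleCount) {
        return largestBundle(totalElements, bundleCount) >= DEFAULT_BATCH_SIZE;
    }

    public static void main(String[] args) {
        int hypotheticalBundles = 16; // assumed value, for illustration only
        System.out.println(largestBundle(10001, hypotheticalBundles));    // prints 626
        System.out.println(reachesThreshold(10001, hypotheticalBundles)); // prints false
        System.out.println(largestBundle(20001, hypotheticalBundles));    // prints 1251
        System.out.println(reachesThreshold(20001, hypotheticalBundles)); // prints true
    }
}
```

If this reading is right, the test stays sensitive to how the runner bundles elements on a given machine, so the list size that is large enough may differ between environments.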