java junit sonarqube google-cloud-dataflow google-cloud-memorystore

Google Cloud Dataflow's code coverage on new code is not passing the threshold in SonarQube after a Java 17 upgrade


I recently upgraded my Cloud Dataflow application from Java 11 to Java 17, along with its corresponding dependencies, and bumped Apache Beam from 2.35.0 to 2.49.0. The application works fine and the existing test cases pass.

However, one of the custom classes, RedisWriteIO, changed as part of the upgrade, and the tests no longer meet the coverage threshold on new code.

RedisWriteIO

package com.example.dataflow.io.redis;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class RedisWriteIO {

    public static Write write() {
        return (new AutoValue_RedisWriteIO_Write.Builder())
                .setConnectionConfiguration(CustomRedisConfigurations.create()).build();
    }

    @AutoValue
    public abstract static class Write extends PTransform<PCollection<KV<String,String>>, PDone> {
        public Write() {
        }

        @Nullable
        abstract CustomRedisConfigurations connectionConfiguration();

        @Nullable
        abstract Long expireTime();

        abstract Builder toBuilder();

        public Write withEndpoint(String host, int port) {
            Preconditions.checkArgument(host != null, "host can not be null");
            Preconditions.checkArgument(port > 0, "port can not be negative or 0");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
        }

        public Write withAuth(String auth) {
            Preconditions.checkArgument(auth != null, "auth can not be null");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
        }

        public Write withTimeout(int timeout) {
            Preconditions.checkArgument(timeout >= 0, "timeout can not be negative");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
        }

        public Write withConnectionConfiguration(CustomRedisConfigurations connection) {
            Preconditions.checkArgument(connection != null, "connection can not be null");
            return this.toBuilder().setConnectionConfiguration(connection).build();
        }

        public Write withExpireTime(Long expireTimeMillis) {
            Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis can not be null");
            Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis can not be negative or 0");
            return this.toBuilder().setExpireTime(expireTimeMillis).build();
        }

        @Override
        public PDone expand(PCollection<KV<String, String>> input) {
            Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
            input.apply(ParDo.of(new WriteFn(this)));
            return PDone.in(input.getPipeline());
        }

        /** Writes each element to Redis via SADD, batching commands into MULTI/EXEC transactions. */
        private static class WriteFn extends DoFn<KV<String, String>, Void> {
            private static final int DEFAULT_BATCH_SIZE = 1000;
            private final RedisWriteIO.Write spec;
            private transient Jedis jedis;
            private transient @Nullable Transaction transaction;

            private int batchCount;

            public WriteFn(RedisWriteIO.Write spec) {
                this.spec = spec;
            }

            @Setup
            public void setup() {
                // One client per DoFn instance, created from the supplied configuration.
                jedis = spec.connectionConfiguration().connect();
            }

            @StartBundle
            public void startBundle() {
                transaction = jedis.multi();
                batchCount = 0;
            }

            @ProcessElement
            public void processElement(DoFn<KV<String, String>, Void>.ProcessContext c) {

                KV<String, String> record = c.element();

                String fieldKey = record.getKey();
                String fieldValue = record.getValue();

                transaction.sadd(fieldKey, fieldValue);

                batchCount++;

                if (batchCount >= DEFAULT_BATCH_SIZE) {
                    // Flush the current batch and start a new transaction for the next one.
                    transaction.exec();
                    transaction.multi();
                    batchCount = 0;
                }
            }

            @FinishBundle
            public void finishBundle() {
                if (transaction != null) {
                    if (batchCount > 0) {
                        // Flush any remaining, partially filled batch.
                        transaction.exec();
                    }
                    transaction.close();
                }
                transaction = null;
                batchCount = 0;
            }

            @Teardown
            public void teardown() {
                jedis.close();
            }
        }

        @AutoValue.Builder
        abstract static class Builder {
            Builder() {
            }

            abstract Builder setConnectionConfiguration(CustomRedisConfigurations connectionConfiguration);

            abstract Builder setExpireTime(Long expireTimeMillis);

            abstract Write build();

        }
    }
}
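
For reference, this is roughly how the transform is applied in a pipeline. This is only a sketch: the endpoint, key, and value are placeholders, not values from my actual job, and the class is assumed to live in the same package as RedisWriteIO:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;

public class RedisWriteIOUsage {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        pipeline
                .apply(Create.of(KV.of("customer:ids", "123456789")))  // placeholder data
                .apply(RedisWriteIO.write()
                        .withEndpoint("localhost", 6379)               // placeholder endpoint
                        .withTimeout(100)
                        .withExpireTime(60_000L));

        pipeline.run().waitUntilFinish();
    }
}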

The test class is as follows:

package com.example.dataflow.io.redis;

import com.github.fppt.jedismock.RedisServer;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.junit.*;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.*;


public class RedisWriteIOTest {

    private static final String REDIS_HOST = "localhost";
    private static final String[] INPUT_DATA = new String[]{
            "123456789",
            "Bruce",
            "Wayne"
    };

    @Mock
    static SSLSocketFactory socketFactory;
    private static RedisServer server;
    private static int port;

    @Mock
    private static Jedis jedis;

    @Mock
    private Transaction transaction;

    private int batchCount;

    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    @Mock
    CustomRedisConfigurations connection;

    @Mock
    DoFn.OutputReceiver<KV<String, String>> out;

    @Before
    public void setUp() {
        MockitoAnnotations.openMocks(this);
        when(connection.connect()).thenReturn(jedis);
        when(jedis.multi()).thenReturn(transaction);
        batchCount = 0;
    }


    @BeforeClass
    public static void beforeClass() throws Exception {
        server = RedisServer.newRedisServer(8000);
        server.start();
        port = server.getBindPort();
        jedis = new Jedis(server.getHost(), server.getBindPort());
    }

    @AfterClass
    public static void afterClass() throws IOException {
        jedis.close();
        server.stop();
    }

    @Test
    public void WriteMemoryStoreWithEmptyAuth() {
        RedisWriteIO.write()
                .withEndpoint(REDIS_HOST, port).withAuth("");
    }

    @Test
    public void WriteMemoryStoreWithAuth() {
        RedisWriteIO.write()
                .withAuth("AuthString");
    }

    @Test
    public void WriteTimeOut() {
        RedisWriteIO.write()
                .withTimeout(10);
    }

    @Test
    public void WriteMemoryStoreWithExpireTime() {
        RedisWriteIO.Write write = RedisWriteIO.write();
        write = write.withExpireTime(1000L);
        assertNotNull(write);
    }

    @Test(expected = IllegalArgumentException.class)
    public void WriteMemoryStoreWithoutExpireTime() {
        RedisWriteIO.write()
                .withExpireTime(0L);
    }


    @Test(expected = IllegalArgumentException.class)
    public void WriteMemoryStoreWithNegativeExpireTime() {
        RedisWriteIO.write()
                .withExpireTime(-10L);
    }

    @Test
    public void WriteMemoryStoreWithConnectionConfiguration() {
        connection = CustomRedisConfigurations.create().withHost(REDIS_HOST).withPort(port);
        RedisWriteIO.Write write = RedisWriteIO.write()
                .withConnectionConfiguration(connection);
        assertNotNull(write);
    }

    @Test(expected = IllegalArgumentException.class)
    public void WriteMemoryStoreWithNullConnectionConfiguration() {
        RedisWriteIO.write()
                .withConnectionConfiguration(null);
    }


    @Test
    public void testBatchProcessingWithTransactionExecuted() {
        RedisWriteIO.Write spec = RedisWriteIO.write().withConnectionConfiguration(connection);
        PCollection<String> flushFlag = pipeline.apply("Read File", TextIO.read().from("files/fileHavingFiveThousandRecords.txt"));

        List<KV<String, String>> recordEntries = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            // adding unique entries 10,000 times
            recordEntries.add(KV.of("Bruce:Wayne" + i, "123456789" + i));
        }

        // outputData will be written to Redis (memorystore)
        PCollection<KV<String, String>> outputData = pipeline.apply(Create.of(recordEntries));

        outputData.apply("Waiting until clearing Redis database", Wait.on(flushFlag))
               .apply("Writing the data into Redis database", RedisWriteIO.write()
                    .withConnectionConfiguration(CustomRedisConfigurations
                            .create(REDIS_HOST, port)
                            .withTimeout(100)
                            .withAuth("credentials")
                            .enableSSL()));
        pipeline.run();

    }

}

RedisWriteIO is a utility class that writes data from files into a Redis database. It works as expected, and the test cases pass. However, SonarQube reports the following block as not covered:

if (batchCount >= DEFAULT_BATCH_SIZE) {
    transaction.exec();
    transaction.multi();
    batchCount = 0;
}

When more than 1,000 records flow through a bundle, the block above should execute, but it is never hit from the test class. I tried to cover it in the testBatchProcessingWithTransactionExecuted() method with a test file of 5,000 records, but the block still does not execute.
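
One way I considered covering that branch deterministically is to drive the DoFn lifecycle directly against the mocks from setUp(), instead of going through the pipeline. This is only a sketch: it assumes WriteFn (currently private) is made package-private so the test can reach it, and that the mocked connection, jedis, and transaction fields are in effect:

@Test
public void batchIsFlushedOnceThresholdIsReached() {
    // Sketch only: assumes WriteFn has been made package-private for testability.
    RedisWriteIO.Write spec = RedisWriteIO.write().withConnectionConfiguration(connection);
    RedisWriteIO.Write.WriteFn fn = new RedisWriteIO.Write.WriteFn(spec);

    fn.setup();       // obtains the mocked Jedis via connection.connect()
    fn.startBundle(); // obtains the mocked Transaction via jedis.multi()

    // Mockito can mock the abstract ProcessContext class.
    DoFn<KV<String, String>, Void>.ProcessContext context = mock(DoFn.ProcessContext.class);
    when(context.element()).thenReturn(KV.of("Bruce:Wayne", "123456789"));

    for (int i = 0; i < 1000; i++) { // exactly DEFAULT_BATCH_SIZE elements
        fn.processElement(context);
    }

    // The uncovered branch must have executed exactly once.
    verify(transaction, times(1)).exec();
    verify(transaction, times(1)).multi();
}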

I need help writing a test case that covers all of these lines.


Solution

  • I was able to write a test case that covers all the lines. I simply increased the size of the list to 20,000; with that, the batching branch executes and the RedisWriteIO class handles even larger datasets as intended.

    The batch count of 1,000, specified by DEFAULT_BATCH_SIZE, acts as the threshold: once batchCount reaches it, the pending transaction is executed (transaction.exec()) and a new one is started (transaction.multi()). Since batchCount is reset in @StartBundle, the branch only runs when a single bundle delivers at least 1,000 elements, and the DirectRunner splits the input into several bundles, which is presumably why 10,000 records were not enough. A read-back check is sketched after the test below.

        @Test
        public void testBatchProcessingWithTransactionExecuted() {
            RedisWriteIO.Write spec = RedisWriteIO.write().withConnectionConfiguration(connection);
            PCollection<String> flushFlag = pipeline.apply("Read File", TextIO.read().from("files/fileHavingFiveThousandRecords.txt"));
    
            List<KV<String, String>> recordEntries = new ArrayList<>();
            for (int i = 0; i < 20000; i++) {
                // adding unique entries 20,000 times
                recordEntries.add(KV.of("Bruce:Wayne" + i, "123456789" + i));
            }
    
            // outputData will be written to Redis (memorystore)
            PCollection<KV<String, String>> outputData = pipeline.apply(Create.of(recordEntries));
    
            outputData.apply("Waiting until clearing Redis database", Wait.on(flushFlag))
                   .apply("Writing the data into Redis database", RedisWriteIO.write()
                        .withConnectionConfiguration(CustomRedisConfigurations
                                .create(REDIS_HOST, port)
                                .withTimeout(100)
                                .withAuth("credentials")
                                .enableSSL()));
            pipeline.run();
    
        }
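
    Line coverage aside, the test as written never asserts that the data actually reached Redis. A possible follow-up after pipeline.run() is to read one entry back from the embedded jedis-mock server. This is a sketch and assumes the mock server accepts the auth and SSL settings configured above (they may need to be relaxed for jedis-mock):

        // Sketch: read back one of the written entries after the pipeline finishes.
        try (Jedis verifyClient = new Jedis(REDIS_HOST, port)) {
            Assert.assertTrue(verifyClient.sismember("Bruce:Wayne0", "1234567890"));
        }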