I solve a task of counting unique lines in a text file. Each string is one valid ip-address. The file can be of any size (literally, hundreds and thousands of gigabytes are possible). I wrote a simple class that implements a bit array and using it for counting.
public class IntArrayBitCounter {
public static final long MIN_BIT_CAPACITY = 1L;
public static final long MAX_BIT_CAPACITY = 1L << 32;
private final int intArraySize;
private final int[] intArray;
private long counter;
public IntArrayBitCounter(long bitCapacity) {
if (bitCapacity < MIN_BIT_CAPACITY || bitCapacity > MAX_BIT_CAPACITY) {
throw new IllegalArgumentException("Capacity must be in range [1.." + MAX_BIT_CAPACITY + "].");
}
this.intArraySize = 1 + (int) ((bitCapacity - 1) >> 5);
this.intArray = new int[intArraySize];
}
private void checkBounds(long bitIndex) {
if (bitIndex < 0 || bitIndex > ((long) intArraySize << 5)) {
throw new IndexOutOfBoundsException("Bit index must be in range [0.." + (MAX_BIT_CAPACITY - 1) + "].");
}
}
public void setBit(long bitIndex) {
checkBounds(bitIndex);
int index = (int) (bitIndex >> 5);
int bit = 1 << (bitIndex & 31);
if ((intArray[index] & bit) == 0) {
counter++;
intArray[index] |= bit;
}
}
public boolean isBitSets(long bitIndex) {
checkBounds(bitIndex);
int index = (int) (bitIndex >> 5);
int bit = 1 << (bitIndex & 31);
return (intArray[index] & bit) != 0;
}
public int getIntArraySize() {
return intArraySize;
}
public long getBitCapacity() {
return (long) intArraySize << 5;
}
public long getCounter() {
return counter;
}
}
My simple single-threaded approach works well enough. It almost completely utilizes the reading speed of my old HDD which is approximately 130-135 MB/s. The System Monitor in Linux shows the reading from the disk to my program about 100-110 MB/s.
public class IpCounterApp {
private static long toLongValue(String ipString) throws UnknownHostException {
long result = 0;
for (byte b : InetAddress.getByName(ipString).getAddress())
result = (result << 8) | (b & 255);
return result;
}
public static void main(String[] args) {
long startTime = System.nanoTime();
String fileName = "src/test/resources/test.txt";
var counter = new IntArrayBitCounter(1L << 32);
long linesProcessed = 0;
try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName))) {
String line;
while ((line = reader.readLine()) != null) {
counter.setBit(toLongValue(line));
linesProcessed++;
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.printf("%d unique lines in %d processed\n", counter.getCounter(), linesProcessed);
long elapsedTime = System.nanoTime() - startTime;
System.out.println("duration: " + elapsedTime / 1000000 + " milliseconds");
}
}
Then I tried to start reading from the disk and processing rows in two different threads in the hope of a slight improvement. I created a blocking queue. The first thread reads the lines and writes in this queue. The second thread reads out of the queue and makes counting. However, the speed of execution on the test file in 10_000_000 addresses of which 5_000_000 uniquely collapsed almost 2 times. The read speed also fell by half to 50-55 MB / s.
public class ConcurrentIpCounterApp {
public static void main(String[] args) {
long startTime = System.nanoTime();
String fileName = "src/test/resources/test.txt";
var stringsQueue = new ArrayBlockingQueue<String>(1024);
var reader = new BlockingQueueFileReader(stringsQueue, fileName);
var counter = new BlockingQueueCounter(stringsQueue);
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<Long> linesProcessed = executorService.submit(reader);
Future<Long> uniqueLines = executorService.submit(counter);
try {
System.out.printf("%d unique lines in %d processed\n", uniqueLines.get(), linesProcessed.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
long elapsedTime = System.nanoTime() - startTime;
System.out.println("duration: " + elapsedTime / 1000000 + " milliseconds");
}
}
public class BlockingQueueCounter implements Callable<Long> {
private final BlockingQueue<String> queue;
private final IntArrayBitCounter counter;
public BlockingQueueCounter(BlockingQueue<String> queue) {
this.queue = queue;
this.counter = new IntArrayBitCounter(1L << 32);
}
private static long toLongValue(String ipString) throws UnknownHostException {
long result = 0;
for (byte b : InetAddress.getByName(ipString).getAddress())
result = (result << 8) | (b & 255);
return result;
}
@Override
public Long call() {
String line;
while (true) {
try {
line = queue.take();
if ("EOF".equals(line)) {
break;
}
counter.setBit(toLongValue(line));
} catch (InterruptedException | UnknownHostException e) {
e.printStackTrace();
}
}
return counter.getCounter();
}
}
public class BlockingQueueFileReader implements Callable<Long> {
private final BlockingQueue<String> queue;
private final String fileName;
private long totalLines;
public BlockingQueueFileReader(BlockingQueue<String> queue, String fileName) {
this.queue = queue;
this.fileName = fileName;
}
@Override
public Long call() {
try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName))) {
String line;
while ((line = reader.readLine()) != null) {
queue.put(line);
totalLines++;
}
queue.add("EOF");
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
return totalLines;
}
}
Please help me understand why this happens. I could not find the answer myself.
To answer the question why the multithreaded attempt works two times slower than singlethreaded,try to measure
I think that's where you get your answer.