I am trying to aggregate data from 2 files, so I decided to send the data via separate writer processes to a named FIFO, and launched a separate reader process to read and process the aggregated data. All reading/writing happens on a ramdisk (/dev/shm), which is conveniently large at around 100 gigabytes.
This works fine, and I ensured that each data line written to the FIFO is less than 512 bytes so the pipe can retain its atomic write behavior.
But over multiple runs I have observed that the reader process receives overlapped output, and this starts to happen when I try to pipe more than about 10 million lines from each writer process. Each of my data lines is terminated with a newline.
I am opening the FIFO in "+< fifo" mode to read and ">> fifo" mode to write. I am not using syscalls here, just the normal open to get a file handle, and I process the data line by line.
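For reference, here is a minimal sketch of the kind of writer/reader pair I am describing; the FIFO path, line contents, and line count are placeholders, not my actual data:

    # writer side (placeholder path and data)
    my $fifo_path = '/dev/shm/agg.fifo';
    open (my $wfh, '>>', $fifo_path) or die "writer: open fifo: $!\n";
    select $wfh; $|++;                          # autoflush so lines are not held in the stdio buffer
    print {$wfh} "key_$_ some data\n" for (1 .. 10_000_000);
    close ($wfh);

    # reader side
    open (my $rfh, '+<', $fifo_path) or die "reader: open fifo: $!\n";
    while (my $line = <$rfh>) {
        chomp $line;
        # ... aggregate the line ...
    }
    close ($rfh);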
How can I start to investigate this? Any ideas?
Many thanks.
Update as of 2019/APR/29:
Note that my loops now use syscalls. Previously I was not using them, but I eventually decided to switch.
The same thing can also be achieved by having the 2 processes write to a single file, but one needs to take care: this only works on POSIX-compliant file systems, or, failing that, one can keep the log file (the file to which multiple processes write) on a RAMDISK, since that works as well. NFS drives are out of scope, as they are not POSIX compliant and this technique does not work on them.
So, comparing a FIFO with a plain text file: multiple processes reading/writing a regular file is faster than multiple processes reading/writing a FIFO.
For future readers, here is my writer and reader process code. How you design your code to incorporate these subroutines is up to you; there are plenty of ways to do it (one possible wiring is sketched after the reader code below).
Hope it was useful.
writer process:
write_log => sub {
    my ($filehandle, $log_message) = @_;

    # autoflush the handle (harmless here, since syswrite
    # bypasses the stdio buffer anyway)
    select $filehandle ; $|++;

    # write the whole message in a single write() call
    syswrite ($filehandle, $log_message, length($log_message))
        or die "write_log: syswrite fail!\n";
},
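For context, here is one way (a hypothetical sketch, not my exact code) that a writer process could obtain the $filehandle passed to write_log above; the file name matches the one in the stats below, and opening with O_APPEND is what lets the concurrent writes land intact at the end of the file. It assumes write_log is available as the coderef $write_log:

    use Fcntl qw(O_WRONLY O_APPEND O_CREAT);

    my $logfile = '/dev/shm/multi_proc_test_logfile.log';
    sysopen (my $wfh, $logfile, O_WRONLY|O_APPEND|O_CREAT)
        or die "($$) writer: failed to sysopen $logfile\n";

    # each message is one short, newline-terminated line
    $write_log->($wfh, "($$) some_key some_value\n");
    $write_log->($wfh, "END\n");   # the reader counts END markers to know when to stop
    close ($wfh);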
reader process:
read_log => sub {
    # In my endless reading loop,
    # if I detect the keyword END 2 times (as
    # I have 2 processes), I exit the reading loop
    # and do further operations.
    #
    my ($end_check_value) = @_;

    sysopen (FH, $logfile, O_CREAT|O_RDONLY)
        or die "($$) read_log: Failed to sysopen\n";

    my ($h, $end) = (undef, 0);
    select FH ; $|++ ;

    print STDOUT get_ts().'|'."($$) read_log: now tailing logfile with check count $end_check_value\n";

    for (;;)
    {
        while (my $line = <FH>)
        {
            chomp $line;
            $end++ if $line =~ m/END/g;
            last if $end == $end_check_value;
            my $key = (split(/\s/, $line))[0];
            $h->{$key}++;
        }

        # no new data yet: wait a second, then clear the EOF
        # flag by seeking to the current position
        sleep(1) ; seek (FH, 0, 1);

        # break out of the for loop if we
        # have collected the 'END' tags
        # from all worker processes
        if ($end == $end_check_value)
        {
            print STDOUT get_ts().'|'."($$) read_log: breaking for loop ",
                "with end_check: $end_check_value\n";
            last;
        }
    }
    close (FH);
},
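As an illustration only (one of those "plenty of ways", with placeholder line contents, and assuming write_log and read_log are held in the coderefs $write_log and $read_log), the pieces could be wired together like this:

    use Fcntl qw(O_WRONLY O_APPEND O_CREAT);

    my @pids;
    for my $n (1 .. 2) {                       # two writer children
        my $pid = fork();
        die "fork failed: $!\n" unless defined $pid;
        if ($pid == 0) {
            sysopen (my $wfh, $logfile, O_WRONLY|O_APPEND|O_CREAT)
                or die "($$) writer: sysopen failed\n";
            $write_log->($wfh, "($$) line $_\n") for (1 .. 75_000_000);
            $write_log->($wfh, "END\n");       # tell the reader this writer is done
            close ($wfh);
            exit 0;
        }
        push @pids, $pid;
    }

    $read_log->(2);                            # parent tails the log until it sees 2 END markers
    waitpid ($_, 0) for @pids;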
Performance Stats:
Here are the performance stats for multiple processes writing to a single file on the RAMDISK. On average it takes about 10 minutes, plus or minus 20 seconds, to write 150,000,000 lines (150 million) and read them into a hash.
test string is 238 bytes long
20190429-12:34:50.637|(11139) PARENT: each child will write (75000000) to (/dev/shm/multi_proc_test_logfile.log)
20190429-12:34:54.399|(11139) trunc_log_file: truncated (/dev/shm/multi_proc_test_logfile.log)
20190429-12:34:54.399|(11149) process no. (2) launched!
20190429-12:34:54.399|(11150) process no. (1) launched!
20190429-12:34:55.400|(11139) read_log: now tailing logfile with check count 2
20190429-12:44:21.565|(11150) process exiting with status code 0
20190429-12:44:34.164|(11149) process exiting with status code 0
20190429-12:45:03.956|(11139) read_log: breaking for loop with end_check: 2
20190429-12:45:03.957|(11139) read_log: Collected counts:
(11139) (11149):75000000
(11139) (11150):75000000
---------------
(11139) Finished!
real **10m13.466s**
user 9m31.627s
sys 0m39.650s
Here are the performance stats for the FIFO, where multiple processes write 25,000,000 lines each to the FIFO and the reader process reads them back into a hash. On average it took about 25-30 minutes. It is slower than the processes writing to a file.
test string is 141 bytes long
20190426-10:25:13.455|28342|2-test-fifo.pl: Starting..
20190426-10:25:13.456|28345|CHILD starting (read_and_hash)
20190426-10:25:13.456|28345|READ_AND_HASH now hashing files
20190426-10:25:14.458|28346|CHILD starting (s1_data_gather)
20190426-10:25:14.458|28346|Working on sit1 data..
20190426-10:25:14.458|28347|CHILD starting (s2_data_gather)
20190426-10:25:14.458|28347|Working on sit2 data..
20190426-10:48:48.454|28346|Finished working on S1 data..
20190426-10:48:48.457|28342|Reaped 28346
20190426-10:48:48.462|28345|read LAST line from S2 data
20190426-10:48:52.657|28347|Finished working on s2 data..
20190426-10:48:52.660|28342|Reaped 28347
20190426-10:48:52.669|28345|read LAST line from S2 data
20190426-10:48:53.130|28345|READ_AND_HASH finished hashing files
(read_n_hash): finished hashing. keys count
s1 = 25000000
s2 = 25000000
20190426-10:48:53.130|28345|starting comparison. doing source to target
20190426-10:49:49.566|28345|finished comparing source to target. now comparing target to source
20190426-10:50:45.578|28345|comparing target to source ends. finished
20190426-10:51:57.220|28342|Reaped 28345
20190426-10:51:57.220|28342|2-test-fifo.pl: Ending..
You may have to turn autoflush on for the file to which you are writing. If you're opening the files using the open() function rather than via an OO interface like IO::File, then after you succeed in opening the file (as $fifo, say), you need code like so.
select $fifo;
$| = 1;
Note that select() chooses the default output filehandle for print statements and the like that don't specify a particular filehandle. If you want to revert to targeting STDOUT, then select STDOUT after the above, or, to be pedantic:
my $oldfh = select $fifo;
$| = 1;
select $oldfh;
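Alternatively, IO::Handle (which IO::File inherits from) provides an autoflush method, so you can skip the select() juggling; $fifo here is whatever handle you opened for writing:

    use IO::Handle;        # older Perls need this loaded explicitly
    $fifo->autoflush(1);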
I don't think the file modes ('+<' etc) have anything to do with it, since concepts like "clobbering" and "appending" don't apply to FIFOs. You'd probably do just as well with simple ">" and "<".