
I am trying to aggregate data from 2 files, so I decided to send the data via separate writer processes to a named FIFO, and launched a separate reader process to read and process the aggregated data. All reading/writing happens on a ramdisk (/dev/shm), which is conveniently large at around 100 gigabytes.

This works fine, and I ensured that each data line being written to the FIFO is less than 512 bytes so the pipe can retain its atomic behavior.
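Since 512 bytes is only the minimum PIPE_BUF that POSIX requires, the real limit on a given machine may be larger and can be queried at runtime. A small sketch (the path and sample line are illustrative, not from my actual code):

```perl
use strict;
use warnings;
use POSIX qw(pathconf _PC_PIPE_BUF);

# POSIX guarantees that writes of up to PIPE_BUF bytes to a pipe or
# FIFO are atomic; 512 is the minimum the standard allows, and Linux
# commonly reports 4096.
my $pipe_buf = pathconf('/dev/shm', _PC_PIPE_BUF) // 512;

my $line = "some-key some-value\n";   # stand-in for one data line
die "line longer than PIPE_BUF, write may be split\n"
    if length($line) > $pipe_buf;
print "PIPE_BUF here: $pipe_buf bytes\n";
```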

But after multiple runs I have observed that the reader process receives overlapped output, and this starts to happen when I try to pipe more than about 10 million lines from each process. Each of my data lines is terminated with a newline.

I am opening the FIFO in "+< fifo" mode to read and ">> fifo" to write. I am not using syscalls here, just normal open to get a file handle, and I process the data line by line.
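For reference, a minimal, self-contained sketch of that open pattern (paths are illustrative): the key detail is that "+<" opens the FIFO read/write, so the open neither blocks waiting for a writer nor leaves the handle stuck at EOF.

```perl
use strict;
use warnings;
use POSIX qw(mkfifo);
use File::Temp qw(tempdir);
use IO::Handle;

# throwaway FIFO standing in for the real one
my $dir  = tempdir(CLEANUP => 1);
my $fifo = "$dir/demo.fifo";
mkfifo($fifo, 0700) or die "mkfifo: $!";

# "+<" opens the FIFO O_RDWR: it does not block waiting for a writer,
# and the handle does not see a sticky EOF when writers come and go.
open my $reader, '+<', $fifo or die "open reader: $!";

# ">>" on a FIFO is effectively plain write-only; this open succeeds
# immediately because a reader is already attached.
open my $writer, '>>', $fifo or die "open writer: $!";
$writer->autoflush(1);

print {$writer} "hello fifo\n";
my $line = <$reader>;
print "read back: $line";
```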

How can I start to investigate this? Any ideas?

Many thanks.

Update as of 2019/APR/29:

Note that my loops now use syscalls; previously they did not, but I eventually decided to switch.

The same thing can also be achieved by having 2 processes write to a single file, but one needs to take care: this only works on POSIX-compliant file systems, or, failing that, one can keep the log file (where multiple processes write) on a RAMDISK, which also works. NFS drives are out of scope, since NFS is not POSIX compliant and this technique does not work on it.

So, comparing a FIFO with a text file: multiple processes reading/writing a plain file is faster than multiple processes reading/writing a FIFO.
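A sketch of that single-file variant (using a throwaway temp file instead of a /dev/shm path): two forked writers open with O_APPEND, which makes the kernel position every write at end-of-file atomically, so the writers cannot clobber each other's offset.

```perl
use strict;
use warnings;
use Fcntl qw(O_WRONLY O_CREAT O_APPEND);
use File::Temp qw(tempfile);

# a throwaway file standing in for the /dev/shm logfile
my (undef, $logfile) = tempfile(UNLINK => 0);

for my $child (1 .. 2) {
    my $pid = fork() // die "fork: $!";
    if ($pid == 0) {
        # O_APPEND: the kernel moves the offset to EOF atomically
        # on every write, so concurrent appenders do not collide
        sysopen(my $fh, $logfile, O_WRONLY | O_CREAT | O_APPEND)
            or die "sysopen: $!";
        syswrite($fh, "child $child line $_\n") for 1 .. 1000;
        exit 0;
    }
}
wait() for 1 .. 2;

open my $in, '<', $logfile or die "open: $!";
my $lines = () = <$in>;
print "total lines: $lines\n";
unlink $logfile;
```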

Just for upcoming readers, here is my writer and reader process code. How you design your code to incorporate these sub-routines is up to you; there are plenty of ways to do it.

Hope it was useful.

writer process:

  write_log => sub {
    my ($filehandle, $log_message) = @_;
    # autoflush is redundant with syswrite, which bypasses stdio
    # buffering entirely, but it does no harm
    select $filehandle ; $|++;
    syswrite ($filehandle, $log_message, length($log_message))
      or die "write_log: syswrite fail!\n";
  },

reader process:

  read_log => sub
  {
    # In my endless reading loop,
    # if I detect keyword END 2 times (as 
    # i have 2 processes), I exit the reading loop
    # and do further operations.
    #
    my ($end_check_value) = @_;

    # O_CREAT / O_RDONLY constants come from Fcntl (use Fcntl;)
    sysopen (FH,$logfile, O_CREAT|O_RDONLY)
      or die "($$) read_log: Failed to sysopen\n";

    my ($h, $end) = (undef,0);

    print STDOUT get_ts().'|'."($$) read_log: now tailing logfile with check count $end_check_value\n";

    for (;;)
    {
      while (my $line = <FH>)
      {
        chomp $line;
        $end++ if $line =~ m/END/;
        last if $end == $end_check_value;
        my $key = (split(/\s/,$line))[0];
        $h->{$key}++;
      }

      # the no-op seek clears the handle's EOF flag so the next <FH>
      # picks up lines appended since the last read
      sleep(1) ; seek (FH,0,1);

      # break out of for loop if we
      # have collected the 'END' tags
      # from all worker processes
      if ($end == $end_check_value)
      {
        print STDOUT get_ts().'|'."($$) read_log: breaking for loop ",
                     "with end_check: $end_check_value\n";
        last;
      }
    }

    close (FH);
  },

Performance Stats:

Here are the performance stats for multiple processes writing to a single file on a RAMDISK. On average it takes about 10 minutes, plus or minus 20 seconds, to write 150,000,000 lines (150 M) and read them into a hash.

test string is 238 bytes long
20190429-12:34:50.637|(11139) PARENT: each child will write (75000000) to (/dev/shm/multi_proc_test_logfile.log)
20190429-12:34:54.399|(11139) trunc_log_file: truncated (/dev/shm/multi_proc_test_logfile.log)
20190429-12:34:54.399|(11149) process no. (2) launched!
20190429-12:34:54.399|(11150) process no. (1) launched!
20190429-12:34:55.400|(11139) read_log: now tailing logfile with check count 2
20190429-12:44:21.565|(11150) process exiting with status code 0
20190429-12:44:34.164|(11149) process exiting with status code 0
20190429-12:45:03.956|(11139) read_log: breaking for loop with end_check: 2
20190429-12:45:03.957|(11139) read_log: Collected counts:
(11139) (11149):75000000
(11139) (11150):75000000
---------------
(11139) Finished!

real    10m13.466s
user    9m31.627s
sys     0m39.650s

Here are the performance stats for the FIFO, where multiple processes write 25,000,000 lines each to the FIFO and the reader process reads them back into a hash. On average it took about 25-30 minutes. It's slower than having the processes write to a file.

test string is 141 bytes long
20190426-10:25:13.455|28342|2-test-fifo.pl: Starting..
20190426-10:25:13.456|28345|CHILD starting (read_and_hash)
20190426-10:25:13.456|28345|READ_AND_HASH now hashing files
20190426-10:25:14.458|28346|CHILD starting (s1_data_gather)
20190426-10:25:14.458|28346|Working on sit1 data..
20190426-10:25:14.458|28347|CHILD starting (s2_data_gather)
20190426-10:25:14.458|28347|Working on sit2 data..
20190426-10:48:48.454|28346|Finished working on S1 data..
20190426-10:48:48.457|28342|Reaped 28346
20190426-10:48:48.462|28345|read LAST line from S2 data
20190426-10:48:52.657|28347|Finished working on s2 data..
20190426-10:48:52.660|28342|Reaped 28347
20190426-10:48:52.669|28345|read LAST line from S2 data
20190426-10:48:53.130|28345|READ_AND_HASH finished hashing files
(read_n_hash): finished hashing. keys count
        s1 = 25000000
        s2 = 25000000
20190426-10:48:53.130|28345|starting comparison. doing source to target
20190426-10:49:49.566|28345|finished comparing source to target. now comparing target to source
20190426-10:50:45.578|28345|comparing target to source ends. finished
20190426-10:51:57.220|28342|Reaped 28345
20190426-10:51:57.220|28342|2-test-fifo.pl: Ending..
User9102d82

2 Answers


You may have to turn autoflush on for the file to which you are writing. If you're opening the files using the open() function rather than via an OO interface like IO::File, then after you succeed in opening the file (as $fifo, say), you need code like this:

select $fifo;
$| = 1;

Note that select() chooses the default output filehandle for print and the like when no filehandle is specified. If you want to revert to targeting STDOUT, then select STDOUT after the above, or, to be pedantic:

my $oldfh = select $fifo;
$| = 1;
select $oldfh;

I don't think the file modes ('+<' etc) have anything to do with it, since concepts like "clobbering" and "appending" don't apply to FIFOs. You'd probably do just as well with simple ">" and "<".

TFBW
  • I will try it out and come back with results. If it works, I shall accept your answer as solution. One thing - opening with '<' mode seems not to work. It stops reading after reading one line. – User9102d82 Apr 25 '19 at 15:27
  • Ah—if you read the file in '<' mode, I think you will encounter EOF each time there is no more data in the pipe. You can reset that condition with `seek($fh, 0, 1);` but perhaps the '+<' mode is a quirky way of avoiding EOF conditions. The last time I had a job like this I used datagram-based sockets instead of a FIFO, so none of this quirkiness was an issue. Maybe you could consider that as an alternative approach. – TFBW Apr 25 '19 at 15:41
  • Re "*I think you will encounter EOF each time there is no more data in the pipe.*", That's not true. The read will block (or return error `EAGAIN` or `EWOULDBLOCK` on a nonblocking handle). It does return EOF when the writer closes the handle though. – ikegami Apr 25 '19 at 15:55
  • Yeah, I'm thinking of the case where you're chasing the tail of a real file. But do the writers close the FIFO between writes? It's a named FIFO, so such behaviour is possible. We'd need to see the code. – TFBW Apr 25 '19 at 16:07
  • select on file handle worked like a charm. Now there are no more overlapping data lines in the output. Thanks for sharing your knowledge. I really appreciate that. Answer accepted and up-voted as it was the right thing to do. Cheers. – User9102d82 Apr 29 '19 at 11:49

It's possible that what you are seeing here is a simple product of concurrency. You are assuming that the reader pulls data out of the FIFO in a timely manner. What if both writers have the opportunity to write several records before the reader gets another shot at reading? What if the FIFO reaches capacity part-way through a write? The writer will block part way through a write, and then the reader will have an opportunity to empty the queue, but there's no guarantee that the writer which wrote the partial line will be the next one to write. This would result in interleaved lines.

If my answer regarding autoflush does not solve your problem, you may have to consider the possibility that writes are being interleaved in this manner.

As mentioned in a comment above, you may be better off using datagram sockets (SOCK_DGRAM) rather than a FIFO. In this way, each message is an atomic unit with no chance of interleaving.
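A minimal sketch of that datagram idea using an AF_UNIX socketpair (names here are illustrative, and a real multi-process setup would use named sockets instead):

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_DGRAM PF_UNSPEC);

# With SOCK_DGRAM, each send() is delivered as one indivisible
# message: the reader receives a whole record or nothing, never a
# partial or interleaved line.
socketpair(my $reader, my $writer, AF_UNIX, SOCK_DGRAM, PF_UNSPEC)
    or die "socketpair: $!";

send($writer, "key1 value1", 0) // die "send: $!";
send($writer, "key2 value2", 0) // die "send: $!";

my @got;
for (1 .. 2) {
    recv($reader, my $msg, 512, 0) // die "recv: $!";
    push @got, $msg;
}
print "got: $_\n" for @got;
```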

TFBW