
I have a fairly large file (> 15 GB); the kind of file doesn't matter. I have to read the file, process the data, and then write the processed data to a new, empty file. I do this in chunks. Each chunk contains a header of some sort, followed by the data. The simplest file with multiple chunks would contain:

Number of block bytes
Block bytes
Number of block bytes
Block bytes

So I create one thread that reads the file chunk by chunk, several threads that process the chunks, and one thread that writes the processed chunks.

I have a problem managing those threads.

I don't know the order in which the chunks will finish processing, but I must write them to the file in the same order in which they were read.

So my question is: what approach should I use to manage this multithreaded processing?

I guess it might be better to use the producer-consumer pattern. Which data structure is best for storing the data that has already been processed? I have one guess: an array-based stack that I sort once before I start writing.

But I'm not sure. Please help me with an approach.

// Sample of my code, without any thread-management logic

public class DataBlock
{
    public byte[] Data { get; }
    public long Index { get; }

    public DataBlock(byte[] data, long index)
    {
        this.Data = data;
        this.Index = index;
    }
}


int bufferSize = 1024*64; //65536
long processedBlockCounter = 0L;
MyStack<DataBlock> processedBlockStore = new MyStack<DataBlock>();

using (FileStream fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize))
{
    using (BufferedStream bs = new BufferedStream(fs, bufferSize))
    {
        byte[] buffer = new byte[bufferSize];
        int byteRead;
        while ((byteRead = bs.Read(buffer, 0, bufferSize)) > 0)
        {
            byte[] originalBytes;
            using (MemoryStream mStream = new MemoryStream())
            {
                mStream.Write(buffer, 0, byteRead);
                originalBytes = mStream.ToArray();
            }

            long dataBlockIndex = Interlocked.Increment(ref processedBlockCounter);

            Thread processThread = new Thread(() =>
            {
                byte[] processedBytes = MyProcessor.Process(originalBytes);
                // Use the index captured for this chunk; the shared counter may already have advanced.
                DataBlock processedBlock = new DataBlock(processedBytes, dataBlockIndex);
                lock(processedBlockStore)
                {
                     processedBlockStore.Add(processedBlock);
                }
            });
            processThread.Start();
        }
    }
}
isxaker
  • Is it an option to add an _index_ to the structure you use to keep chunk data in memory? The reader will fill it sequentially and the writer will wait for the _right_ next chunk to write. As a data structure I'd simply use a blocking sorted collection. – Adriano Repetti Jan 29 '16 at 09:48
  • @AdrianoRepetti My processing finishes in no particular order, so I need to sort before writing, and that's why I store the index. What is the complexity of adding to and removing from a blocking sorted collection? Does it sort the elements after each add? – isxaker Jan 29 '16 at 09:58
  • Is your processing time-consuming, or does it finish very quickly? – Sriram Sakthivel Jan 29 '16 at 10:02
  • It depends on how it's implemented. In general I'd use a singly linked list, but it depends on the specific timing scenario. Even an unsorted collection plus a helper variable that stores the latest received packet index may work (in that case you get better insertion performance but much worse writing performance, which I wouldn't worry about since I/O is involved anyway). – Adriano Repetti Jan 29 '16 at 10:04
  • @SriramSakthivel Certainly, it would be better if processing finishes quite quickly =) – isxaker Jan 29 '16 at 10:05
  • @AdrianoRepetti You mean that the time cost of the I/O operations is much greater than that of any data structure operation? – isxaker Jan 29 '16 at 10:13
  • Please take a look at this post: http://stackoverflow.com/questions/2161895/reading-large-text-files-with-streams-in-c-sharp – Geoffrey Jan 29 '16 at 10:23
  • @Geoffrey I've already seen that, but thanks ;-) – isxaker Jan 29 '16 at 10:27
  • @isxaker No, that would be too general an assertion. However, a small optimization (for example, a sorted linked list vs. an unsorted vector that is sorted when writing) _may_ not be noticeable with intensive I/O (but it depends on many factors: concurrency level, list size and so on). – Adriano Repetti Jan 29 '16 at 10:33
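
The idea from the comments above (keep an index per chunk and let the writer wait for the right next chunk) could look roughly like the sketch below. `OrderedBlockWriter` is a hypothetical name, not something from the question's code, and it swaps the sorted collection for a plain dictionary keyed by index, so no sorting is needed at all. It assumes indices are consecutive and that the first one is passed in (the question's `Interlocked.Increment` counter starts at 1).

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper: accepts processed blocks in any order
// and writes them to the output stream in index order.
public sealed class OrderedBlockWriter
{
    private readonly Stream _output;
    private readonly Dictionary<long, byte[]> _pending = new Dictionary<long, byte[]>();
    private readonly object _gate = new object();
    private long _nextIndex; // index of the next block that may be written

    public OrderedBlockWriter(Stream output, long firstIndex)
    {
        _output = output;
        _nextIndex = firstIndex;
    }

    // Safe to call from any processing thread.
    public void Add(long index, byte[] data)
    {
        lock (_gate)
        {
            _pending[index] = data;

            // Flush every block that is now contiguous with what was already written.
            while (_pending.TryGetValue(_nextIndex, out byte[] next))
            {
                _output.Write(next, 0, next.Length);
                _pending.Remove(_nextIndex);
                _nextIndex++;
            }
        }
    }

    // Number of blocks still waiting for an earlier block to arrive.
    public int PendingCount
    {
        get { lock (_gate) { return _pending.Count; } }
    }
}
```

In this sketch the thread that delivers the missing block performs the write while holding the lock, which keeps the code short but is only one possible design; a dedicated writer thread is another. Note also that nothing here limits how many blocks pile up while one slow block is outstanding, so with a 15 GB file the number of chunks in flight would still need to be bounded somewhere.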

1 Answer


You're creating a new thread for each iteration. That isn't going to scale. I'd recommend using the ThreadPool instead. The preferred way is to use the TPL, which uses the ThreadPool internally.

Since you need both ordering and parallel processing, and the two don't go hand in hand, you could make your code completely synchronous if that's an option.

If you need to process in parallel, I'd recommend the following fork-join strategy, given that your file is larger than 15 GB and your processing is time-consuming too.

  • Split your file into chunks
  • Start a Task for each chunk
  • Make each task write its output to a temporary file named after the chunk's index: 1.txt, 2.txt, etc.
  • Wait for all Tasks to complete
  • Finally read those temporary files and create your output file in order.
  • Then of course delete those temporary files. You're done.
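
The steps above can be sketched as follows. This is a minimal illustration under assumptions, not a definitive implementation: `ForkJoinSketch`, `tempDir`, `chunkSize` and `maxInFlight` are made-up names, the `process` delegate stands in for the question's `MyProcessor.Process`, and the `SemaphoreSlim` throttle is an addition so that a fast reader does not queue the whole 15 GB in memory.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class ForkJoinSketch
{
    // 'process' is a stand-in for the question's MyProcessor.Process.
    public static void Run(string inputPath, string outputPath, string tempDir,
                           int chunkSize, int maxInFlight, Func<byte[], byte[]> process)
    {
        Directory.CreateDirectory(tempDir);
        var tasks = new List<Task>();
        long index = 0;

        using (var slots = new SemaphoreSlim(maxInFlight))
        {
            using (var input = File.OpenRead(inputPath))
            {
                var buffer = new byte[chunkSize];
                int read;
                while ((read = input.Read(buffer, 0, buffer.Length)) > 0)
                {
                    // Copy the bytes so the shared read buffer can be reused.
                    var chunk = new byte[read];
                    Buffer.BlockCopy(buffer, 0, chunk, 0, read);
                    long chunkIndex = ++index; // per-iteration copy, safe to capture

                    slots.Wait(); // assumption: cap the number of chunks held in memory
                    tasks.Add(Task.Run(() =>
                    {
                        try
                        {
                            string part = Path.Combine(tempDir, chunkIndex + ".txt");
                            File.WriteAllBytes(part, process(chunk));
                        }
                        finally
                        {
                            slots.Release();
                        }
                    }));
                }
            }

            // Join: wait for every chunk before assembling the result.
            Task.WaitAll(tasks.ToArray());
        }

        // Concatenate the temporary files in index order, deleting each one afterwards.
        using (var output = File.Create(outputPath))
        {
            for (long i = 1; i <= index; i++)
            {
                string part = Path.Combine(tempDir, i + ".txt");
                using (var partStream = File.OpenRead(part))
                {
                    partStream.CopyTo(output);
                }
                File.Delete(part);
            }
        }
    }
}
```

A call might look like `ForkJoinSketch.Run(path, outPath, tempDir, 1024 * 64, Environment.ProcessorCount, MyProcessor.Process)`. The sketch reads fixed-size chunks as the question's code does; parsing the length-prefixed block layout described in the question would replace the simple `Read` loop.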
Sriram Sakthivel