I have a fairly large file (> 15 GB); the kind of file doesn't matter.
I have to read the file, do some processing on the data, and then write the processed data to a new, empty file.
I do it in chunks. Each chunk contains a header of some sort, followed by the data. The simplest file of multiple chunks would contain:
Number of block bytes
Block bytes
Number of block bytes
Block bytes
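For reference, the chunk layout above (a length header followed by that many bytes) can be read lazily, one chunk at a time, so the reader never holds more than one chunk. This is a minimal sketch that assumes the header is a 4-byte little-endian `Int32`; the real header format isn't specified, and `ChunkReader`/`ReadChunks` are hypothetical names.

```csharp
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class ChunkReader
{
    // Yields the payload of each chunk in file order, keeping only one chunk in memory.
    // Assumption: every header is a 4-byte little-endian Int32 giving the payload length.
    public static IEnumerable<byte[]> ReadChunks(Stream input)
    {
        using (var reader = new BinaryReader(input, Encoding.UTF8, leaveOpen: true))
        {
            while (true)
            {
                byte[] header = reader.ReadBytes(4);
                if (header.Length == 0)
                    yield break;                         // clean end of file
                if (header.Length < 4)
                    throw new EndOfStreamException("Truncated chunk header.");

                int length = BinaryPrimitives.ReadInt32LittleEndian(header);
                if (length < 0)
                    throw new InvalidDataException("Negative chunk length.");

                byte[] data = reader.ReadBytes(length);
                if (data.Length < length)
                    throw new EndOfStreamException("Truncated chunk payload.");

                yield return data;
            }
        }
    }
}
```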
So I create one thread that reads the file chunk by chunk, several threads that process the chunks that have been read, and one thread that writes the processed chunks.
My problem is how to coordinate these threads.
I don't know the order in which the chunks will finish processing, but I must write them to the output file in the same order in which they were read.
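One common way to write chunks in read order when they finish out of order is a reorder buffer: number each chunk as it is read, keep chunks that arrive early in a dictionary keyed by that number, and write only while the next expected number is present. A minimal sketch; `OrderedBlockWriter` is a hypothetical name, and it writes only the payload, so if the output also needs the length header it would be written just before each payload.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper: accepts processed blocks in any order and writes them
// to `output` strictly in index order 0, 1, 2, ... Not thread-safe: call Add
// from the single writer thread only.
public sealed class OrderedBlockWriter
{
    private readonly Stream _output;
    private readonly Dictionary<long, byte[]> _pending = new Dictionary<long, byte[]>();
    private long _nextIndex;                      // index of the block the file needs next

    public OrderedBlockWriter(Stream output) => _output = output;

    // Blocks that arrived early and are still waiting for a predecessor.
    public int PendingCount => _pending.Count;

    public void Add(long index, byte[] data)
    {
        if (index < _nextIndex || _pending.ContainsKey(index))
            throw new ArgumentException($"Block {index} was already received.");
        _pending.Add(index, data);

        // Write every block that is now contiguous with what has already been written.
        while (_pending.TryGetValue(_nextIndex, out byte[] next))
        {
            _output.Write(next, 0, next.Length);
            _pending.Remove(_nextIndex);
            _nextIndex++;
        }
    }
}
```

Indices start at 0 here. With the `Interlocked.Increment` counter in the sample code the first index is 1, so either start that counter at -1 or initialise `_nextIndex` to 1.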
So my question is: what approach should I use to manage this multithreaded processing?
I guess it might be better to use the producer-consumer pattern. In that case, which data structure is best for storing the data that has already been processed? My one guess is a stack backed by an array, which I would need to sort once before I start writing.
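With the producer-consumer pattern, sorting everything once before writing would mean keeping every processed chunk in memory, potentially the whole 15 GB. The sketch below instead combines queues with a reorder buffer: one reader numbers the chunks, several workers process them, and a single writer restores file order, while a `SemaphoreSlim` caps how many chunks have been read but not yet written. `OrderedPipeline`, `workers` and `maxInFlight` are hypothetical names; `BlockingCollection<T>` and `SemaphoreSlim` are standard .NET types. Error handling is deliberately minimal: the sketch assumes `process` does not throw, because a failing worker would leave the reader waiting for a free slot.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class OrderedPipeline
{
    // One reader, `workers` processing tasks, and one writer (the calling thread).
    // At most `maxInFlight` chunks are held in memory between reading and writing.
    public static void Run(IEnumerable<byte[]> chunks, Func<byte[], byte[]> process,
                           Stream output, int workers, int maxInFlight)
    {
        using var slots = new SemaphoreSlim(maxInFlight, maxInFlight);
        using var toProcess = new BlockingCollection<(long Index, byte[] Data)>();
        using var toWrite = new BlockingCollection<(long Index, byte[] Data)>();

        // Producer: numbers chunks 0, 1, 2, ... in file order.
        Task reader = Task.Run(() =>
        {
            try
            {
                long index = 0;
                foreach (byte[] chunk in chunks)
                {
                    slots.Wait();                 // blocks while maxInFlight chunks are unwritten
                    toProcess.Add((index, chunk));
                    index++;
                }
            }
            finally
            {
                toProcess.CompleteAdding();       // lets the workers' loops finish
            }
        });

        // Consumers: process chunks in parallel; results arrive in any order.
        var processors = new Task[workers];
        for (int i = 0; i < workers; i++)
        {
            processors[i] = Task.Run(() =>
            {
                foreach (var item in toProcess.GetConsumingEnumerable())
                    toWrite.Add((item.Index, process(item.Data)));
            });
        }
        // Once every worker has finished, tell the writer no more results will come.
        Task allProcessed = Task.WhenAll(processors).ContinueWith(_ => toWrite.CompleteAdding());

        // Writer: a reorder buffer keyed by chunk index restores file order.
        var pending = new Dictionary<long, byte[]>();
        long next = 0;
        foreach (var item in toWrite.GetConsumingEnumerable())
        {
            pending.Add(item.Index, item.Data);
            while (pending.TryGetValue(next, out byte[] data))
            {
                output.Write(data, 0, data.Length);
                pending.Remove(next);
                next++;
                slots.Release();                  // frees room for one more chunk to be read
            }
        }

        // Wait for every task before the collections are disposed; rethrows read errors.
        allProcessed.Wait();
        Task.WaitAll(processors);
        reader.Wait();
    }
}
```

Because the reader only takes a slot before reading and the writer only releases one after writing, the oldest unwritten chunk is always among those in flight, so the pipeline cannot stall waiting for a chunk that was never read.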
But I'm not sure, so please help me choose an approach.
// Sample of my code, without any thread-management logic
using System;
using System.IO;
using System.Threading;
public class DataBlock
{
    public byte[] Data { get; }
    public long Index { get; }

    public DataBlock(byte[] data, long index)
    {
        this.Data = data;
        this.Index = index;
    }
}
int bufferSize = 1024 * 64; // 65536
long processedBlockCounter = 0L;
// MyStack<T> and MyProcessor are not shown here.
MyStack<DataBlock> processedBlockStore = new MyStack<DataBlock>();
// FileStream already buffers internally, so an extra BufferedStream adds nothing.
using (FileStream fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize))
{
    byte[] buffer = new byte[bufferSize];
    int bytesRead;
    while ((bytesRead = fs.Read(buffer, 0, bufferSize)) > 0)
    {
        // Copy exactly the bytes that were read; the buffer is reused on the next iteration.
        byte[] originalBytes = new byte[bytesRead];
        Buffer.BlockCopy(buffer, 0, originalBytes, 0, bytesRead);

        // Capture this block's own index. Reading processedBlockCounter inside the lambda
        // would observe whatever value the counter has when the thread actually runs.
        long dataBlockIndex = Interlocked.Increment(ref processedBlockCounter);
        // Note: this starts one thread per 64 KB block, i.e. hundreds of thousands for 15 GB.
        Thread processThread = new Thread(() =>
        {
            byte[] processedBytes = MyProcessor.Process(originalBytes);
            DataBlock processedBlock = new DataBlock(processedBytes, dataBlockIndex);
            lock (processedBlockStore)
            {
                processedBlockStore.Add(processedBlock);
            }
        });
        processThread.Start();
    }
}