I have a file with 500.000.000 lines.
The lines are string of max 10 characters.
How can I process this file using multi threading and in batches of 100?
I have a file with 500.000.000 lines.
The lines are string of max 10 characters.
How can I process this file using multi threading and in batches of 100?
Using MoreLinq's Batch
method, this will create a collection of IEnumerable<string>
which will contain the line batch size of 100, it will spin a new task for every 100 lines.
This is a basic implementation, it might be wise to use a Semaphore
to only run a certain amount of tasks at any given time, and also seeing what overhead File.ReadAllLines
will have on performance with 500,000,000 lines.
public class FileProcessor
{
public async Task ProcessFile()
{
List<Task> tasks = new List<Task>();
var lines = File.ReadAllLines("File.txt").Batch(100);
foreach (IEnumerable<string> linesBatch in lines)
{
IEnumerable<string> localLinesBatch = linesBatch;
Task task = Task.Factory.StartNew(() =>
{
// Perform operation on localLinesBatch
});
tasks.Add(task);
}
await Task.WhenAll(tasks);
}
}
public static class LinqExtensions
{
public static IEnumerable<IEnumerable<TSource>> Batch<TSource>(
this IEnumerable<TSource> source, int size)
{
TSource[] bucket = null;
var count = 0;
foreach (var item in source)
{
if (bucket == null)
bucket = new TSource[size];
bucket[count++] = item;
if (count != size)
continue;
yield return bucket;
bucket = null;
count = 0;
}
if (bucket != null && count > 0)
yield return bucket.Take(count);
}
}
Using additional libraries is not required if you use Parallel.ForEach
from built-in TPL and write a couple of enumerators (listed below). Your code can look like this:
using (var input = new StreamReader(File.OpenRead(@"c:\path\to\my\file.txt")))
{
Parallel.ForEach(
input.ReadLines().TakeChunks(100),
new ParallelOptions() { MaxDegreeOfParallelism = 8 /* better be number of CPU cores */ },
batchOfLines => {
DoMyProcessing(batchOfLines);
});
}
for this to work, you need a couple of extension methods on IEnumerable<T>
and a couple of enumerators, defined as follows:
public static class EnumerableExtensions
{
public static IEnumerable<string> ReadLines(this StreamReader input)
{
return new LineReadingEnumerable(input);
}
public static IEnumerable<IReadOnlyList<T>> TakeChunks<T>(this IEnumerable<T> source, int length)
{
return new ChunkingEnumerable<T>(source, length);
}
public class LineReadingEnumerable : IEnumerable<string>
{
private readonly StreamReader _input;
public LineReadingEnumerable(StreamReader input)
{
_input = input;
}
public IEnumerator<string> GetEnumerator()
{
return new LineReadingEnumerator(_input);
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
public class LineReadingEnumerator : IEnumerator<string>
{
private readonly StreamReader _input;
private string _current;
public LineReadingEnumerator(StreamReader input)
{
_input = input;
}
public void Dispose()
{
_input.Dispose();
}
public bool MoveNext()
{
_current = _input.ReadLine();
return (_current != null);
}
public void Reset()
{
throw new NotSupportedException();
}
public string Current
{
get { return _current; }
}
object IEnumerator.Current
{
get { return _current; }
}
}
public class ChunkingEnumerable<T> : IEnumerable<IReadOnlyList<T>>
{
private readonly IEnumerable<T> _inner;
private readonly int _length;
public ChunkingEnumerable(IEnumerable<T> inner, int length)
{
_inner = inner;
_length = length;
}
public IEnumerator<IReadOnlyList<T>> GetEnumerator()
{
return new ChunkingEnumerator<T>(_inner.GetEnumerator(), _length);
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
}
public class ChunkingEnumerator<T> : IEnumerator<IReadOnlyList<T>>
{
private readonly IEnumerator<T> _inner;
private readonly int _length;
private IReadOnlyList<T> _current;
private bool _endOfInner;
public ChunkingEnumerator(IEnumerator<T> inner, int length)
{
_inner = inner;
_length = length;
}
public void Dispose()
{
_inner.Dispose();
_current = null;
}
public bool MoveNext()
{
var currentBuffer = new List<T>();
while (currentBuffer.Count < _length && !_endOfInner)
{
if (!_inner.MoveNext())
{
_endOfInner = true;
break;
}
currentBuffer.Add(_inner.Current);
}
if (currentBuffer.Count > 0)
{
_current = currentBuffer;
return true;
}
_current = null;
return false;
}
public void Reset()
{
_inner.Reset();
_current = null;
_endOfInner = false;
}
public IReadOnlyList<T> Current
{
get
{
if (_current != null)
{
return _current;
}
throw new InvalidOperationException();
}
}
object IEnumerator.Current
{
get
{
return this.Current;
}
}
}
}