
I have a file with 500,000,000 lines.

The lines are strings of at most 10 characters each.

How can I process this file using multithreading, in batches of 100 lines?

user2818430
  • For batch, have a look here http://stackoverflow.com/a/13731823/5062791 – ColinM Nov 25 '16 at 22:51
  • http://cc.davelozinski.com/c-sharp/the-fastest-way-to-read-and-process-text-files – Jeremy Thompson Nov 25 '16 at 23:00
  • 1
    Write codes and then compare them..... or better read SO docs how to ask a good question.... – L.B Nov 25 '16 at 23:02
  • 1
    Multi threading won't help you because I/O is the bottleneck and you have only one bus connection to the hard drive. I/O is serialized at the hardware level. Even if you have RAID it won't make any difference because I/O is much slower than CPU. I guess it could matter if the processing is so complex that it's comparable to I/O but I doubt that's the case. See https://stackoverflow.com/questions/902425/does-multithreading-make-sense-for-io-bound-operations – sashoalm Nov 25 '16 at 23:06
  • @sashoalm, it depends on what processing is done on the read lines. If the processing is CPU-intensive, then multiple threads can still be useful. In such case, with number of threads equal to number of CPU cores, this approach can achieve parallel processing effect. – felix-b Nov 25 '16 at 23:11

2 Answers


Using MoreLinq's Batch method (or the equivalent Batch extension shown below), this creates a sequence of IEnumerable<string> batches of up to 100 lines each and spins up a new task for every batch.

This is a basic implementation; it would be wise to use a semaphore so that only a limited number of tasks run at any one time (a throttled variant is sketched after the extension method below), and to measure what overhead File.ReadAllLines adds when reading 500,000,000 lines into memory.

using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

public class FileProcessor
{
    public async Task ProcessFile()
    {
        List<Task> tasks = new List<Task>();
        var lines = File.ReadAllLines("File.txt").Batch(100);
        foreach (IEnumerable<string> linesBatch in lines)
        {
            IEnumerable<string> localLinesBatch = linesBatch;
            Task task = Task.Factory.StartNew(() =>
            {
                // Perform operation on localLinesBatch
            });
            tasks.Add(task);
        }

        await Task.WhenAll(tasks);
    }
}

public static class LinqExtensions
{
    public static IEnumerable<IEnumerable<TSource>> Batch<TSource>(
              this IEnumerable<TSource> source, int size)
    {
        TSource[] bucket = null;
        var count = 0;

        foreach (var item in source)
        {
            if (bucket == null)
                bucket = new TSource[size];

            bucket[count++] = item;
            if (count != size)
                continue;

            yield return bucket;

            bucket = null;
            count = 0;
        }

        if (bucket != null && count > 0)
            yield return bucket.Take(count);
    }
}
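
As a follow-up to the semaphore suggestion above, here is a minimal sketch of a throttled variant. It is not part of the original answer: ThrottledFileProcessor, ProcessBatch, batchSize and maxConcurrency are hypothetical names, it reuses the Batch extension defined above, and it streams lines lazily with File.ReadLines instead of File.ReadAllLines so the whole 500,000,000-line file never has to sit in memory at once.

using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class ThrottledFileProcessor
{
    // Hypothetical stand-in for whatever work is done on each batch.
    private static void ProcessBatch(List<string> batch)
    {
        // ... process the 100 (or fewer) lines here ...
    }

    public async Task ProcessFileAsync(string path, int batchSize = 100, int maxConcurrency = 4)
    {
        var tasks = new List<Task>();
        using (var throttle = new SemaphoreSlim(maxConcurrency))
        {
            // File.ReadLines streams the file lazily, one line at a time.
            foreach (var batch in File.ReadLines(path).Batch(batchSize))
            {
                List<string> localBatch = batch.ToList();

                // Wait (asynchronously) until one of the maxConcurrency slots is free.
                await throttle.WaitAsync();

                tasks.Add(Task.Run(() =>
                {
                    try
                    {
                        ProcessBatch(localBatch);
                    }
                    finally
                    {
                        throttle.Release();
                    }
                }));
            }

            await Task.WhenAll(tasks);
        }
    }
}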
ColinM
  • My understanding is that File.ReadAllLines("File.txt") loads the entire file into memory, which you are then batching into sizes for the threads; this is not the desired effect, since it would load all 500,000,000 lines at once. https://learn.microsoft.com/en-us/dotnet/api/system.io.file.readalllines?view=netframework-4.8 -> `Opens a text file, reads all lines of the file into a string array, and then closes the file.` – Seabizkit Dec 11 '19 at 11:04

Additional libraries are not required if you use Parallel.ForEach from the built-in TPL and write a couple of enumerators (listed below). Your code could look like this:

using (var input = new StreamReader(File.OpenRead(@"c:\path\to\my\file.txt")))
{
    Parallel.ForEach(
        input.ReadLines().TakeChunks(100),
        new ParallelOptions() { MaxDegreeOfParallelism = 8 /* ideally the number of CPU cores, e.g. Environment.ProcessorCount */ },
        batchOfLines => {
            DoMyProcessing(batchOfLines);
        });
}

For this to work, you need a couple of extension methods on IEnumerable<T> and a couple of enumerators, defined as follows:

using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;

public static class EnumerableExtensions
{
    public static IEnumerable<string> ReadLines(this StreamReader input)
    {
        return new LineReadingEnumerable(input);
    }

    public static IEnumerable<IReadOnlyList<T>> TakeChunks<T>(this IEnumerable<T> source, int length)
    {
        return new ChunkingEnumerable<T>(source, length);
    }

    public class LineReadingEnumerable : IEnumerable<string>
    {
        private readonly StreamReader _input;

        public LineReadingEnumerable(StreamReader input)
        {
            _input = input;
        }
        public IEnumerator<string> GetEnumerator()
        {
            return new LineReadingEnumerator(_input);
        }
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }

    public class LineReadingEnumerator : IEnumerator<string>
    {
        private readonly StreamReader _input;
        private string _current;

        public LineReadingEnumerator(StreamReader input)
        {
            _input = input;
        }
        public void Dispose()
        {
            _input.Dispose();
        }
        public bool MoveNext()
        {
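            // Read the next line lazily; ReadLine returns null at end of stream.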
            _current = _input.ReadLine();
            return (_current != null);
        }
        public void Reset()
        {
            throw new NotSupportedException();
        }
        public string Current
        {
            get { return _current; }
        }
        object IEnumerator.Current
        {
            get { return _current; }
        }
    }

    public class ChunkingEnumerable<T> : IEnumerable<IReadOnlyList<T>>
    {
        private readonly IEnumerable<T> _inner;
        private readonly int _length;

        public ChunkingEnumerable(IEnumerable<T> inner, int length)
        {
            _inner = inner;
            _length = length;
        }
        public IEnumerator<IReadOnlyList<T>> GetEnumerator()
        {
            return new ChunkingEnumerator<T>(_inner.GetEnumerator(), _length);
        }
        IEnumerator IEnumerable.GetEnumerator()
        {
            return this.GetEnumerator();
        }
    }

    public class ChunkingEnumerator<T> : IEnumerator<IReadOnlyList<T>>
    {
        private readonly IEnumerator<T> _inner;
        private readonly int _length;
        private IReadOnlyList<T> _current;
        private bool _endOfInner;

        public ChunkingEnumerator(IEnumerator<T> inner, int length)
        {
            _inner = inner;
            _length = length;
        }
        public void Dispose()
        {
            _inner.Dispose();
            _current = null;
        }
        public bool MoveNext()
        {
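            // Pull up to _length items from the inner enumerator into a fresh buffer.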
            var currentBuffer = new List<T>();

            while (currentBuffer.Count < _length && !_endOfInner)
            {
                if (!_inner.MoveNext())
                {
                    _endOfInner = true;
                    break;
                }

                currentBuffer.Add(_inner.Current);
            }

            if (currentBuffer.Count > 0)
            {
                _current = currentBuffer;
                return true;
            }

            _current = null;
            return false;
        }
        public void Reset()
        {
            _inner.Reset();
            _current = null;
            _endOfInner = false;
        }
        public IReadOnlyList<T> Current
        {
            get
            {
                if (_current != null)
                {
                    return _current;
                }

                throw new InvalidOperationException();
            }
        }
        object IEnumerator.Current
        {
            get
            {
                return this.Current;
            }
        }
    }
}
felix-b
  • One important thing to mention here is that `Parallel.ForEach` will return when the loop completes, in a long running operation it is possible that this may block the UI thread. One way to get past this is to spin off a new task on the initial calling method. – ColinM Dec 01 '16 at 09:24
  • Unless I'm missing something, this implementation of chunking isn't thread safe. – Eric J. Dec 12 '16 at 22:16
  • It is not thread safe @Eric J, and it doesn't have to be. Parallel.ForEach takes care of thread safety while partitioning the enumerable among parallel threads (look at System.Collections.Concurrent.Partitioner and its nested classes InternalPartitionEnumerable and InternalPartitionEnumerator). – felix-b Dec 13 '16 at 17:00
  • Does `input.ReadLines().TakeChunks(100)` in the Parallel.ForEach read 100 lines at a time or the entire contents? How is the reading of `input` tracked to ensure that two threads don't process the same data or call MoveNext at the same time? – Seabizkit Dec 11 '19 at 10:43
  • I'm wanting to write a version of sorts of what you have, but want to ensure it meets what I'm trying to achieve. The key in mine is that it must be thread safe, and the records need to be synced across threads. E.g. if the items had numbers 1-1000, how would this ensure that threads don't repeat ranges, assuming you want to use Skip and Take? Assuming two threads can and will hit MoveNext, the only way I can see it working without introducing locks to enforce the ranges is to pre-calculate them. Is this correct? – Seabizkit Dec 11 '19 at 10:59
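
Following up on ColinM's comment above about Parallel.ForEach blocking the caller until the loop completes: below is a minimal sketch of spinning the whole loop off onto the thread pool so that, for example, a UI thread can await it instead of being blocked. BackgroundFileProcessing and ProcessFileInBackgroundAsync are hypothetical names, DoMyProcessing is the placeholder from the answer above, and the sketch reuses the ReadLines and TakeChunks extensions defined there.

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class BackgroundFileProcessing
{
    public static Task ProcessFileInBackgroundAsync(string path)
    {
        // Run the blocking Parallel.ForEach loop on the thread pool so the
        // caller only has to await the returned task.
        return Task.Run(() =>
        {
            using (var input = new StreamReader(File.OpenRead(path)))
            {
                Parallel.ForEach(
                    input.ReadLines().TakeChunks(100),
                    new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },
                    batchOfLines => DoMyProcessing(batchOfLines));
            }
        });
    }

    // Hypothetical stand-in for the per-batch processing from the answer above.
    private static void DoMyProcessing(IReadOnlyList<string> batchOfLines)
    {
    }
}

A UI event handler could then call await BackgroundFileProcessing.ProcessFileInBackgroundAsync(@"c:\path\to\my\file.txt"); and stay responsive while the batches are processed.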