
I have read this answer, ADO.NET DataTable/DataRow Thread Safety, and there are some things I can't understand. In particular, I can't understand the [2] article. What kind of wrapper do I need to use? Can anyone give an example?

I also can't understand what the author means when talking about cascading locks and full locks. An example of those would help too.

Jonik
    Frankly, if thread safety is a concern, the better approach here would be "stop using DataTable". What is it you are trying to do, and why do you feel DataTable is an appropriate solution? (hint: it very rarely is) – Marc Gravell Jan 23 '14 at 14:37
  • @MarcGravell I suppose the DataTable object is a simple and convenient way to fill the database with SqlBulkCopy. I use it to load a parsed log into the database. – Jonik Jan 24 '14 at 08:07
  • a: log loading is not a scenario that would require concurrency (the log should be prepared before passing to load), and b: all `SqlBulkCopy` needs is something that smells like an `IDataReader` - you can do that over any object model ([FastMember](https://www.nuget.org/packages/FastMember/) provides `ObjectReader`, for example, that even works with iterator blocks, aka `yield return`). – Marc Gravell Jan 24 '14 at 08:29
  • Personally, I think that is still deeply inadvisable, but I will comment on your specific scenario in an edit to my answer – Marc Gravell Jan 24 '14 at 09:43

3 Answers


DataTable is simply not designed or intended for concurrent usage (in particular where there is any form of mutation involved). The advisable "wrapper" here would, in my view, be either:

  • remove the need to work on the DataTable concurrently (when involving mutation), or:
  • remove the DataTable, instead using a data structure that either directly supports what you need (for example a concurrent collection), or which is much simpler and can be trivially synchronized (either exclusive or reader/writer)

Basically: change the problem.
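As a rough illustration of the second option, here is a minimal sketch of a "much simpler" structure that is trivially synchronized with a reader/writer lock. The LogBuffer<T> name and its members are hypothetical and not from any library; ReaderWriterLockSlim is the standard .NET reader/writer lock.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical example type: a plain list guarded by a reader/writer lock.
// Many threads may read at once; a writer gets exclusive access.
public sealed class LogBuffer<T> : IDisposable
{
    private readonly List<T> items = new List<T>();
    private readonly ReaderWriterLockSlim rw = new ReaderWriterLockSlim();

    public void Add(T item)
    {
        rw.EnterWriteLock();          // exclusive: blocks readers and other writers
        try { items.Add(item); }
        finally { rw.ExitWriteLock(); }
    }

    public int Count
    {
        get
        {
            rw.EnterReadLock();       // shared: other readers may proceed concurrently
            try { return items.Count; }
            finally { rw.ExitReadLock(); }
        }
    }

    // Returns a copy, so callers can enumerate it without holding the lock
    public List<T> Snapshot()
    {
        rw.EnterReadLock();
        try { return new List<T>(items); }
        finally { rw.ExitReadLock(); }
    }

    public void Dispose() => rw.Dispose();
}
```

Using a plain lock (the "exclusive" variant) would be simpler still; the reader/writer version only pays off when reads clearly outnumber writes.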


From comments:

The code looks like:

Parallel.ForEach(strings, str=>
{
    DataRow row;
    lock(table){
        row= table.NewRow();
    }
    MyParser.Parse(str, out row);
    lock(table){
        table.Rows.Add(row);
    }
});

I can only hope that out row is a typo here, because out means Parse has to assign a new value to row, so the row created via NewRow() is simply discarded rather than populated. But if you absolutely have to use that approach, you can't use NewRow, as the pending row is, in effect, shared table state. Your best bet would be:

Parallel.ForEach(strings, str=> {
    object[] values = MyParser.Parse(str);
    lock(table) {
        table.Rows.Add(values);
    }
});

The important change in the above is that the lock covers the entire new row process. Note that you will have no guarantee of order when using Parallel.ForEach like this, so it is important that the final order does not need to match exactly (which shouldn't be a problem if the data includes a time component).
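If the final order does matter, one common alternative (a sketch, not part of the original answer) is to parse into an array indexed by line position, so each parallel iteration writes only its own slot and needs no lock, and then add the rows to the DataTable on a single thread. ParseLine, the "timestamp|message" line format and the two-column schema are hypothetical stand-ins for MyParser.Parse and the real log layout. Like the version above, this still buffers everything in memory.

```csharp
using System;
using System.Data;
using System.Globalization;
using System.Threading.Tasks;

static class OrderedLoad
{
    // Hypothetical parser: "timestamp|message" -> values matching the table's columns
    static object[] ParseLine(string line)
    {
        string[] parts = line.Split('|');
        return new object[] { DateTime.Parse(parts[0], CultureInfo.InvariantCulture), parts[1] };
    }

    public static DataTable Load(string[] lines)
    {
        var table = new DataTable("MyLog");
        table.Columns.Add("When", typeof(DateTime));
        table.Columns.Add("Message", typeof(string));

        // Parse in parallel; each index is written by exactly one iteration,
        // so the array needs no locking.
        var parsed = new object[lines.Length][];
        Parallel.For(0, lines.Length, i => parsed[i] = ParseLine(lines[i]));

        // Touch the DataTable from one thread only, in the original order.
        foreach (object[] values in parsed)
            table.Rows.Add(values);
        return table;
    }
}
```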

However! I still think you are approaching this the wrong way: for parallelism to be relevant, it must be non-trivial data. If you have non-trivial data, you really don't want to have to buffer it all in memory. I strongly suggest doing something like the following, which will work fine on a single thread:

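using System.Collections.Generic;
using System.Data.SqlClient;
using System.IO;
using FastMember; // ObjectReader comes from the FastMember NuGet package
...
// connectionString: your database connection string
// (SqlBulkCopy has no parameterless constructor)
using(var bcp = new SqlBulkCopy(connectionString))
using(var reader = ObjectReader.Create(ParseFile(path)))
{
    bcp.DestinationTableName = "MyLog";
    // If the LogRow member order differs from the table's column order,
    // pass the member names to ObjectReader.Create or set bcp.ColumnMappings.
    // WriteToServer pulls rows lazily, so ParseFile runs as the upload proceeds.
    bcp.WriteToServer(reader);
}
...
static IEnumerable<LogRow> ParseFile(string path)
{
    using(var reader = File.OpenText(path))
    {
        string line;
        while((line = reader.ReadLine()) != null)
        {
            yield return new LogRow {
                // TODO: populate the row from line here
            };
        }
    }
}
...
public sealed class LogRow {
    /* define your schema here */
}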

Advantages:

  • no buffering - this is a fully streaming operation (yield return does not put things into a list or similar)
  • for that reason, the rows can start streaming immediately without needing to wait for the entire file to be pre-processed first
  • no memory saturation issues
  • no threading complications / overheads
  • you get to preserve the original order (not usually critical, but nice)
  • you are only constrained by how fast you can read the original file, which is typically faster on a single thread than it is from multiple threads (contention on a single IO device is just overhead)
  • avoids all the overheads of DataTable, which is overkill here - because it is so flexible it has significant overheads
  • read (from the log file) and write (to the database) are now concurrent rather than sequential

I do a lot of things like ^^^ in my own work, and from experience it is usually at least twice as fast as populating a DataTable in memory first.
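To see the "no buffering" point concretely, here is a small self-contained sketch (no database involved; StreamingDemo and its members are made-up names) that records when each item is produced and consumed. Because the iterator is lazy, production and consumption interleave one item at a time instead of the whole input being materialised first, which is what lets SqlBulkCopy start sending rows before the file has been fully read.

```csharp
using System.Collections.Generic;

static class StreamingDemo
{
    // Lazy producer: each item is created only when the consumer asks for it
    static IEnumerable<int> Produce(int count, List<string> log)
    {
        for (int i = 0; i < count; i++)
        {
            log.Add("produce " + i);
            yield return i;
        }
    }

    public static List<string> Run(int count)
    {
        var log = new List<string>();
        foreach (int item in Produce(count, log))
            log.Add("consume " + item); // stands in for the bulk-copy write
        return log;
    }
}
```

StreamingDemo.Run(3) logs "produce 0", "consume 0", "produce 1", "consume 1", "produce 2", "consume 2". If Produce built and returned a List<int> instead, all three "produce" entries would come before the first "consume".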


And finally - here's an example of an IEnumerable<T> implementation that accepts concurrent readers and writers without requiring everything to be buffered in memory - which would allow multiple threads to parse the data (calling Add and finally Close) with a single thread for SqlBulkCopy via the IEnumerable<T> API:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Acts as a container for concurrent read/write flushing (for example, parsing a
/// file while concurrently uploading the contents); supports any number of concurrent
/// writers and readers, but note that each item will only be returned once (and once
/// fetched, is discarded). It is necessary to Close() the bucket after adding the last
/// of the data, otherwise any iterators will never finish
/// </summary>
class ThreadSafeBucket<T> : IEnumerable<T>
{
    private readonly Queue<T> queue = new Queue<T>();

    public void Add(T value)
    {
        lock (queue)
        {
            if (closed) // no more data once closed
                throw new InvalidOperationException("The bucket has been marked as closed");

            queue.Enqueue(value);
            if (queue.Count == 1)
            { // someone may be waiting for data
                Monitor.PulseAll(queue);
            }
        }
    }

    public void Close()
    {
        lock (queue)
        {
            closed = true;
            Monitor.PulseAll(queue);
        }
    }
    private bool closed;

    public IEnumerator<T> GetEnumerator()
    {
        while (true)
        {
            T value;
            lock (queue)
            {
                if (queue.Count == 0)
                {
                    // no data; should we expect any?
                    if (closed) yield break; // nothing more ever coming

                    // else wait to be woken, and redo from start
                    Monitor.Wait(queue);
                    continue;
                }
                value = queue.Dequeue();
            }
            // yield it **outside** of the lock
            yield return value;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

static class Program
{
    static void Main()
    {
        var bucket = new ThreadSafeBucket<int>();
        int expectedTotal = 0;
        ThreadPool.QueueUserWorkItem(delegate
        {
            int count = 0, sum = 0;
            foreach(var item in bucket)
            {
                count++;
                sum += item;
                if ((count % 100) == 0)
                    Console.WriteLine("After {0}: {1}", count, sum);
            }
            Console.WriteLine("Total over {0}: {1}", count, sum);
        });
        Parallel.For(0, 5000,
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            i => {
                bucket.Add(i);
                Interlocked.Add(ref expectedTotal, i);
            }
        );
        Console.WriteLine("all data added; closing bucket");
        bucket.Close();
        Thread.Sleep(100);
        Console.WriteLine("expecting total: {0}",
            Interlocked.CompareExchange(ref expectedTotal, 0, 0));
        Console.ReadLine();


    }

}
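For comparison, the framework also ships a type with similar semantics: BlockingCollection<T> in System.Collections.Concurrent (available since .NET 4.0). Its Add plays the role of Add above, CompleteAdding the role of Close, and GetConsumingEnumerable() hands out each item once and finishes after CompleteAdding has been called and the collection is drained. This is a minimal sketch of that alternative, not the answer's own code; BlockingCollectionDemo is a made-up name.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class BlockingCollectionDemo
{
    // Several producers add items in parallel while a single consumer drains them,
    // mirroring the ThreadSafeBucket<T> usage above.
    public static (int Count, long Sum) Run(int n)
    {
        using (var bucket = new BlockingCollection<int>())
        {
            // Consumer: blocks while empty, ends once adding is complete and the collection is empty
            var consumer = Task.Run(() =>
            {
                int count = 0;
                long sum = 0;
                foreach (int item in bucket.GetConsumingEnumerable())
                {
                    count++;
                    sum += item;
                }
                return (count, sum);
            });

            Parallel.For(0, n, new ParallelOptions { MaxDegreeOfParallelism = 3 },
                i => bucket.Add(i));
            bucket.CompleteAdding();   // equivalent of ThreadSafeBucket.Close()

            return consumer.Result;    // wait for the consumer rather than sleeping
        }
    }
}
```

Waiting on the consumer task also avoids the timing assumption in the Thread.Sleep(100) call in Main above, where the consumer is not guaranteed to have printed its total before the expected total is shown.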
Marc Gravell
  • I have a large log file and use parallelism while parsing. But I need some container to collect the parsed data before loading it into the database. Roughly, the code looks like: `Parallel.ForEach(strings, str=> { DataRow row; lock(table){ row= table.NewRow(); } MyParser.Parse(str, out row); lock(table){ table.Rows.Add(row); } });` Can you suggest a proper solution for this, or some links to read so I can solve it the right way? Excuse my bad English. – Jonik Jan 24 '14 at 09:53
  • @Jonik "and use parallelism while parsing" - I genuinely do not think that is helping you any – Marc Gravell Jan 24 '14 at 09:57
  • @Jonik note, `ObjectReader` in the above is from [FastMember](https://www.nuget.org/packages/FastMember/) – Marc Gravell Jan 24 '14 at 10:00
  • @Jonik then the middle code block ("Your best bet would be:") might be your limit - although personally I must say I'd probably be tempted to make a custom thread-safe semi-collection that allowed concurrent/parallel queue and dequeue-with-block. I can probably whip up an example if you really want. – Marc Gravell Jan 24 '14 at 12:41
  • Thank you very much. It is a good lesson for me; I'll remember that. But in my case I need parallelism, because the parsing function is very heavy (it uses regular expressions), and parallelism gives a good time gain. – Jonik Jan 24 '14 at 12:42
  • @MarcGravell "then the middle code block ("Your best bet would be:") might be your limit": my real code is nearly the same. "I can probably whip up an example if you really want": if you really do that, it would be a great learning experience for me, and I suppose for everyone who reads it. – Jonik Jan 24 '14 at 12:49
  • @MarcGravell, I'm sure it's bad form to comment on such an old post; but I'm confused how reading from the file and writing to the database are concurrent. I imagine that `WriteToServer` iterates through the collection passed by `ParseFile`. Doesn't this mean that when it writes `BatchSize` rows it has to first read `BatchSize` rows from the file; thus reading and writing are never happening at the same time? To me the only way this can be concurrent is if the file is being read from independent of the writes thus violating your first and third bullet points under "Advantages". – philomathic_life Jun 13 '18 at 20:46
    @basketballfan22 they are separate devices; if you do all the reads followed by all the writes, then there is never any overlap - but: if you read *some* data, then start writing... then while you're waiting on disk IO, network IO is quietly buffering, meaning that when you go back to get more data, there's a very good chance that it is already there; likewise, with disk write caches, when you exhaust the DB/NIC input buffer and *actually* wait for more, the disk will be flushing itself. It all works out nicely - even *before* we start adding niceties like `async`. – Marc Gravell Jun 13 '18 at 20:57
  • @MarcGravell, I was looking at the source code for `StreamReader`, `FileStream`, and `Win32Native`; and I still can't see how we're able to write to the network buffer while waiting on disk I/O. When a `StreamReader` object exhausts its internal buffer, it invokes the underlying `FileStream` object to read the number of bytes that the `StreamReader`'s buffer is equivalent to. The `FileStream` object then invokes `Win32Native`'s `ReadFile` function which belongs to `kernel32.dll`. All of these calls appear to be sequential. What am I missing? – philomathic_life Jul 24 '18 at 22:58
    @basketballfan22 "OS pre-fetch" – Marc Gravell Jul 25 '18 at 08:19

Faced with the same problem, I decided to implement nested ConcurrentDictionaries.

It is generic, but could be changed to use concrete types instead. An example method that converts it to a DataTable is included.

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;

/// <summary>
/// A thread safe data table
/// </summary>
/// <typeparam name="TX">The X axis type</typeparam>
/// <typeparam name="TY">The Y axis type</typeparam>
/// <typeparam name="TZ">The value type</typeparam>
public class HeatMap<TX,TY,TZ>
{
    public ConcurrentDictionary<TX, ConcurrentDictionary<TY, TZ>> Table { get; set; } = new ConcurrentDictionary<TX, ConcurrentDictionary<TY, TZ>>();

    public void SetValue(TX x, TY y, TZ val)
    {
        // GetOrAdd and the indexer setter are both atomic on ConcurrentDictionary
        var row = Table.GetOrAdd(x, u => new ConcurrentDictionary<TY, TZ>());
        row[y] = val;
    }

    public TZ GetValue(TX x, TY y)
    {
        // Plain lookups, so reading a missing cell does not create an empty row
        if (Table.TryGetValue(x, out var row) && row.TryGetValue(y, out TZ val))
            return val;

        return default;
    }

    public DataTable GetDataTable()
    {
        // Enumerating a ConcurrentDictionary is safe while other threads write,
        // but it is not a snapshot: concurrent changes may or may not be seen
        var dataTable = new DataTable();

        dataTable.Columns.Add(""); // row-key column; an empty name gets the default "Column1"

        var columnList = new List<string>();
        var seen = new HashSet<string>();
        foreach (var row in Table)
        {
            foreach (var valueKey in row.Value.Keys)
            {
                var columnName = valueKey.ToString();
                if (seen.Add(columnName))
                    columnList.Add(columnName);
            }
        }

        foreach (var s in columnList)
            dataTable.Columns.Add(s);

        foreach (var row in Table)
        {
            var dataRow = dataTable.NewRow();
            dataRow[0] = row.Key.ToString();
            foreach (var column in row.Value)
            {
                // a Y key added by another thread after columnList was built would throw here
                dataRow[column.Key.ToString()] = column.Value;
            }

            dataTable.Rows.Add(dataRow);
        }

        return dataTable;
    }
}
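A minimal usage sketch of the same nested-dictionary idea, filled from a Parallel.For loop. It is specialised to int keys and values and does not reuse the HeatMap class, so it compiles on its own; NestedGridDemo is a made-up name.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class NestedGridDemo
{
    // Same nested layout as HeatMap<TX,TY,TZ>: outer key = X, inner key = Y
    public static ConcurrentDictionary<int, ConcurrentDictionary<int, int>> Fill(int rows, int cols)
    {
        var table = new ConcurrentDictionary<int, ConcurrentDictionary<int, int>>();
        Parallel.For(0, rows * cols, i =>
        {
            int x = i / cols, y = i % cols;
            // Under contention GetOrAdd may run the factory more than once,
            // but only one inner dictionary is ever stored for a given x
            var row = table.GetOrAdd(x, _ => new ConcurrentDictionary<int, int>());
            row[y] = x * y; // the indexer setter is thread-safe
        });
        return table;
    }
}
```

Because every writer goes through the same stored inner dictionary, no cell is lost even though many threads create rows at the same time.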
Johan Sonesson

Introduction

If concurrency or parallelism is a requirement for a program that uses DataTable objects, it is possible to achieve it. Let's look at two examples (basically, the use of the AsEnumerable() method is common to all the examples):

1 - Parallel iteration over a DataTable:

.NET provides built-in support for iterating over a DataTable in parallel, as in the code below:

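using System.Data;
using System.Threading.Tasks;

DataTable dt = new DataTable();
dt.Columns.Add("ID", typeof(int));
dt.Columns.Add("NAME", typeof(string));

dt.Rows.Add(1, "One");
dt.Rows.Add(2, "Two");
dt.Rows.Add(3, "Three");
dt.PrimaryKey = new DataColumn[] { dt.Columns["ID"] };

// Safe only because the loop body just reads: DataTable supports concurrent
// reads, but any write (adding, editing or deleting rows) must be synchronized
Parallel.ForEach(dt.AsEnumerable(), row =>
{
    int rowId = row.Field<int>("ID");
    string rowName = row.Field<string>("NAME");
    // TODO: the routine that is useful for each DataRow object goes here
});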
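A sketch of how per-row results can be combined without writing to the DataTable at all: the localInit/localFinally overload of Parallel.ForEach gives each worker its own subtotal, which is merged once per worker with Interlocked.Add. It assumes an int-typed column named ID; ParallelReadDemo is a made-up name.

```csharp
using System.Data;
using System.Threading;
using System.Threading.Tasks;

static class ParallelReadDemo
{
    // Sums an int column in parallel. The table is only read; each worker
    // keeps a private subtotal that is merged into the shared total at the end.
    public static long SumIds(DataTable dt)
    {
        long total = 0;
        Parallel.ForEach(
            dt.AsEnumerable(),
            () => 0L,                                            // per-worker initial subtotal
            (row, state, subtotal) => subtotal + row.Field<int>("ID"),
            subtotal => Interlocked.Add(ref total, subtotal));   // merge once per worker
        return total;
    }
}
```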

2 - Adding multiple items to a DataTable:

I think this is the non-trivial case, because at its core a DataTable is not a thread-safe collection; so you need the support of a ConcurrentBag to guarantee that your code does not throw exceptions from concurrent writes.

In my answer to "ConcurrentBag - Add Multiple Items?", I wrote a detailed example that adds items from DataTable objects to a class derived from ConcurrentBag, for programs that need concurrency with DataTables. Once the program's business-rule additions have been made on the ConcurrentBag using parallelism, the ConcurrentBag collection can be converted back to a DataTable.
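The linked answer is not reproduced here, but the general two-phase shape described above can be sketched as follows: workers add to a ConcurrentBag<T> in parallel, and only afterwards is the bag copied into a DataTable on a single thread. This is an assumed minimal version that uses a plain ConcurrentBag<object[]> rather than the derived class mentioned in the answer; BagThenTable and the row contents are made up.

```csharp
using System.Collections.Concurrent;
using System.Data;
using System.Threading.Tasks;

static class BagThenTable
{
    public static DataTable Build(int n)
    {
        // Phase 1: many threads add to the thread-safe bag; the DataTable is not touched here
        var bag = new ConcurrentBag<object[]>();
        Parallel.For(0, n, i => bag.Add(new object[] { i, "Item " + i }));

        // Phase 2: a single thread copies the results into the DataTable.
        // ConcurrentBag is unordered, so sort afterwards if order matters.
        var table = new DataTable();
        table.Columns.Add("ID", typeof(int));
        table.Columns.Add("NAME", typeof(string));
        foreach (object[] values in bag)
            table.Rows.Add(values);
        return table;
    }
}
```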

Pankwood
Antonio Leonardo
  • Thanks for this answer. It was really helpful, as I needed to process `DataRow` objects in multithreaded mode. – vibs2006 Sep 20 '22 at 16:29