
I have a folder from which I will take text files (200-500 MB each; not huge, but still large text files) and I want to process these files in parallel. Each file will contain lines like

"ComnanyTestIsert", "Firs Comment", "LA 132", "222-33-22", 1
"ComnanyTestIsert1", "Seconds Comment", "LA 132", "222-33-22", 1

For example, I use 2 such files. I don't quite understand: when using BufferedStream with a parallel loop, how do I set the number of parallel operations, and how do I perform the insert correctly?

static void Main(string[] args)
        {
            //Basic usage to help you get started:
            ProcessFileTaskItem(
                new string[] { "\\Insert.txt", "\\Insert1.txt" }
                , "Data Source=(localdb)\\MSSQLLocalDB;Initial Catalog=test;Integrated Security=True;Connect Timeout=30;Encrypt=False;TrustServerCertificate=False;ApplicationIntent=ReadWrite;MultiSubnetFailover=False"
                , "test");
        }


        /// This will read an array of input files, process the lines in parallel, and upload
        /// everything into the database.
        public static void ProcessFileTaskItem(string[] SourceFiles, string DatabaseConnectionString, string DestinationTable)
        {
            //Make sure there's files to read
            if (SourceFiles != null && SourceFiles.Length > 0)
            {
                //Loop through the file array
                Parallel.For(0, SourceFiles.Length, x =>
                //for (int x = 0; x < SourceFiles.Length; x++)
                {
                    //Make sure the file exists and if so open it for reading.
                    if (File.Exists(SourceFiles[x]))
                    {
                        using (SqlConnection connectionDest = new SqlConnection(DatabaseConnectionString))
                        {
                            connectionDest.Open();

                            //Configure everything to upload to the database via bulk copy.
                            using (SqlBulkCopy sbc = new SqlBulkCopy(connectionDest, SqlBulkCopyOptions.TableLock, null))
                            {
                                //Configure the bulk copy settings
                                sbc.DestinationTableName = DestinationTable;
                                sbc.BulkCopyTimeout = 28800; //8 hours

                                //Now read and process the file
                                ProcessAllLinesInInputFile(SourceFiles[x], connectionDest, sbc);
                            }

                            connectionDest.Close();
                        }

                    }
                    
                } //for
                ); //End Parallel reading of files

                //Explicitly clean up before exiting
                Array.Clear(SourceFiles, 0, SourceFiles.Length);
            }
            
        } 

        /// Processes every line in the source input file.
        private static void ProcessAllLinesInInputFile(string SourceFiles, SqlConnection connectionDest, SqlBulkCopy sbc)
        {
            
            //Create a local data table. Should be the same name as the table
            //in the database you'll be uploading everything to.
            DataTable CurrentRecords = new DataTable("test");

            //The column names. They should match what's in the database table.
            string[] ColumnNames = new string[] { "Name", "Comment", "Address", "Phone", "IsActive" };

            
            //Read all lines first. A StreamReader is not thread safe,
            //so it should not be shared between threads.
            List<string> lines = new List<string>();
            using (FileStream fs = File.Open(SourceFiles, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
            using (BufferedStream bs = new BufferedStream(fs))
            using (StreamReader sr = new StreamReader(bs))
            {
                string s;
                while ((s = sr.ReadLine()) != null)
                {
                    lines.Add(s);
                }
            }

            //Create the datatable with the column names.
            for (int x = 0; x < ColumnNames.Length; x++)
                CurrentRecords.Columns.Add(ColumnNames[x], typeof(string));

            //Now process each line in parallel.
            Parallel.For(0, lines.Count, x =>
            {
                List<object> values = null; //so each thread gets its own copy.

                //TODO: parse lines[x] into values and add a row to CurrentRecords.
                //Note: DataTable is not thread safe, so Rows.Add needs a lock.
            }); //End Parallel processing of lines
        }
cickness
3 Answers


Parallel.For automatically adjusts the number of threads it uses, but the limit can be specified via the ParallelOptions parameter.
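As an illustration, the cap is set through ParallelOptions.MaxDegreeOfParallelism (a minimal sketch; the file names and the per-file work are placeholders, not the asker's real processing):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ParallelOptionsDemo
{
    // Process each "file" with at most two concurrent iterations.
    public static int ProcessAll(string[] files)
    {
        int processed = 0;

        var options = new ParallelOptions
        {
            // -1 (the default) lets the scheduler decide; a positive value caps it.
            MaxDegreeOfParallelism = 2
        };

        Parallel.For(0, files.Length, options, i =>
        {
            // ... per-file work would go here ...
            Interlocked.Increment(ref processed);
        });

        return processed;
    }
}
```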

Do you have any reason to believe that doing this in parallel would improve performance? Multithreading will not magically make everything go faster. IO operations like this would typically benefit little, if at all, from multithreading. That is especially true if you have spinning media, where concurrent IO can drastically reduce throughput; even SSDs usually suffer from non-sequential IO. Also, if you are concerned about performance, you should have measurements in place, so you can tell whether you are actually improving things.

Unless otherwise specified, non-static framework methods are not thread safe. So you should not try to read from the same stream from multiple threads. You could use multiple streams, but if you have sufficient memory I would suggest using File.ReadAllLines rather than ReadLine; I suspect that would be faster, since it can read everything sequentially.
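A minimal sketch of that approach: read the whole file sequentially with File.ReadAllLines, and only parallelize the per-line work (the counting body here is just a stand-in for real processing):

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

static class ReadAllLinesDemo
{
    // One sequential read, then fan the lines out to worker threads.
    public static int CountNonEmptyLines(string path)
    {
        string[] lines = File.ReadAllLines(path); // sequential IO
        int count = 0;

        Parallel.ForEach(lines, line =>
        {
            // Replace this with the actual per-line processing.
            if (!string.IsNullOrWhiteSpace(line))
                Interlocked.Increment(ref count);
        });

        return count;
    }
}
```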

JonasH

Doing more than one bulk insert at the same time into the same table is not going to give you a good speedup, especially if you specify SqlBulkCopyOptions.TableLock.

A better strategy for a speedup, with minimal messing about on the DB side, would be to combine your files into bigger bulk-insert batches, or even just one batch. If you can fit the whole batch in memory, then read all the files in parallel (preferably using async methods, not Parallel.For), combine them into your batch, and bulk insert the single batch.

You can make it upload in parallel, but you really need to do it to separate tables to make it work fast.
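A minimal sketch of that strategy, assuming the five-column CSV layout from the question. The naive Split(',') parser and the commented-out bulk-copy step are illustrative only, not production code (a real parser must handle quoted commas):

```csharp
using System;
using System.Data;
using System.IO;
// using Microsoft.Data.SqlClient; // needed only for the bulk-copy step

static class BatchDemo
{
    // Combine every line from every file into one DataTable batch.
    public static DataTable BuildBatch(string[] files)
    {
        var batch = new DataTable("test");
        foreach (var col in new[] { "Name", "Comment", "Address", "Phone", "IsActive" })
            batch.Columns.Add(col, typeof(string));

        foreach (string file in files)
        foreach (string line in File.ReadLines(file))
        {
            // Naive CSV split; assumes no commas inside the quoted fields.
            string[] fields = line.Split(',');
            if (fields.Length == batch.Columns.Count)
            {
                for (int i = 0; i < fields.Length; i++)
                    fields[i] = fields[i].Trim().Trim('"');
                batch.Rows.Add(fields);
            }
        }
        return batch;
    }

    // One bulk insert for the whole batch (sketch, not executed here):
    // using (var conn = new SqlConnection(connectionString))
    // using (var sbc = new SqlBulkCopy(conn) { DestinationTableName = "test" })
    // {
    //     conn.Open();
    //     sbc.WriteToServer(batch);
    // }
}
```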

mikelegg
  • how would you implement this in this context? –  cickness Dec 09 '20 at 09:33
    Don't overthink it with parallel.for. Get it working without first. Step 1: loop through all the files and for each line, add to your batch. Step 2: upload the batch – mikelegg Dec 09 '20 at 09:39

The slow part of your process is reading the data in the file. Normally your program would have to wait idly for the "hard disk" to provide a chunk of data. Instead of waiting idly, your program could already do some processing on the items it has fetched.

Whenever you have a program where your process has to wait for some external process, like writing to a disk, querying data from a database management system, or fetching information from the internet, it is wise to consider using async-await.

If you use async-await, then, whenever your process has to wait for some other process to finish, it won't wait idly, but will look around to see if it can do something else instead.

In your case, you could call an async function that async reads one file and async writes the read data to the database. If you start several of these Tasks, then whenever one of them has to await the result of either the file read or the database write, it can look around to see if it can do anything for the other Tasks. So while it is waiting for a chunk of data from reading file X in Task A, it could already start writing data to the database in Task B.

As we are processing the file line by line, we need a function that returns an IEnumerable<string>, or the async equivalent: IAsyncEnumerable<string>. See iterating with AsyncEnumerable

public async IAsyncEnumerable<string> ReadLines(string fileName)
{
    using (StreamReader reader = File.OpenText(fileName))
    {
        while(!reader.EndOfStream)
        {
            yield return await reader.ReadLineAsync().ConfigureAwait(false);
        }
    }
}

File.OpenText sadly only allows synchronous I/O; the async APIs are implemented poorly in that scenario. To open a file for truly asynchronous I/O, you'd need one of the FileStream constructor overloads that take a Boolean isAsync parameter or FileOptions.Asynchronous.
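A sketch of such an overload, wrapped in a hypothetical OpenTextAsync helper (the buffer size is an arbitrary choice):

```csharp
using System;
using System.IO;
using System.Text;

static class AsyncOpenDemo
{
    // Open a file for genuinely asynchronous reads. FileOptions.Asynchronous
    // requests overlapped I/O, so the async Read methods do not simply block
    // a thread pool thread behind the scenes.
    public static StreamReader OpenTextAsync(string path)
    {
        var fs = new FileStream(
            path, FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: 4096, options: FileOptions.Asynchronous);
        return new StreamReader(fs, Encoding.UTF8);
    }
}
```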

Usage:

async Task DisplayFileContentsAsync(string fileName)
{
    await foreach (string line in ReadLines(fileName))
    {
        Console.WriteLine(line);
    }
}

We also need a method that writes the read data to the database. I do it here line by line; if you want, you could change it such that it writes several lines at once.

async Task SaveInDbAsync(string line, string dbConnectionString)
{
    using (SqlConnection dbConnection = new SqlConnection(dbConnectionString))
    {
        // prepare the SQL command (consider using other methods)
        const string sqlCommandText = @"Insert into ...";
        var dbCommand = dbConnection.CreateCommand();
        dbCommand.CommandText = sqlCommandText;
        dbCommand.Parameters.Add(...);

        // async execute the dbCommand
        await dbConnection.OpenAsync();
        await dbCommand.ExecuteNonQueryAsync();
        // TODO: consider to use the return value to detect problems
    }
}

Put it all together: read one file and save the lines in the database:

async Task SaveFileInDbAsync(string fileName, string dbConnectionString)
{
    await foreach (string line in ReadLines(fileName))
    {
        await SaveInDbAsync(line, dbConnectionString);
    }
}

To save all your files:

async Task SaveFilesInDbAsync(IEnumerable<string> fileNames, string dbConnectionString)
{
    // start all Tasks, do not await yet:
    List<Task> tasks = new List<Task>();
    foreach (string fileName in fileNames)
    {
        Task task = SaveFileInDbAsync(fileName, dbConnectionString);
        tasks.Add(task);
    }

    // now that all Tasks are started and happily reading files
    // and writing the read lines to the database
    // await until all Tasks are finished.
    await Task.WhenAll(tasks);
}

Or if you need a synchronous version:

void SaveFilesInDb(IEnumerable<string> fileNames, string dbConnectionString)
{
    // start all Tasks, do not await yet:
    List<Task> tasks = new List<Task>();
    foreach (string fileName in fileNames)
    {
        Task task = SaveFileInDbAsync(fileName, dbConnectionString);
        tasks.Add(task);
    }

    // Wait until all Tasks are finished.
    Task.WaitAll(tasks.ToArray());
}
Harald Coppoolse
  • thx. i use dapper (get list of models and await db.ExecuteAsync(sql, parameters) and see this result of time Time read: 00:00:00.0291921 and Time insert: 00:03:13.3398096 . in file only 20k line, what can you advise? –  cickness Dec 09 '20 at 12:46
  • Sometimes I think: do I have to tell them how to debug. Are you sure that the Time read is a `DateTime`, isn't it a `TimeSpan`? To investigate whether it comes from Dapper: What happens if you write the read data to a file? And what if you write it to a database using SQL and not Dapper? Test it in a small program, so you know that it has nothing to do with the async await part. – Harald Coppoolse Dec 09 '20 at 12:56
  • *"File.OpenText sadly only allows synchronous I/O; the async APIs are implemented poorly in that scenario."* <== (citation needed) – Theodor Zoulias Dec 09 '20 at 13:39
  • [I got it from Stephen Cleary here on stackoverflow](https://stackoverflow.com/questions/59043037/read-text-file-with-iasyncenumerable) – Harald Coppoolse Dec 09 '20 at 14:31