0

I have a DB table with 7 columns and 10rows. Each row is provided an input parameter to a web api call, and the response returned by the api, is inserted into a table. My problem is, the Parallel.Foreach is not producing the same result as the regular ForEach.

Specifically, if 1st row has address as "123 Jump Street Arizona Us", I get a response from web api with the standardized address as "123 Jump Street Arizona USA", like that I have 10 different rows with 10 different input address. However, the output response I get from Parallel.Foreach is for the same address repeated 5 times. And the next time i run it, it is a different result altogether

Could someone please point out why this is happening and the potential solution?

Here is my code:

public void Main()
{
    // TODO: Add your code here

    string query = "SELECT  ADDR_LINE_ONE,ADDR_LINE_TWO,ADDR_LINE_THREE,COUNTRY,PROVINCE,CITY_NAME,POSTAL_CODE FROM Addresstestpoc";

    try
    {
        using (OleDbConnection connection = new OleDbConnection(conn))
        {
            OleDbCommand command = new OleDbCommand(query, connection);

            connection.Open();
            OleDbDataReader reader = command.ExecuteReader();
            int i = reader.FieldCount;
            bool b = reader.HasRows;
            Parallel.ForEach(GetFromReader(reader), record =>
                {                                                                      
                    //AddrOne = record[0].ToString();                                                            
                    string AddrOne = record.GetString(0);
                    string AddrTwo = record.GetString(1);
                    string AddrThree = record.GetString(2);                                                                                  
                    string Country = record.GetString(3);                                                          
                    string Province = record.GetString(4);                                                                                  
                    string City = record.GetString(5);                                                                                  
                    string PostalCode = record.GetString(6);                                                                                 
                    string Sender = "G";                                                                               
                    //sqlk = (string)Dts.Variables["User::sqlconn"].Value;                                                                               
                    standardizeAddressReturn result;                                                                                
                    string data = string.Empty;                                                                                
                    string queri;


                    MDMStandardizeAddressService web = new MDMStandardizeAddressService();

                    try
                    {                                                                                   
                        result = web.standardizeAddress(AddrOne, AddrTwo, AddrThree, City, Province, PostalCode, Country, Sender);

                        data = SerializeToXml(result);                                                                                    
                        queri = "insert into [CPM].[dbo].[AddressResponsetest_new] values ('" + data + "')";                                                             
                        //MessageBox.Show(data);                                                         
                        insertintosql(queri);                                                                                
                    }                                                    
                    catch (Exception ex)
                    {                                                                            
                        MessageBox.Show(ex.Message);
                    }
               });

        }
    }
    catch (Exception ex)
    {
        string msg = ex.Message;
    }

}

IEnumerable<IDataRecord> GetFromReader(IDataReader reader)
{
    while (reader.Read()) yield return reader;
}
sab
  • 338
  • 1
  • 3
  • 21
  • `IDataReader`, according to the [docs](https://msdn.microsoft.com/en-us/library/system.data.idatareader(v=vs.110).aspx), *Provides a means of reading one or more forward-only streams of result sets obtained by executing a command at a data source..* And since your `GetFromReader()` returns **the reader itself** as one thread advances the reader every thread will see the reader pointing to the most recent record. You need to convert each row to a POCO inside the enumerator. – dbc May 01 '18 at 15:09
  • 1
    Also, [always use parameterized SQL](https://stackoverflow.com/q/7505808/3744182). – dbc May 01 '18 at 15:12
  • What are the **actual** and **expected** results? – Jimenemex May 01 '18 at 15:16
  • @dbc, I have no idea about POCO, could you please explain how to acheive that? – sab May 01 '18 at 16:26
  • @Jimenemex: if 1st row has address as "123 Jump Street Arizona Us", I get a response from web api with the standerdized address as "123 Jump Street Arizona USA", like that I have 10 different rows with 10 different input address, but the output response I get from parallel,foreach is for the same address repeated 5 times – sab May 01 '18 at 16:29
  • @sab - OK. But as an aside, you might be having an [XY problem](https://meta.stackexchange.com/questions/66377/what-is-the-xy-problem) here, and should consider a different approach, e.g. [Most efficient way to insert Rows into MySQL Database](https://stackoverflow.com/q/25323560) or [Fragmented XML Bulk Load to SQL Server in C#](https://stackoverflow.com/q/24679818) or [Any way to speed up this excel import?](https://stackoverflow.com/q/12050177). I'm not a database developer though so I can't really say for sure if there is a better way. – dbc May 01 '18 at 16:30
  • 1
    The sql string is crazy-vulnerable to sql injection attacks. – Joel Coehoorn May 01 '18 at 17:14
  • @JoelCoehoorn I will fix that, do you have a solution for the actual problem? – sab May 01 '18 at 23:35
  • Parameterized queries – Joel Coehoorn May 02 '18 at 00:13

1 Answers1

0

I think you need another approach. Command and DataReader are not thread safe. Even if DataReader was thread safe it is a forward only cursor so it is not going to be faster.

I recommend a producer consumer pattern (.e.g BlockingCollection). In the consumer is where you can parallel process the

MDMStandardizeAddressService web = new MDMStandardizeAddressService();
try
{

You could probably use tasks, await, asynch for producer consumer.

An easier approach might be to create a class for properties and put that in a List and then just Read that List. This will happen if a fraction of a second.

You can then parallel process the List.

Stub code:

public class WebMailer
{ 
    public void process()
    {
        List<Addr> Addrs = new List<Addr>();
        SqlCommand command = new SqlCommand();
        using (var reader = command.ExecuteReader())
        {
            using (SqlDataReader r = command.ExecuteReader())
            {
                Addrs.Add(new Addr(r.GetString(0), r.GetString(1)));
            }
        }
        foreach(Addr addr in Addrs)  // can use parallel here
        { }
    }
}
s
paparazzo
  • 44,497
  • 23
  • 105
  • 176