0

I am using threads to upload images on a FTP. Now I have a problem in limiting the number of threads. when I am creating same number of threads equal to images then it's fine i.e. it is working fine. But now I want to create only suppose maximum of 5 number of threads to upload 100 or more images. I have a datatable in which these 100 images are with a unique field ID which stores suppose 0,1,2,3....and so on for every images. Now I want to start only five threads once so that it may start uploading 5 images parallely. On a Timer, I am checking the status of threads and if I found a thread which is not live now, I want to assign it the 6th Image for uploading and in the same way, if I found other thread which finished its uploading/work, I want to give it 7th image to upload and so on. i.e. this process will run until 100 images are uploaded. Can you please suggest me a structure by using which I may achieve this? Currently I am creating 100 threads for 100 images and it is working perfect. But I am afraid of creating that much number of threads. Will that affect performance?

My Current Code is:

// A page level variable
Thread [] tr=null;
//On Load of the Control
 tr = new Thread[dt.Rows.Count];
        //tr = new Thread[MaxID];
        for (int i = 0; i < dt.Rows.Count; i++)
        //for (int i = 0; i < MaxID; i++)
        {
            tr[i] = new Thread(new ThreadStart(ProcessItems));
            tr[i].Name = Convert.ToString(dt.Rows[i]["Id"]);
            tr[i].IsBackground = true;
        }

        //Start each thread
        foreach (Thread x in tr)
        {
            x.Start();
        }

    //The method which is used to upload images
            public object tLock = new object();
    private void ProcessItems()
    {
        //if (dict.Count == 0)
        //    pthread.Suspend();
        //ArrayList toRemove = new ArrayList();
        lock (tLock)
        {
            try
            {
                //int NoofAttempts = 0;
                //foreach (DictionaryEntry e in dict)
                //{
                    //Thread.Sleep(500);
                  dr = dt.Select("Is_Uploaded=0 And Id=" + Thread.CurrentThread.Name).FirstOrDefault();

                    uxImageAndProgress pbCtl = panelControl1.Controls[dr["Image_ID"].ToString()] as uxImageAndProgress;
                    //NoofAttempts = 0;
                    string Path = "";
                    if (ftpPath == "")
                    {
                        Path = Global.FTPRemotePath + "/ProductImages/" + dr["Image_ID"] + dr["Extension"].ToString();
                    }
                    else
                    {
                        Path = ftpPath + dr["Image_ID"] + dr["Extension"].ToString();
                    }
                    //object[] loader = e.Value as object[];
                    int length = (int)(dr["ActualData"] as byte[]).Length;
                    Stream stream = new MemoryStream(dr["ActualData"] as byte[]);
                    byte[] rBuffer = ReadToEnd(stream);
                    int d = length - (int)stream.Length;
                    d = Math.Min(d, rnd.Next(10) + 1);

                    if (ftpRequest == null)
                    {
                        try
                        {
                            #region New Code
                            ftpRequest = (FtpWebRequest)FtpWebRequest.Create(new Uri(Path));
                            ftpRequest.Method = WebRequestMethods.Ftp.UploadFile;
                            ftpRequest.Credentials = new NetworkCredential(Global.FTPLogIn, Global.FTPPassword);
                            ftpRequest.UsePassive = true;
                            ftpRequest.UseBinary = true;
                            ftpRequest.KeepAlive = true;
                            ftpRequest.Timeout = 20000;
                            ftpRequest.ContentLength = length;
                            byte[] buffer = new byte[length > 4097 ? 4097 : length];
                            int bytes = 0;
                            int total_bytes = (int)length;
                            System.IO.Stream rs = ftpRequest.GetRequestStream();
                            while (total_bytes > 0)
                            {
                                bytes = stream.Read(buffer, 0, buffer.Length);
                                rs.Write(buffer, 0, bytes);
                                total_bytes = total_bytes - bytes;
                            }
                            dr["Is_Uploaded"] = 1;
                            dt.AcceptChanges();
                            ftpRequest = null;
                            pbCtl.Is_Uploaded = true;
                            rs.Close();
                            #endregion
                        }
                        catch (Exception eeex)
                        {
                            ftpRequest = null;
                            if (ErrorText == "")
                                ErrorText = eeex.Message.ToString();
                            else
                                ErrorText = ErrorText + "," + eeex.Message.ToString();

                            if (Image_IDsToDelete == "")
                                Image_IDsToDelete = dr["Image_ID"].ToString();
                            else
                                Image_IDsToDelete = Image_IDsToDelete + "," + dr["Image_ID"].ToString();

                            if (NotUploadedFiles == "")
                                NotUploadedFiles = Convert.ToString(dr["FileName"]);//dr["Image_ID"] + dr["Extension"].ToString();
                            else
                                NotUploadedFiles = NotUploadedFiles + ", " + Convert.ToString(dr["FileName"]); 

                            dr["Is_Uploaded"] = true;
                            dt.AcceptChanges();
                            ftpRequest = null;
                            pbCtl.Is_Uploaded = true;
                            pbCtl.Is_WithError = true;
                        }
                    }
            }
            catch (Exception ex)
            {
                XtraMessageBox.Show(ex.Message.ToString(), Global.Header, MessageBoxButtons.OK);
                //pthread.Suspend();
            }
        }
    }
//The Timer Event on which I am checking the Status of threads and taking appropriate action
        private void timer1_Tick(object sender, EventArgs e)
    {
        bool Is_AllFinished=true;
        //Start each thread
        foreach (Thread x in tr)
        {
            if (x.IsAlive == true)
            {
                Is_AllFinished = false;
                break;
            }
            else
            {
                //DataRow[] drs = dt.Select("Is_Uploaded=0");
                //if (drs.Count() > 0)
                //{
                    //x. = Convert.ToString(MaxID + 1);
                    //x.Start();
                    //MaxID = MaxID + 1;
                //}
            }
        }

        if (Is_AllFinished == true)
        {
            timer1.Enabled = false;
            if (Image_IDsToDelete != "")
            {
                RetailHelper.ExecuteNonQuery("Delete from images where Image_ID in (" + Image_IDsToDelete + ")");
            }

            if (ErrorText != "")
            {
                NotUploadedFiles = NotUploadedFiles + ".";
                XtraMessageBox.Show("Unable to connect to server. The following files were not uploaded:" + System.Environment.NewLine + NotUploadedFiles + ".", Global.Header, MessageBoxButtons.OK, MessageBoxIcon.Information);
            }

            Is_Done = true;

        }
    }

Now, I want to convert this code to use a fixed number of threads. Please help me. Thanking you!

vicky
  • 23
  • 1
  • 1
  • 9
  • Consider [Parallel.ForEach](http://msdn.microsoft.com/en-us/library/dd460720(v=vs.110).aspx) – Bas Jul 15 '14 at 07:29

2 Answers2

1

Use a Semaphore it is good enough. You can polish the code yourself.

    const int maxThreads = 5;
    Semaphore sm = new Semaphore(maxThreads, maxThreads); // maximum concurrent threads
    for (int i = 0; i < dt.Rows.Count; i++)
    {
        try
        {
            sm.WaitOne();

            Thread tr = new Thread(new ThreadStart(ProcessItems));
            tr.Name = Convert.ToString(dt.Rows[i]["Id"]);
            tr.IsBackground = true;

            tr.Start();
         }
         finally
         {
             sm.Release();
         }
    }


// You don't need the timer anymore
// Wait for the semaphore to be completely released

        for (int i=0; i<maxThreads ; i++)
            sm.WaitOne();

        sm.Release(maxThreads);


        if (Image_IDsToDelete != "")
        {
            RetailHelper.ExecuteNonQuery("Delete from images where Image_ID in (" + Image_IDsToDelete + ")");
        }

        if (ErrorText != "")
        {
            NotUploadedFiles = NotUploadedFiles + ".";
            XtraMessageBox.Show("Unable to connect to server. The following files were not uploaded:" + System.Environment.NewLine + NotUploadedFiles + ".", Global.Header, MessageBoxButtons.OK, MessageBoxIcon.Information);
        }


//The method which is used to upload images
private void ProcessItems()
{
    //if (dict.Count == 0)
    //    pthread.Suspend();
    //ArrayList toRemove = new ArrayList();
    try
    {
        sm.WaitOne(); 
        try
        {
            //int NoofAttempts = 0;
            //foreach (DictionaryEntry e in dict)
            //{
                //Thread.Sleep(500);
              dr = dt.Select("Is_Uploaded=0 And Id=" + Thread.CurrentThread.Name).FirstOrDefault();

                uxImageAndProgress pbCtl = panelControl1.Controls[dr["Image_ID"].ToString()] as uxImageAndProgress;
                //NoofAttempts = 0;
                string Path = "";
                if (ftpPath == "")
                {
                    Path = Global.FTPRemotePath + "/ProductImages/" + dr["Image_ID"] + dr["Extension"].ToString();
                }
                else
                {
                    Path = ftpPath + dr["Image_ID"] + dr["Extension"].ToString();
                }
                //object[] loader = e.Value as object[];
                int length = (int)(dr["ActualData"] as byte[]).Length;
                Stream stream = new MemoryStream(dr["ActualData"] as byte[]);
                byte[] rBuffer = ReadToEnd(stream);
                int d = length - (int)stream.Length;
                d = Math.Min(d, rnd.Next(10) + 1);

                if (ftpRequest == null)
                {
                    try
                    {
                        #region New Code
                        ftpRequest = (FtpWebRequest)FtpWebRequest.Create(new Uri(Path));
                        ftpRequest.Method = WebRequestMethods.Ftp.UploadFile;
                        ftpRequest.Credentials = new NetworkCredential(Global.FTPLogIn, Global.FTPPassword);
                        ftpRequest.UsePassive = true;
                        ftpRequest.UseBinary = true;
                        ftpRequest.KeepAlive = true;
                        ftpRequest.Timeout = 20000;
                        ftpRequest.ContentLength = length;
                        byte[] buffer = new byte[length > 4097 ? 4097 : length];
                        int bytes = 0;
                        int total_bytes = (int)length;
                        System.IO.Stream rs = ftpRequest.GetRequestStream();
                        while (total_bytes > 0)
                        {
                            bytes = stream.Read(buffer, 0, buffer.Length);
                            rs.Write(buffer, 0, bytes);
                            total_bytes = total_bytes - bytes;
                        }
                        dr["Is_Uploaded"] = 1;
                        dt.AcceptChanges();
                        ftpRequest = null;
                        pbCtl.Is_Uploaded = true;
                        rs.Close();
                        #endregion
                    }
                    catch (Exception eeex)
                    {
                        ftpRequest = null;
                        if (ErrorText == "")
                            ErrorText = eeex.Message.ToString();
                        else
                            ErrorText = ErrorText + "," + eeex.Message.ToString();

                        if (Image_IDsToDelete == "")
                            Image_IDsToDelete = dr["Image_ID"].ToString();
                        else
                            Image_IDsToDelete = Image_IDsToDelete + "," + dr["Image_ID"].ToString();

                        if (NotUploadedFiles == "")
                            NotUploadedFiles = Convert.ToString(dr["FileName"]);//dr["Image_ID"] + dr["Extension"].ToString();
                        else
                            NotUploadedFiles = NotUploadedFiles + ", " + Convert.ToString(dr["FileName"]); 

                        dr["Is_Uploaded"] = true;
                        dt.AcceptChanges();
                        ftpRequest = null;
                        pbCtl.Is_Uploaded = true;
                        pbCtl.Is_WithError = true;
                    }
                }
        }
        catch (Exception ex)
        {
            XtraMessageBox.Show(ex.Message.ToString(), Global.Header, MessageBoxButtons.OK);
            //pthread.Suspend();
        }
    }
    finally
    {
         sm.Release();
    }
}
Alireza
  • 4,976
  • 1
  • 23
  • 36
  • Hi @Alireza, Sorry to say that the above code with Semaphore is not working. It only uploaded a single Image out of my 11 selected Images. Also it didn't threw any error message of not uploaded. My previous code was working fine in which I am just creating threads equal to images. – vicky Jul 15 '14 at 09:59
  • Maybe it is because I removed the `lock` from the start of your method. put it back and it should work. Anyway why using threads when you put a lock on the whole operation. It actually won't run in parallel. – Alireza Jul 15 '14 at 10:04
  • No, it's running parallel. As previously when I was using a single background thread, it was taking much time to upload even 10 images but after I used multiple threads i.e. 10 threads created for 10 images even by using that Lock object, it is taking less time and I can see the uploading status for every images. I have created a dynamic interface in which there is a progress bar appear with every images and it displays the uploading status too. – vicky Jul 15 '14 at 10:17
  • Can you please tell me how to know all images uploaded i.e. no any thread is working now if we don't use a timer where we have to check the status of those 5 threads created? so that I may display the error message if come for any image. – vicky Jul 15 '14 at 10:29
  • That code after the main `for` loops waits for the `Semaphore` to be completely released, then shows the results. I strongly recommend that you make your `ProcessItems` thread-safe or put the `lock` back. – Alireza Jul 15 '14 at 10:36
  • Yes I tested it by putting the Lock object there in ProcessItems method but still it didn't wait to complete all the uploadings of images and came to the error check code where it didn't have any error and passed the code. – vicky Jul 15 '14 at 12:00
  • It's strange because my simpler test code works fine. In my code, I replaced `ProcessItems` with a simple `Console.Writeln()`. Do you want that code? – Alireza Jul 15 '14 at 13:30
  • Yes if possible please mail me on: vicky@bememorable.co.uk. Thanks. – vicky Jul 16 '14 at 05:26
  • OK. And please delete your email address :) – Alireza Jul 16 '14 at 06:00
0

It sounds like a producer / consumer queue is the structure you are looking for. Take a look a this answer and the others in the thread for examples of how to employ it.

Community
  • 1
  • 1
Pete S
  • 265
  • 2
  • 11
  • Hi @Pete S, Thanks for your response. But if Possible can you please make it using my code. – vicky Jul 15 '14 at 07:56