1

I wrote a method to download data from the internet and save it to my database. I wrote this using PLINQ to take advantage of my multi-core processor and because it is downloading thousands of different files in a very short period of time. I have added comments below in my code to show where it stops but the program just sits there and after awhile, I get an out of memory exception. This being my first time using TPL and PLINQ, I'm extremely confused so I could really use some advice on what to do to fix this.

UPDATE: I found out that I was getting a webexception constantly because the webclient was timing out. I fixed this by increasing the max amount of connections according to this answer here. I was then getting exceptions for the connection not opening and I fixed it by using this answer here. I'm now getting connection timeout errors for the database even though it is a local sql server. I still haven't been able to get any of my code to run so I could totally use some advice

static void Main(string[] args)
    {
        try
        {
            while (true)
            {
                // start the download process for market info
                startDownload();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            Console.WriteLine(ex.StackTrace);
        }
    }

    public static void startDownload()
    {
        DateTime currentDay = DateTime.Now;
        List<Task> taskList = new List<Task>();

        if (Helper.holidays.Contains(currentDay) == false)
        {
            List<string> markets = new List<string>() { "amex", "nasdaq", "nyse", "global" };

            Parallel.ForEach(markets, market =>
            {
                Downloads.startInitialMarketSymbolsDownload(market);
            }
            );

            Console.WriteLine("All downloads finished!");
        }

        // wait 24 hours before you do this again
        Task.Delay(TimeSpan.FromHours(24)).Wait();
    }

public static void startInitialMarketSymbolsDownload(string market)
    {
        try
        {
            List<string> symbolList = new List<string>();
            symbolList = Helper.getStockSymbols(market);

            var historicalGroups = symbolList.AsParallel().Select((x, i) => new { x, i })
                      .GroupBy(x => x.i / 100)
                      .Select(g => g.Select(x => x.x).ToArray());

            historicalGroups.AsParallel().ForAll(g => getHistoricalStockData(g, market));
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            Console.WriteLine(ex.StackTrace);
        }
    }

public static void getHistoricalStockData(string[] symbols, string market)
    {
        // download data for list of symbols and then upload to db tables
        Uri uri;
        string url, line;
        decimal open = 0, high = 0, low = 0, close = 0, adjClose = 0;
        DateTime date;
        Int64 volume = 0;
        string[] lineArray;
        List<string> symbolError = new List<string>();
        Dictionary<string, string> badNameError = new Dictionary<string, string>();

        Parallel.ForEach(symbols, symbol =>
                {
                        url = "http://ichart.finance.yahoo.com/table.csv?s=" + symbol + "&a=00&b=1&c=1900&d=" + (DateTime.Now.Month - 1) + "&e=" + DateTime.Now.Day + "&f=" + DateTime.Now.Year + "&g=d&ignore=.csv";
                        uri = new Uri(url);

                        using (dbEntities entity = new dbEntities())
                        using (WebClient client = new WebClient())
                        using (Stream stream = client.OpenRead(uri))
                        using (StreamReader reader = new StreamReader(stream))
                        {
                            while (reader.EndOfStream == false)
                            {
                                line = reader.ReadLine();
                                lineArray = line.Split(',');

                                // if it isn't the very first line
                                if (lineArray[0] != "Date")
                                {
                                    // set the data for each array here
                                    date = Helper.parseDateTime(lineArray[0]);
                                    open = Helper.parseDecimal(lineArray[1]);
                                    high = Helper.parseDecimal(lineArray[2]);
                                    low = Helper.parseDecimal(lineArray[3]);
                                    close = Helper.parseDecimal(lineArray[4]);
                                    volume = Helper.parseInt(lineArray[5]);
                                    adjClose = Helper.parseDecimal(lineArray[6]);

                                    switch (market)
                                    {
                                        case "nasdaq":
                                            DailyNasdaqData nasdaqData = new DailyNasdaqData();
                                            var nasdaqQuery = from r in entity.DailyNasdaqDatas.AsParallel().AsEnumerable()
                                                              where r.Date == date
                                                              select new StockData { Close = r.AdjustedClose };

                                            List<StockData> nasdaqResult = nasdaqQuery.AsParallel().ToList(); // hits this line
                                            break;
                                        default:
                                            break;
                                    }
                                }
                            }

                            // now save everything
                            entity.SaveChanges();
                        }
                }
        );
    }
Community
  • 1
  • 1
DarthVegan
  • 1,719
  • 7
  • 25
  • 42
  • Too much code. Especially delete the code that you marked as never hit. – usr Dec 03 '14 at 10:22
  • @usr I deleted some code to make it easier to read but I can't delete anymore. I definitely can't delete the code that never gets hit because that is the whole focus of my issue – DarthVegan Dec 03 '14 at 10:31

1 Answers1

2

Async lambdas work like async methods in one regard: They do not complete synchronously but they return a Task. In your parallel loop you are simply generating tasks as fast as you can. Those tasks hold onto memory and other resources such as DB connections.

The simplest fix is probably to just use synchronous database commits. This will not result in a loss of throughput because the database cannot deal with high amounts of concurrent DML anyway.

usr
  • 168,620
  • 35
  • 240
  • 369
  • What you say makes sense and I made those changes but it is taking 10 minutes for a breakpoint at the if (linearray[0] line to be hit. I really feel like I'm missing something. I'm going to edit to include the code from the moment the program starts until it calls this method to see if maybe I'm screwing something up somewhere @usr – DarthVegan Dec 04 '14 at 05:11
  • One strange thing I noticed is that the only line of code it seems to process is when it gets a web exception for if the url is invalid. I can't get it to hit any other line of code in that method... – DarthVegan Dec 04 '14 at 05:44
  • Configure all parallelism primitives to use a MaxDOP of 1 so that can debug this in a single-threaded fashion. 3 nested parallel loops is a lot of nesting. This can cause a lot of threads which can overwhelm some IO subsystems such as the DB (pool exhaustion) and web (connection and server limits). – usr Dec 04 '14 at 09:01
  • I'm trying to search online for a guide on how to do this in C# but I can't seem to find anything. Is this a setting that I can only do in sql server 2008 directly? Do you have any example C# code? – DarthVegan Dec 04 '14 at 09:09
  • https://www.google.com/webhp?complete=1&hl=en&gws_rd=ssl#complete=1&hl=en&q=Parallel.ForEach+limit+degree+of+parallelism there you go. – usr Dec 04 '14 at 09:21
  • I actually tried that option before because I read it on one of the answers here for a similar issue but it didn't fix the problem. I didn't realize that you were talking about the same parallel options – DarthVegan Dec 04 '14 at 09:42
  • Correction it took around 15 minutes but it finally got to the breakpoint for the first time. Any suggestions for how to get this to run better? – DarthVegan Dec 04 '14 at 09:53
  • I'd need to take the time to understand the code better (which I will not do). Pause the debugger 10 times under load to see where it stops most. That's the hot part of your code that must be optimized. – usr Dec 04 '14 at 10:12