I have been put in a situation where I have to probe a relatively large dataset through a choked output: an endpoint that returns a single record per id. Iterating over this endpoint sequentially to scrape all the data is obviously not a practical option, so I tried to create multiple parallel threads, each handling its own range of ids. Here is what I managed to put together, which is still not working perfectly. "Refer to the questions at the bottom"
Code Preface ↴
In this demo I used http://numbersapi.com/. It is simple: you supply a number and the API returns a random fact from the facts associated with it, if there are any; otherwise it returns one of four randomized phrases containing the facts-free number.
API Examples:
Number that has at least one fact → http://numbersapi.com/1
Number that has no associated facts → http://numbersapi.com/123456789
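For a quick sense of the raw responses, a one-off request is enough. Here is a minimal standalone sketch (separate from the crawler below; it assumes a runtime that supports an async Main):
using System;
using System.Net.Http;
using System.Threading.Tasks;
class NumbersApiDemo
{
static async Task Main()
{
using (HttpClient Client = new HttpClient())
{
// Number with at least one fact:
Console.WriteLine(await Client.GetStringAsync("http://numbersapi.com/1"));
// Number with no facts; the API answers with one of its canned "facts-free" phrases:
Console.WriteLine(await Client.GetStringAsync("http://numbersapi.com/123456789"));
}
}
}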
Appendix
ThreadRange : the number of items to be processed by a single thread
Code ↴
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Authentication;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
namespace CSharp
{
class Program
{
private static Stopwatch CrawlerStopWatch = new Stopwatch();
private static HttpClient HttpClientCrawler = null;
private static HttpClientHandler HttpClientCrawlerHttpsHandler = new HttpClientHandler() { SslProtocols = SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls };
private static BlockingCollection<Thread> CrawlerThreads = new BlockingCollection<Thread>();
private static BlockingCollection<String> CrawlerFailuresBlockingCollection = new BlockingCollection<String>();
private static BlockingCollection<String> CrawlerHitsBlockingCollection = new BlockingCollection<String>();
private static BlockingCollection<String> CrawlerMissesBlockingCollection = new BlockingCollection<String>();
private static String BaseUrl = "http://numbersapi.com/";
private static String FullUrl = String.Concat(BaseUrl, "{0}");
private static long ThreadsCount = 1;
private static long ThreadRange = 10;
private static long Offset = 2;
private static long ItemsToProcess = ThreadsCount * ThreadRange;
private static int MaxUrlLength = String.Format(FullUrl, Offset + ItemsToProcess).Length;
private static ReaderWriterLockSlim CrawlerReaderWriterLockSlim = new ReaderWriterLockSlim();
private static String CrawlerResultantFileName = "z.CrawlerResult.txt";
public static void Main(String[] args)
{
CrawlerStopWatch.Start();
//### Managing HttpClient ###/
ServicePointManager.DefaultConnectionLimit = 50;
if (Regex.IsMatch(BaseUrl, @"^https.*$")) { HttpClientCrawler = new HttpClient(HttpClientCrawlerHttpsHandler); } else { HttpClientCrawler = new HttpClient(); }
HttpClientCrawler.BaseAddress = new Uri(BaseUrl);
HttpClientCrawler.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/html"));
//### Managing HttpClient ###/
InitiateCrawler(BaseUrl, ThreadsCount, ThreadRange, Offset);
}
//### Crawler Methods ###
public static void InitiateCrawler(String BaseUrl, long ThreadsCount, long ThreadRange, long Offset = 0)
{
Console.WriteLine("###############################");
Console.WriteLine($"Commenced crawling the <{BaseUrl}> endpoint and working on <{ItemsToProcess}> items by creating <{ThreadsCount}> threads which each working on <{ThreadRange}> items.");
while (CrawlerThreads.Count < ThreadsCount)
{
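// Each new thread gets the contiguous id window [Offset + i * ThreadRange + 1, Offset + (i + 1) * ThreadRange], where i = CrawlerThreads.Count at creation time.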
long Start = Offset + ThreadRange * CrawlerThreads.Count + 1;
long End = Offset + ThreadRange * (CrawlerThreads.Count + 1);
Thread CrawlerThread = new Thread(() => Crawl(Start, End));
CrawlerThreads.Add(CrawlerThread);
CrawlerThread.Start();
}
Task FinalizeCrawlerTask = Task.Run(() => { foreach (Thread CrawlerThread in CrawlerThreads) { CrawlerThread.Join(); } FinalizeCrawler(); });
FinalizeCrawlerTask.Wait();
}
public static void Crawl(long Start, long End)
{
long Current = Start;
while (Current <= End)
{
String CurrentUrlParameters = String.Format("{0}", Current);
String CurrentUrl = $"{HttpClientCrawler.BaseAddress.AbsoluteUri}{CurrentUrlParameters}";
String CurrentPageContent = "";
// .Result blocks this worker thread until the response arrives; acceptable here since each thread owns its own id range.
HttpResponseMessage HttpResponseMessage = HttpClientCrawler.GetAsync(CurrentUrlParameters).Result;
if (HttpResponseMessage.IsSuccessStatusCode)
{
CurrentPageContent = Encoding.UTF8.GetString(HttpResponseMessage.Content.ReadAsByteArrayAsync().Result);
if (isResultRelevant(CurrentPageContent)) { HandleCrawlerRelevantResult(CurrentUrl, CurrentPageContent); } else { HandleCrawlerIrrelevantResult(CurrentUrl, CurrentPageContent); }
}
else
{
HandleCrawlerFailure(CurrentUrl, HttpResponseMessage);
}
Current++;
}
}
public static void HandleCrawlerFailure(String Url, HttpResponseMessage HttpResponseMessage)
{
CrawlerFailuresBlockingCollection.Add(Url);
int ProcessedItems = CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count + CrawlerFailuresBlockingCollection.Count;
Console.WriteLine($"[Item #{ProcessedItems.ToString().PadRight(ItemsToProcess.ToString().Length)}] {Url.PadRight(MaxUrlLength)} returned {(int)HttpResponseMessage.StatusCode} Code | {HttpResponseMessage.ReasonPhrase}");
}
public static Boolean isResultRelevant(String Content)
{
Boolean IsRelevant = true;
String[] RegularExpressionsArray = new string[]
{
@"^[\d]+ is a boring number\.$",
@"^[\d]+ is an uninteresting number\.$",
@"^[\d]+ is an unremarkable number\.$",
@"^[\d]+ is a number for which we're missing a fact (submit one to numbersapi at google mail!)\.$",
};
foreach (String RegularExpression in RegularExpressionsArray) { if (Regex.IsMatch(Content, RegularExpression)) { IsRelevant = false; break; } }
return IsRelevant;
}
public static void HandleCrawlerRelevantResult(String Url, String Content)
{
CrawlerResultantFileWriteLine(Url);
CrawlerHitsBlockingCollection.Add(Url);
int ProcessedItems = CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count + CrawlerFailuresBlockingCollection.Count;
Console.WriteLine($"[Item #{ProcessedItems.ToString().PadRight(ItemsToProcess.ToString().Length)}] {Url.PadRight(MaxUrlLength)} is relevant");
}
public static void HandleCrawlerIrrelevantResult(String Url, String Content)
{
CrawlerMissesBlockingCollection.Add(Url);
int ProcessedItems = CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count + CrawlerFailuresBlockingCollection.Count;
Console.WriteLine($"[Item #{ProcessedItems.ToString().PadRight(ItemsToProcess.ToString().Length)}] {Url.PadRight(MaxUrlLength)} is irrelevant");
}
public static void FinalizeCrawler()
{
CrawlerStopWatch.Stop();
TimeSpan TimeSpan = TimeSpan.FromMilliseconds(CrawlerStopWatch.ElapsedMilliseconds);
String TimeLapseInformation = String.Format("{0:D2}h:{1:D2}m:{2:D2}s:{3:D3}ms",
TimeSpan.Hours,
TimeSpan.Minutes,
TimeSpan.Seconds,
TimeSpan.Milliseconds);
Console.WriteLine($"Crawling finished in {TimeLapseInformation}.");
Console.WriteLine($"<{CrawlerFailuresBlockingCollection.Count + CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count}> out of <{ItemsToProcess}> items have been crawled having <{CrawlerHitsBlockingCollection.Count}> relevant items, <{CrawlerMissesBlockingCollection.Count}> irrelevant items and <{CrawlerFailuresBlockingCollection.Count}> failures.");
Console.WriteLine("###############################");
}
//### Crawler Methods ###
//### Auxiliary Methods ###
public static void CrawlerResultantFileWriteLine(String Line)
{
CrawlerReaderWriterLockSlim.EnterWriteLock();
try { using (StreamWriter StreamWriter = File.AppendText(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, CrawlerResultantFileName))) { StreamWriter.WriteLine(Line); } }
finally { CrawlerReaderWriterLockSlim.ExitWriteLock(); }
}
//### Auxiliary Methods ###
}
}
Questions
(Q1.) Is the presented approach optimal?
(Q2.) (CrawlerFailuresBlockingCollection.Count + CrawlerHitsBlockingCollection.Count + CrawlerMissesBlockingCollection.Count) has to equal (ThreadsCount * ThreadRange).
Do some items get omitted? If so, why?
(A2.) As Joshua Robinson pointed out, I was using System.Collections.Generic.List, which is not safe to modify from multiple threads. I switched to System.Collections.Concurrent.BlockingCollection, tested it with a large number like 10000, and it worked.
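To illustrate A2 with a minimal sketch (the ResultsDemo name and the 10000-iteration loop are illustrative only, not taken from the crawler): List<T>.Add called from several threads can corrupt the list's internal array or silently drop items, while BlockingCollection<T> synchronizes its adds.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
class ResultsDemo
{
static void Main()
{
List<String> UnsafeResults = new List<String>();                            // not thread-safe
BlockingCollection<String> SafeResults = new BlockingCollection<String>();  // thread-safe adds
Parallel.For(0, 10000, i =>
{
// UnsafeResults.Add(i.ToString());  // may lose items or throw under contention
SafeResults.Add(i.ToString());       // always ends up with exactly 10000 items
});
Console.WriteLine(SafeResults.Count);
}
}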
(Q3.) How do I pinpoint the optimal ThreadRange and ThreadsCount combination with regard to performance? Which of the following?
- (A) Min ThreadsCount with max ThreadRange
- (B) Low ThreadsCount with high ThreadRange
- (C) Median ThreadsCount with median ThreadRange
- (D) High ThreadsCount with low ThreadRange
- (E) Max ThreadsCount with min ThreadRange
To elaborate: if I wanted to process 10000 items from this endpoint, these are all the possible combinations satisfying the necessary predicate ThreadsCount * ThreadRange = 10000, so I have to choose only from among them (a rough timing sketch follows the list).
Which one is the most optimal? And why?
<1> ThreadsCount * <10000> ThreadRange <2> ThreadsCount * <5000> ThreadRange <4> ThreadsCount * <2500> ThreadRange <5> ThreadsCount * <2000> ThreadRange <8> ThreadsCount * <1250> ThreadRange <10> ThreadsCount * <1000> ThreadRange <16> ThreadsCount * <625> ThreadRange <20> ThreadsCount * <500> ThreadRange <25> ThreadsCount * <400> ThreadRange <40> ThreadsCount * <250> ThreadRange <50> ThreadsCount * <200> ThreadRange <80> ThreadsCount * <125> ThreadRange <100> ThreadsCount * <100> ThreadRange // Exact Median → square root of items to process if integer <125> ThreadsCount * <80> ThreadRange <200> ThreadsCount * <50> ThreadRange <250> ThreadsCount * <40> ThreadRange <400> ThreadsCount * <25> ThreadRange <500> ThreadsCount * <20> ThreadRange <625> ThreadsCount * <16> ThreadRange <1000> ThreadsCount * <10> ThreadRange <1250> ThreadsCount * <8> ThreadRange <2000> ThreadsCount * <5> ThreadRange <2500> ThreadsCount * <4> ThreadRange <5000> ThreadsCount * <2> ThreadRange <10000> ThreadsCount * <1> ThreadRange