So I guess it boils down to:
1) How to properly throttle the data requests.
2) If a background worker thread is used, how to access function calls on the UI thread in the IB app.
1) How to properly throttle the data requests.
It depends how far you want to go.
If you want a simple solution you can use...
// On UI thread, non-blocking
Task.Delay(1000).ContinueWith(() => {
// make request 2
}
For complete control of all requests, and to maintain both the 50/sec api message limit and the 6/min (soft) or short term burst limits of historical data requests I would recommend at least 2 separate threads based on a producer/consumer model.
A basic outline to get you started follows... (alternative to Task.Delay above)
// The data class used to queue a request
class Request
{
public int? reqId;
public ReqType? reqType, reqFrom;
public bool snapshot = false, regulatorySnaphsot = false;
public string? genericTickList;
// For historical data
public string? endDateTime, duration, barSize, show;
public int? useRTH, formatDate;
public bool update = false;
public Request(int reqId, ReqType reqType, Contract contract, string endDateTime, string duration, string barSize, string show, int useRTH, int formatDate, bool update)
{
// Historical data
this.reqId = reqId;
this.reqType = reqType;
this.contract = contract;
this.endDateTime = endDateTime;
this.duration = duration;
this.barSize = barSize;
this.show = show;
this.useRTH = useRTH;
this.formatDate = formatDate;
this.update = update;
}
}
/*
* This class receives all requests, queues them and process with delay to avoid exceeding message rate limitations
* It also needs to take care of all outstanding requests, so requests for data can be cancelled in one place.
*/
class Requests<T>:IDisposable where T : Request
{
private readonly IBClient ibClient;
private readonly object msgLock = new();
private readonly object historyLock = new();
private readonly Thread msgThread, contractThread, mktDataThread, historyThread;
private readonly Queue<Request> msgQueue = new();
private readonly Queue<Request> historyQueue = new();
private readonly static AutoResetEvent waitHistory = new(true);
private readonly Dictionary<int, Request> historyReqs = new();
public Requests(IBClient ibClient)
{
this.ibClient = ibClient;
msgThread = new Thread(ConsumeMsg);
msgThread.Start();
historyThread = new Thread(ConsumeHistory);
historyThread.Start();
}
private void EnqueueMsg(T req)
{
lock(msgLock)
{
msgQueue.Enqueue(req);
Monitor.PulseAll(msgLock);
}
}
private void ConsumeMsg()
{
/*
* The message queue does not wait other than ~25ms for rate limitation of messages (50/sec max).
* Other queues are responsible for limiting request rates depending on request type.
* We do not increment any counters here, that is done in the respective queue that rate limits requests by type
* EVERY counter is decremented here!
*/
while(true)
{
Request req;
lock(msgLock)
{
while(msgQueue.Count == 0)
Monitor.Wait(msgLock); // Wait for next Task in queue
req = msgQueue.Dequeue();
if(req == null)
return; // This signals our exit
}
switch(req.reqType)
{
case ReqType.History:
ibClient.ClientSocket.reqHistoricalData((int)req.reqId, req.contract, req.endDateTime, req.duration, req.barSize, req.show, req.useRTH ?? 1, req.formatDate ?? 1, req.update, new List<TagValue>());
break;
case ReqType.CancelHistory:
ibClient.ClientSocket.cancelHistoricalData((int)req.reqId);
historyReqs.Remove((int)req.reqId);
break;
}
Thread.Sleep(20); // This prevents over 50 msg per second.
}
}
public void HistoricalData(int reqId, Contract contract, string endDateTime, string duration, string barSize, string show, int useRTH, int formatDate, bool update)
{
EnqueueHistoryRequest((T)new Request(reqId, ReqType.History, contract, endDateTime, duration, barSize, show, useRTH, formatDate, update));
}
public void CancelHistoryData(int reqId)
{
EnqueueMsg((T)new Request(reqId, ReqType.CancelHistory));
}
private void EnqueueHistoryRequest(T req)
{
lock(historyLock)
{
historyQueue.Enqueue(req);
Monitor.PulseAll(historyLock);
}
}
private void ConsumeHistory()
{
while(true)
{
Request req;
lock(historyLock)
{
while(historyQueue.Count == 0)
Monitor.Wait(historyLock); // Wait for next Task in queue
req = historyQueue.Dequeue();
}
if(req == null) return; // This signals our exit
EnqueueMsg((T)req);
// We actually have a soft 6/min limit on hist data.
// This delay does not follow that, we're limiting the number to 50 instead.
Thread.Sleep(800);
}
}
public void HistDataRecieved(int reqID)
{
historyReqs.Remove(reqID);
if(!waitHistory.SafeWaitHandle.IsClosed && ReqCntHistory < MaxReqHistory)
waitHistory.Set(); // We can proceed if less than maxHistory requests outstanding
}
}
To use, in a UI form
Requests = new(iBclient);
requests.Add(new HistoricalData(......));
2) If a background worker thread is used, how to access function calls on the UI thread in the IB app.
The iBclient class runs on a separate thread, and sends events and data back to the UI thread.
This means your UI thread only works when data is received, there is never any need to Thread.Sleep or DoEvents on the UI thread.
You can receive these events on the UI thread by registering such as below...
private readonly EReaderMonitorSignal signal;
private readonly IBClient ibClient;
partial class MyForm:Form
{
public MyForm()
{
InitializeComponent();
signal = new EReaderMonitorSignal();
ibClient = new(signal);
// Register for events
// Depending on your API version use one of the following to know when connected.
ibClient.ConnectedAndReady += Recv_ConnectedAndReady;
ibClient.NextOrderId += Recv_NextOrderId;
ibClient.Error += Recv_Error;
ibClient.HistoricalData += Recv_HistoricalData;
ibClient.HistoricalDataEnd += Recv_HistoricalDataEnd;
// Connect and start API thread.
ibClient.ClientId = 0;
ibClient.ClientSocket.eConnect("127.0.0.1", 7496, 0); // (ClientId = 0) This is the master client that will receive all orders (even from TWS)
var reader = new EReader(ibClient.ClientSocket, signal);
reader.Start();
new Thread(() =>
{
while(ibClient.ClientSocket.IsConnected())
{
signal.waitForSignal();
reader.processMsgs();
}
})
{ IsBackground = true }.Start();
}
private void Recv_ConnectedAndReady(ConnectionStatusMessage msg)
{
// We are now connected and can make requests
// Older API versions use NextOrderId(int)
ibClient.ClientSocket.reqHistoricalData(reqId, contract, endDateTime, durationStr, "5 min", "Trades", 0, 0, new List<TagValue>())
}
private void Recv_Error(int reqId, int errorCode, string? str, string? ordRejectJson, Exception? ex)
{
// Possible combinations of parameters
// reqId, errorCode, str, ex
//
// 0,0,null,Exception
// 0,0,string,null
// -1,int,string,null
// int,int,string,null
}
private void Recv_HistoricalData(HistoricalDataMessage msg)
{
//msg.RequestId is the same Id you used to make the request. This is how you can separate data to different charts.
}
private void Recv_HistoricalDataEnd(HistoricalDataEndMessage msg)
{
//request complete.
}
}