4

I am running Python ib-api to receive the realtime market data from Interactive Brokers. It can provide the data I expected but it ends with "unhandled exception in EReader thread".

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract as IBcontract
from threading import Thread
import queue
import pandas as pd
from ibapi.ticktype import TickTypeEnum`

`DEFAULT_PRICE_DATA_ID = 1001`

`FINISHED = object()
STARTED = object()
TIME_OUT = object()`

class finishableQueue(object):

    def __init__(self, queue_to_finish):

        self._queue = queue_to_finish
        self.status = STARTED

    def get(self, timeout):

        contents_of_queue=[]
        finished=False

        while not finished:
            try:
                current_element = self._queue.get(timeout=timeout)
                if current_element is FINISHED:
                    finished = True
                    self.status = FINISHED
                else:
                    contents_of_queue.append(current_element)

            except queue.Empty:
                finished = True
                self.status = TIME_OUT

        return contents_of_queue

    def timed_out(self):
        return self.status is TIME_OUT


class TestWrapper(EWrapper):

    def __init__(self):
        self._my_price_data_dict = {}

    def get_error(self, timeout=5):
        if self.is_error():
            try:
                return self._my_errors.get(timeout=timeout)
            except queue.Empty:
                return None

        return None

    def is_error(self):
        an_error_if=not self._my_errors.empty()
        return an_error_if

    def init_error(self):
        error_queue=queue.Queue()
        self._my_errors = error_queue

    def error(self, id, errorCode, errorString):
        ## Overriden method
        errormsg = "IB error id %d errorcode %d string %s" % (id, errorCode, errorString)
        self._my_errors.put(errormsg)

    def init_ibprices(self, tickerid):
        ibprice_data_queue = self._my_price_data_dict[tickerid] = queue.Queue()

        return ibprice_data_queue

    def tickPrice(self, reqId, tickType, price, attrib):
        tickdata = (TickTypeEnum.to_str(tickType), price)

        price_data_dict = self._my_price_data_dict

        if reqId not in price_data_dict.keys():
            self.init_ibprices(reqId)

        price_data_dict[reqId].put(tickdata)


class TestClient(EClient):

    def __init__(self, wrapper):
        EClient.__init__(self, wrapper)

    def error(self, reqId, errorCode, errorString):
        print("Error: ", reqId, " ", errorCode, " ", errorString)

    def getIBrealtimedata(self, ibcontract, tickerid=DEFAULT_PRICE_DATA_ID):
        ib_data_queue = finishableQueue(self.init_ibprices(tickerid))

        self.reqMktData(
            tickerid,
            ibcontract,
            "",
            False,
            False,
            []
        )

        MAX_WAIT_SECONDS = 5
        print("Getting data from the server... could take %d seconds to complete " % MAX_WAIT_SECONDS)

        price_data = ib_data_queue.get(timeout = MAX_WAIT_SECONDS)

        while self.wrapper.is_error():
            print(self.get_error())

        if ib_data_queue.timed_out():
            print("Exceeded maximum wait for wrapper to confirm finished - seems to be normal behaviour")

        self.cancelMktData(tickerid)

        return price_data

class TestApp(TestWrapper, TestClient):
    def __init__(self, ipaddress, portid, clientid):
        TestWrapper.__init__(self)
        TestClient.__init__(self, wrapper=self)

        self.connect(ipaddress, portid, clientid)

        thread = Thread(target = self.run)
        thread.start()

        setattr(self, "_thread", thread)

        self.init_error()

def main(slist):

    app = TestApp("127.0.0.1", 7497, 1)

    for i in slist:
        ibcontract = IBcontract()
        ibcontract.secType = "STK"
        ibcontract.symbol = i
        ibcontract.exchange ="SEHK"

        Lastprice = app.getIBrealtimedata(ibcontract)

        df = pd.DataFrame(Lastprice)
        print(ibcontract.symbol, df.head())

    app.disconnect()

if __name__ == "__main__":

    seclist = [700,2318,5,12]
    main(seclist)

Here are the error messages:

unhandled exception in EReader thread Traceback (most recent call last): File "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\reader.py", line 34, >in run data = self.conn.recvMsg() File "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py", line >99, in recvMsg buf = self._recvAllMsg() File "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py", line >119, in _recvAllMsg buf = self.socket.recv(4096) OSError: [WinError 10038] An operation was attempted on something that is >not a socket

Vega
  • 27,856
  • 27
  • 95
  • 103
Michael Siu
  • 41
  • 1
  • 3

2 Answers2

2

A separate thread is started to read incoming messages from the socket:

thread = Thread(target = self.run)
thread.start()

But this thread is never stopped, and is still running when you call disconnect(). As a result, it tries to access the socket object which is now None, triggering the error. Try stopping the EReader thread prior to disconnecting by setting done=True.

As a side note, since this error happens at the very end of the program at the disconnection it shouldn't interfere with receiving the expected data.

Josh
  • 706
  • 3
  • 8
  • I set a 'app.done = True' before the 'app.disconnect()', error still exist. – Michael Siu Aug 20 '19 at 10:18
  • You might need to upgrade your API version. https://groups.io/g/twsapi/message/42580 – Josh Aug 23 '19 at 22:12
  • 1
    I'm having a similar issue with newer API versions. I also tried adding a "app.done = True" on my code and nothing happened. When running an older API version, the error disappears (see here: https://stackoverflow.com/questions/57618971/how-do-i-get-my-accounts-positions-at-interactive-brokers-using-python-api/57813372#57813372). However, I noticed that this older version is not returning the data for some callbacks the way it should. Therefore, as of now, I have to choose between a suboptimal API callback library or always get an error message when running my code. – Rational-IM Nov 26 '19 at 21:35
  • There's a modification that is done in `connection.py` which is not added in the official release yet. The error is resolved. Please refer [here](https://stackoverflow.com/a/58684561/8907904). – Krishnendu S. Kar Jan 20 '21 at 06:20
1

A workaround to avoid the warning.

Implement this at your EClient / EWrapper subclass:

  1. Create a socket shutdown function:
import socket, time
[...]

    def _socketShutdown(self):
        self.conn.lock.acquire()
        try:
            if self.conn.socket is not None:
                self.conn.socket.shutdown(socket.SHUT_WR)
        finally:
            self.conn.lock.release()
  1. Use it before closing the connection:
    self._socketShutdown()
    time.sleep(1) 
    self.disconnect()
Dharman
  • 30,962
  • 25
  • 85
  • 135
maremoto007
  • 104
  • 1
  • 5