
I'm working on a PyQt program that collects data from the internet. In this example, I'm trying to get data from an RSS web page.

Let's assume self.feed is the parsed RSS feed containing all the articles, and that "entry" is one article. "entry.url" is the article's original page on the website.

from requests_futures.sessions import FuturesSession

self.session_pages = FuturesSession(max_workers=20)
for entry in self.feed.entries:
    future = self.session_pages.get(entry.url, timeout=10)
    future.add_done_callback(my_call_back)

That's basically how I do it. It's embedded in a PyQt thread, and I run several threads at the same time, but I don't think the problem comes from PyQt.

My problem is that I think the futures don't close the connection, even when they're done. I check it like this:

lsof -i | grep "python" | wc -l

lsof -i lists the open files involved in network connections; the rest of the pipeline just counts them. This number never stops growing (it reaches something like 900), and then I get the following error:

(python:28285): GLib-ERROR **: Creating pipes for GWakeup: Too many open files
[1]    28285 trace trap (core dumped)  python gui.py
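For reference, the same count can also be watched from inside the program on Linux (a sketch; note that /proc/self/fd counts all descriptors of the process, not just sockets, so it is an upper bound on open connections):

```python
import os

def open_fd_count():
    # Linux-specific: each entry in /proc/self/fd is one open file
    # descriptor of the current process (files, sockets, pipes, ...).
    return len(os.listdir('/proc/self/fd'))

print(open_fd_count())
```

Logging this periodically from the PyQt thread makes it easy to see whether the descriptor count keeps climbing after the futures complete.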

I think the problem comes from the futures, but I'm not sure, actually.

I tried something like:

self.session_pages.shutdown()

at the end of the thread, but it didn't work.

Do you have any idea?

JPFrancoia

2 Answers


I don't see a FuturesSession in Python's concurrent.futures, so I am making some assumptions here.

Unless the callback is unique to each self.session_pages.get(...) call, I think the future.add_done_callback(my_call_back) line might be creating a new callback each time and overwriting the previous one's object id, or it may simply be incorrect.

This is from the only place where I could find a reference to FuturesSession in the context of what you are using:

from pprint import pprint
from requests_futures.sessions import FuturesSession

session = FuturesSession()

def bg_cb(sess, resp):
    # parse the json storing the result on the response object
    resp.data = resp.json()

future = session.get('http://httpbin.org/get', background_callback=bg_cb)
# do some other stuff, send some more requests while this one works
response = future.result()
print('response status {0}'.format(response.status_code))
# data will have been attached to the response object in the background
pprint(response.data)

Try setting background_callback instead.

Update:

I would try using self.session_pages.request instead of self.session_pages.get, since FuturesSession is composed of a ThreadPoolExecutor and requests.Session.

Yes this is the case:

(Pdb) inspect.getmro(FuturesSession)
(<class '__main__.FuturesSession'>, <class 'requests.sessions.Session'>, <class 'requests.sessions.SessionRedirectMixin'>, <class 'object'>)
(Pdb) vars()
{'DEFAULT_POOLSIZE': 10, '__return__': None, '__spec__': None, 'inspect': <module 'inspect' from '/usr/lib/python3.4/inspect.py'>, '__file__': 'requestsfutures.py', 'FuturesSession': <class '__main__.FuturesSession'>, 'HTTPAdapter': <class 'requests.adapters.HTTPAdapter'>, 'ThreadPoolExecutor': <class 'concurrent.futures.thread.ThreadPoolExecutor'>, 'Session': <class 'requests.sessions.Session'>, '__name__': '__main__', '__cached__': None, '__doc__': "\nrequests_futures\n~~~~~~~~~~~~~~~~\n\nThis module provides a small add-on for the requests http library. It makes use\nof python 3.3's concurrent.futures or the futures backport for previous\nreleases of python.\n\n    from requests_futures import FuturesSession\n\n    session = FuturesSession()\n    # request is run in the background\n    future = session.get('http://httpbin.org/get')\n    # ... do other stuff ...\n    # wait for the request to complete, if it hasn't already\n    response = future.result()\n    print('response status: {0}'.format(response.status_code))\n    print(response.content)\n\n", 'pdb': <module 'pdb' from '/usr/lib/python3.4/pdb.py'>, '__loader__': <_frozen_importlib.SourceFileLoader object at 0x7f6d84194470>, '__builtins__': <module 'builtins' (built-in)>, '__package__': None}
(Pdb) vars().keys()
dict_keys(['DEFAULT_POOLSIZE', '__return__', '__spec__', 'inspect', '__file__', 'FuturesSession', 'HTTPAdapter', 'ThreadPoolExecutor', 'Session', '__name__', '__cached__', '__doc__', 'pdb', '__loader__', '__builtins__', '__package__'])
(Pdb) vars()['FuturesSession']
<class '__main__.FuturesSession'>
(Pdb) vars()['FuturesSession'].get
<function Session.get at 0x7f6d80c07488>
(Pdb) vars()['Session'].get
<function Session.get at 0x7f6d80c07488>
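That last identity check (FuturesSession.get and Session.get being the very same function object) can be reproduced with a minimal stand-in; the classes below are illustrative only, not the real requests code:

```python
import inspect

class Session:
    def get(self, url, **kwargs):
        # Plain synchronous get; stands in for requests.Session.get
        return ('GET', url)

class FuturesSession(Session):
    # No get() override here, just like requests_futures: the method
    # is inherited unchanged from Session through the MRO.
    pass

print(FuturesSession.get is Session.get)  # True: same function object
print(inspect.getmro(FuturesSession))
```

Because the subclass never redefines get, both class attributes resolve to the one function defined on Session, which is exactly what the pdb session above shows for the real classes.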
jmunsch
  • Yes you're right, my bad. I edited my question and added the import. But actually, FuturesSession is a wrapper around ThreadPoolExecutor (I think it's that class, I never recall), and the futures of FuturesSession are basically concurrent.futures futures. – JPFrancoia Apr 19 '15 at 20:10
  • 1
    updated my answer after looking at how that works. I think that by calling `get` the call is using the regular `requests` get method and is not calling any overridden callback/method at all. – jmunsch Apr 19 '15 at 20:41

OK, you were right @jm_____: the get() call is simply a call to requests' get. So I used the answer from here:

Python-Requests close http connection

And more specifically:

future = self.session_pages.get(url, timeout=20, headers={'Connection':'close'})

And now lsof reports normal numbers. Thank you.
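The header can also be merged in a small wrapper so every request closes its connection, including ones where the caller already passes headers. This is just a sketch; `get_with_close` is a name introduced here, not part of requests_futures:

```python
def get_with_close(session, url, **kwargs):
    # Merge 'Connection: close' into any headers the caller supplied,
    # without clobbering an explicit value they may have set.
    headers = dict(kwargs.pop('headers', None) or {})
    headers.setdefault('Connection', 'close')
    return session.get(url, headers=headers, **kwargs)
```

With a FuturesSession this returns the same future that session.get would, just with the extra header attached, so each response's socket is torn down instead of being kept alive.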

JPFrancoia