
I'm using Django, Celery and Celery beat to run periodic tasks. I run a task every minute to get some data via SNMP.

My function uses asyncio, as shown below. I have added a check that creates a new event loop if the current one is closed.

However, every few runs the task fails, and the Django-tasks-results DB records the traceback below. There seems to be a failure roughly every 3 minutes; in the minutes in between, the task succeeds.

Error:

```
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/itapp/itapp/monitoring/tasks.py", line 32, in link_data
    return get_link_data()
  File "/itapp/itapp/monitoring/jobs/link_monitoring.py", line 209, in get_link_data
    done, pending = loop.run_until_complete(asyncio.wait(tasks))
  File "/usr/local/lib/python3.6/asyncio/base_events.py", line 468, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 311, in wait
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 311, in <setcomp>
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 514, in ensure_future
    raise ValueError('loop argument must agree with Future')
ValueError: loop argument must agree with Future
```
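The last frames show `asyncio.wait()` calling `ensure_future(f, loop=loop)` on each task and rejecting one of them. A minimal sketch of that check, using two throwaway loops (`loop_a` and `loop_b` are illustrative names, not from the question): a future is bound to the loop that created it, and `ensure_future()` raises `ValueError` when told to use a different loop. The message wording differs between Python versions (3.6 says "loop argument must agree with Future"), so the sketch only relies on the exception type.

```python
import asyncio

# Two independent event loops: one that owns the future, one that is
# (wrongly) asked to handle it.
loop_a = asyncio.new_event_loop()
loop_b = asyncio.new_event_loop()

# A future is bound to the loop that created it.
fut = loop_a.create_future()

try:
    # Asking for the same future "on loop_b" is rejected.
    asyncio.ensure_future(fut, loop=loop_b)
    mismatch_detected = False
except ValueError as exc:
    mismatch_detected = True
    print("ValueError:", exc)
finally:
    loop_a.close()
    loop_b.close()
```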

Function:

```python
async def retrieve_data(link):
    poll_interval = 60
    results = []
    # credentials:
    link_mgmt_ip = link.mgmt_ip
    link_index = link.interface_index
    snmp_user = link.device_circuit_subnet.device.snmp_data.name
    snmp_auth = link.device_circuit_subnet.device.snmp_data.auth
    snmp_priv = link.device_circuit_subnet.device.snmp_data.priv
    hostname = link.device_circuit_subnet.device.hostname
    print('polling data for {} on {}'.format(hostname,link_mgmt_ip))

    # first poll for speeds
    download_speed_data_poll1 = snmp_get(link_mgmt_ip, down_speed_oid % link_index ,snmp_user, snmp_auth, snmp_priv)

    # check we were able to poll
    if 'timeout' in str(get_snmp_value(download_speed_data_poll1)).lower():
        return 'timeout trying to poll {} - {}'.format(hostname ,link_mgmt_ip)
    upload_speed_data_poll1 = snmp_get(link_mgmt_ip, up_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)

    # wait for poll interval
    await asyncio.sleep(poll_interval)

    # second poll for speeds
    download_speed_data_poll2 = snmp_get(link_mgmt_ip, down_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)
    upload_speed_data_poll2 = snmp_get(link_mgmt_ip, up_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)

    # create deltas for speed
    down_delta = int(get_snmp_value(download_speed_data_poll2)) - int(get_snmp_value(download_speed_data_poll1))
    up_delta = int(get_snmp_value(upload_speed_data_poll2)) - int(get_snmp_value(upload_speed_data_poll1))

    # set speed results
    download_speed = round((down_delta * 8 / poll_interval) / 1048576)
    upload_speed = round((up_delta * 8 / poll_interval) / 1048576)

    # get description and interface state
    int_desc = snmp_get(link_mgmt_ip, int_desc_oid % link_index, snmp_user, snmp_auth, snmp_priv)
    int_state = snmp_get(link_mgmt_ip, int_state_oid % link_index, snmp_user, snmp_auth, snmp_priv)

    ...
    return results

def get_link_data():
    mgmt_ip = Subquery(
        DeviceCircuitSubnets.objects.filter(device_id=OuterRef('device_circuit_subnet__device_id'),subnet__subnet_type__poll=True).values('subnet__subnet')[:1])
    link_data = LinkTargets.objects.all() \
                .select_related('device_circuit_subnet') \
                .select_related('device_circuit_subnet__device') \
                .select_related('device_circuit_subnet__device__snmp_data') \
                .select_related('device_circuit_subnet__subnet') \
                .select_related('device_circuit_subnet__circuit') \
                .annotate(mgmt_ip=mgmt_ip)
    tasks = []
    loop = asyncio.get_event_loop()
    if asyncio.get_event_loop().is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(asyncio.new_event_loop())

    for link in link_data:
        tasks.append(asyncio.ensure_future(retrieve_data(link)))

    if tasks:
        start = time.time()
        done, pending = loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

        results = []
        for completed_task in done:
            results.append(completed_task.result()[0])

        end = time.time()
        print("Poll time: {}".format(end - start))
        return 'Link data updated for {}'.format(' \n '.join(results))
    else:
        return 'no tasks defined'
```
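A likely cause is visible in `get_link_data`: when the previous run has closed the loop, `loop = asyncio.new_event_loop()` creates one loop, but `asyncio.set_event_loop(asyncio.new_event_loop())` installs a *second*, different loop as the current one. `asyncio.ensure_future(retrieve_data(link))` binds each task to that current (second) loop, while `loop.run_until_complete(...)` runs `asyncio.wait()` on the first loop, which is exactly the mismatch in the traceback. The following run finds the second loop still open (the exception skipped `loop.close()`), so it succeeds and closes it; that would make failures alternate with successes in each worker process. A minimal sketch of a fix, assuming one fresh loop per Celery run with every task created from that same loop object; `run_coroutines` and `fake_poll` are hypothetical names, with `fake_poll` standing in for `retrieve_data`:

```python
import asyncio


def run_coroutines(coros):
    """Run coroutines concurrently on one fresh event loop, then close it.

    Every task is created from the same `loop` object that later runs
    `asyncio.wait()`, so the two can never disagree.
    """
    coros = list(coros)
    if not coros:
        # asyncio.wait() rejects an empty collection, so bail out early.
        return []
    loop = asyncio.new_event_loop()
    try:
        tasks = [loop.create_task(c) for c in coros]
        loop.run_until_complete(asyncio.wait(tasks))
        # Collect results in submission order; the `done` set returned
        # by asyncio.wait() has no defined order.
        return [t.result() for t in tasks]
    finally:
        loop.close()


# Hypothetical stand-in for retrieve_data(): waits, then returns a label.
async def fake_poll(name, delay):
    await asyncio.sleep(delay)
    return "polled {}".format(name)


if __name__ == "__main__":
    print(run_coroutines([fake_poll("r1", 0.1), fake_poll("r2", 0.05)]))
    # A second call gets its own new loop, so there is no "closed loop"
    # state to check for.
    print(run_coroutines([fake_poll("r3", 0.01)]))
```

In the question's own code, the equivalent minimal change would be to create the loop once and install that same object: `loop = asyncio.new_event_loop()` followed by `asyncio.set_event_loop(loop)`. On Python 3.7+, `asyncio.run()` performs the create/run/close cycle for you.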
AlexW
  • This seems to be an exact duplicate of the question you've [already posted](https://stackoverflow.com/questions/50257125/python-3-6-asyncio-loop-argument-must-agree-with-future?noredirect=1) and then deleted. The design seems flawed beginning with the fact that your coroutine doesn't `await` anything other than `asyncio.sleep()`. – user4815162342 May 15 '18 at 15:06
  • Sorry, I deleted the original question because the script worked after a few tweaks; I assumed everything was OK since it works 2 out of 3 times. I'm new to asyncio, so I'm not quite sure what you mean. I need the function to wait 60 seconds between SNMP polls to collect data. – AlexW May 15 '18 at 15:14
  • We've already had this conversation in the original question and I responded that you should await any operation that can block. I also referred you to [a tutorial](https://medium.freecodecamp.org/a-guide-to-asynchronous-programming-in-python-with-asyncio-232e2afa44f6) and [an answer](https://stackoverflow.com/questions/33357233/when-to-use-and-when-not-to-use-python-3-5-await/33399896#33399896) with more information. – user4815162342 May 15 '18 at 15:17
  • I've read both of those at least three times. It's quite confusing for a novice, but from what I understood I thought my script was OK, and when it worked I assumed so. I'm not sure I have any blocking operations in there. Is a blocking operation anything that takes longer than 50 ms? And if I have one of those, should it be run in an executor? – AlexW May 15 '18 at 15:27
  • A blocking operation is anything that may wait for an arbitrary amount of time, e.g. anything that depends on network latency or a server's response. If you don't use asyncio-compatible libraries and you don't `await` your blocking calls, then using asyncio doesn't buy you anything. You get no parallelism, and you also got the exception, likely due to a conflict with another event loop that some of your libraries are using internally. Try using an ordinary `def` instead of `async def` for `retrieve_data` and replace `await asyncio.sleep()` with `time.sleep()`. – user4815162342 May 15 '18 at 15:47
  • I think I understand: any function that's run within my main function must also be an async function? – AlexW May 16 '18 at 10:18
  • If it is contacting the network and/or can take a long time to run for other reasons, then yes. Functions that just do something in memory can safely remain regular functions. [This question](https://stackoverflow.com/questions/49660129/asyncio-how-many-coroutines) contains further discussion about converting an existing code base. – user4815162342 May 16 '18 at 21:22
  • Thank you. I have converted my SNMP function to async. I have another function that uses a regex to extract data; that would be an in-memory function and not need async? But I also have functions that query and add records to a Django DB; would those need to be converted? I don't think Django can be run async, so would those functions need to be moved to an executor? Hopefully I'm getting this right. Thanks again – AlexW May 17 '18 at 08:26
  • Yes, for those that have no async support, you'd use `loop.run_in_executor(None, lambda: ...)`. That will return a future that you can `await`, pass to `asyncio.gather`, etc. – user4815162342 May 17 '18 at 08:33
  • Thanks for your help with this. I think I've finally made a working script, but I have been stumped by Celery in this question (https://stackoverflow.com/questions/50512302/django-celery-asyncio-daemonic-process-are-not-allowed-to-have-children). Shall I delete this question you've assisted with, or would you like to post an answer? – AlexW May 24 '18 at 14:58
  • I don't feel like my answer would cover it properly, but you could post your own answer that briefly explains how you fixed the issue. Who knows, maybe it will help someone googling this problem later? – user4815162342 May 24 '18 at 15:28
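To illustrate user4815162342's point about blocking calls, here is a small timing sketch (the function names and the 0.2 s delay are made up for the demo): three coroutines that call the blocking `time.sleep()` run one after another, while three that `await asyncio.sleep()` overlap. Assuming `snmp_get()` is an ordinary blocking function (it is called without `await` in the question), it would behave like the first case.

```python
import asyncio
import time

DELAY = 0.2  # seconds each simulated "network call" takes


async def blocking_call():
    # time.sleep() never yields to the event loop, so nothing else can run.
    time.sleep(DELAY)


async def non_blocking_call():
    # asyncio.sleep() suspends this coroutine and lets the others run.
    await asyncio.sleep(DELAY)


def timed_run(coro_fn, n=3):
    """Run n copies of coro_fn concurrently on a fresh loop; return seconds."""
    loop = asyncio.new_event_loop()
    try:
        start = time.perf_counter()
        tasks = [loop.create_task(coro_fn()) for _ in range(n)]
        loop.run_until_complete(asyncio.wait(tasks))
        return time.perf_counter() - start
    finally:
        loop.close()


blocking_elapsed = timed_run(blocking_call)          # roughly 3 * DELAY
non_blocking_elapsed = timed_run(non_blocking_call)  # roughly 1 * DELAY
print("blocking: {:.2f}s, awaited: {:.2f}s".format(
    blocking_elapsed, non_blocking_elapsed))
```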

1 Answer


From these URLs suggested by user4815162342:

https://medium.freecodecamp.org/a-guide-to-asynchronous-programming-in-python-with-asyncio-232e2afa44f6

When to use and when not to use Python 3.5 `await` ?

When running async functions, every input/output operation needs to be async-compatible. Functions that only do work in memory (in my example, a regex search) can stay ordinary functions.

That is, any function that gathers data from another source (a Django query in my example) and is not async-compatible must be run in an executor.
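A minimal sketch of that pattern, under these assumptions: `fetch_links_blocking()` is a hypothetical stand-in for a Django ORM query (here it just sleeps and returns strings), `parse_speed()` is an in-memory regex helper called directly, and `loop.run_in_executor(None, ...)` runs the blocking call in the loop's default thread-pool executor so the coroutine can `await` it. The loop is passed in explicitly because Python 3.6 has no `asyncio.get_running_loop()`.

```python
import asyncio
import re
import time


def fetch_links_blocking():
    # Hypothetical stand-in for a blocking Django query; here it just sleeps.
    time.sleep(0.1)
    return ["Gi0/1 speed=100", "Gi0/2 speed=1000"]


def parse_speed(text):
    # Pure in-memory work (a regex), so it can stay a regular function.
    match = re.search(r"speed=(\d+)", text)
    return int(match.group(1)) if match else None


async def collect_speeds(loop):
    # Run the blocking call in the default executor (a thread pool) and
    # await the returned future instead of blocking the event loop.
    rows = await loop.run_in_executor(None, fetch_links_blocking)
    return [parse_speed(row) for row in rows]


async def main(loop):
    # Several executor calls can overlap when awaited together via gather().
    return await asyncio.gather(collect_speeds(loop), collect_speeds(loop))


loop = asyncio.new_event_loop()
try:
    speeds = loop.run_until_complete(main(loop))
finally:
    loop.close()
print(speeds)  # [[100, 1000], [100, 1000]]
```

If the blocking function needs arguments, `run_in_executor(None, func, arg1, arg2)` or the `lambda: ...` form mentioned in the comments both work.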

I think I have now fixed my issues by running all Django DB calls in executors; I have not had an issue running the script ad hoc since.

However, I now have a compatibility issue between Celery and asyncio: Celery is not yet compatible with asyncio, which throws up some errors, though not the errors I was seeing before.

AlexW