I am having some huuuge problems running the asyncio with websockets for my python jupyter. I read this existing thread : How do I run Python asyncio code in a Jupyter notebook?
However, I still get errors when I apply the fix of just running : await in Jupyter python.
When I use the standard code-line :
asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msg)))
My error message is as follows :
RuntimeError Traceback (most recent call last)
<ipython-input-40-23500a349044> in <module>
65 asyncio.run_coroutine_threadsafe(call_api(msg), loop)
66
---> 67 asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msg)))
68
C:\ProgramData\Anaconda3\lib\asyncio\base_events.py in run_until_complete(self, future)
569 future.add_done_callback(_run_until_complete_cb)
570 try:
--> 571 self.run_forever()
572 except:
573 if new_task and future.done() and not future.cancelled():
C:\ProgramData\Anaconda3\lib\asyncio\base_events.py in run_forever(self)
524 self._check_closed()
525 if self.is_running():
--> 526 raise RuntimeError('This event loop is already running')
527 if events._get_running_loop() is not None:
528 raise RuntimeError(
RuntimeError: This event loop is already running
I apply the fix in the reference thread as below
# jupyter cannot use python main syntax and have to use loop form below.
loop = asyncio.get_event_loop()
loop.create_task(call_api(msg))
asyncio.run_coroutine_threadsafe(call_api(msg), loop)
but, then I get no results back, except for a code-line as per:
<Future at 0xa02de10 state=pending>
full-code as per below.
# this code seems to work
import pandas as pd
import numpy as np
import scipy as sc
import datetime as dt
import time
import matplotlib.pyplot as plt
from datetime import date
import asyncio
import websockets
import json
from IPython.display import clear_output
# To subscribe to this channel:
msg = \
{"jsonrpc": "2.0",
"method": "public/subscribe",
"id": 42,
"params": {
"channels": ["markprice.options.btc_usd"]}
}
db = []
column = ['iv', 'mark_price', 'commod', 'expiry', 'strike', 'callput']
async def call_api(msg, t_delay =10, t_period =1):
async with websockets.connect('wss://test.deribit.com/ws/api/v2') as websocket:
await websocket.send(msg)
c_time = 0.0 # initialize start time
s_time = time.time() + t_delay # get end time time
while (websocket.open)& (c_time <= s_time): # this is while webscoket is open, we use 'while websocket.open:'
response = await websocket.recv()
d = json.loads(response)
if 'params' not in d:
continue
else:
db = d['params']['data']
# sets up the panda dataframe and the information transform.
df = pd.DataFrame(db)
df['splitinst'] = df['instrument_name'].str.split("-")
df[['commod','expiry','strike','callput']] = pd.DataFrame(df.splitinst.values.tolist(), index=df.index)
df['expiry'] = pd.to_datetime(df['expiry'])
df = df.sort_values(by=['expiry','strike'])
df.to_json('C:/settles.json', orient='records')
df.to_csv('C:/settles.csv')
await asyncio.sleep(t_period) # wait for this seconds before doing loo again
c_time = time.time() # update the time
clear_output(wait = True)
print('current time is', c_time)
print(df.iloc[0])
# jupyter cannot use python main syntax and have to use loop form below.
loop = asyncio.get_event_loop()
loop.create_task(call_api(msg))
asyncio.run_coroutine_threadsafe(call_api(msg), loop)
# asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msg)))