I am having major problems running asyncio with websockets in a Jupyter notebook (Python). I have read this existing thread: How do I run Python asyncio code in a Jupyter notebook?

However, I still get errors when I apply the fix suggested there, which is to use await directly in the Jupyter cell.

When I use the standard line:

asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msg)))

the error message is as follows:

RuntimeError                              Traceback (most recent call last)
<ipython-input-40-23500a349044> in <module>
     65 asyncio.run_coroutine_threadsafe(call_api(msg), loop)
     66 
---> 67 asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msg)))
     68 

C:\ProgramData\Anaconda3\lib\asyncio\base_events.py in run_until_complete(self, future)
    569         future.add_done_callback(_run_until_complete_cb)
    570         try:
--> 571             self.run_forever()
    572         except:
    573             if new_task and future.done() and not future.cancelled():

C:\ProgramData\Anaconda3\lib\asyncio\base_events.py in run_forever(self)
    524         self._check_closed()
    525         if self.is_running():
--> 526             raise RuntimeError('This event loop is already running')
    527         if events._get_running_loop() is not None:
    528             raise RuntimeError(

RuntimeError: This event loop is already running
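
For reference, the same error can be reproduced outside Jupyter. The sketch below is a minimal illustration, assuming Python 3.8 or later; `inner` and `outer` are hypothetical names, not part of my code. It calls `run_until_complete` from inside a coroutine, that is, while the loop is already running, which appears to be the same situation as a notebook cell:

```python
import asyncio


async def inner():
    # Hypothetical stand-in for call_api; no network access involved.
    return 1


async def outer():
    # We are inside a coroutine, so the event loop is already running,
    # much like code executed in a Jupyter cell.
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return "no error"


if __name__ == "__main__":
    print(asyncio.run(outer()))
```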

I applied the fix from the referenced thread, as below:

# Jupyter cannot use the usual Python main syntax, so the loop form below is used.
loop = asyncio.get_event_loop()
loop.create_task(call_api(msg))
asyncio.run_coroutine_threadsafe(call_api(msg), loop)

But then I get no results back, only this output:

<Future at 0xa02de10 state=pending>
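
As far as I understand it, `create_task` and `run_coroutine_threadsafe` only schedule the coroutine, so what gets displayed is the pending future rather than a result. Below is a minimal sketch of awaiting the scheduled task instead. `fake_call_api` is a hypothetical stand-in that needs no network connection, and `demo` is only a wrapper so the snippet also runs as a plain script:

```python
import asyncio
import json


async def fake_call_api(payload):
    # Hypothetical stand-in for call_api: waits briefly, then echoes the payload.
    await asyncio.sleep(0.01)
    return json.loads(payload)


async def demo():
    loop = asyncio.get_running_loop()
    # create_task schedules the coroutine and returns immediately.
    task = loop.create_task(fake_call_api(json.dumps({"id": 42})))
    started_pending = not task.done()
    # Awaiting the task is what actually waits for the result.
    result = await task
    return started_pending, result


if __name__ == "__main__":
    # Plain-script entry point; a notebook already has a running loop.
    print(asyncio.run(demo()))
```

In a notebook cell, the last line would presumably be `await demo()` instead, since the loop is already running there.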

The full code is below.

# this code seems to work

import pandas as pd
import numpy as np
import scipy as sc
import datetime as dt
import time
import matplotlib.pyplot as plt

from datetime import date
import asyncio
import websockets
import json

from IPython.display import clear_output

# To subscribe to this channel:
msg = \
    {"jsonrpc": "2.0",
     "method": "public/subscribe",
     "id": 42,
     "params": {
        "channels": ["markprice.options.btc_usd"]}
    }


db = []
column = ['iv', 'mark_price', 'commod', 'expiry', 'strike', 'callput']

async def call_api(msg, t_delay =10, t_period =1):
    async with websockets.connect('wss://test.deribit.com/ws/api/v2') as websocket:
        await websocket.send(msg)

        c_time = 0.0  # current time; updated on each pass through the loop
        s_time = time.time() + t_delay  # end time: now plus t_delay seconds

        # loop while the websocket is open and the end time has not passed
        while websocket.open and c_time <= s_time:
            response = await websocket.recv()
            d = json.loads(response)
            if 'params' not in d:
                continue

            else:
                db = d['params']['data']

            # set up the pandas DataFrame and transform the data.
            df = pd.DataFrame(db)
            df['splitinst'] = df['instrument_name'].str.split("-")
            df[['commod','expiry','strike','callput']] = pd.DataFrame(df.splitinst.values.tolist(), index=df.index)
            df['expiry'] = pd.to_datetime(df['expiry'])
            df = df.sort_values(by=['expiry','strike'])

            df.to_json('C:/settles.json', orient='records')
            df.to_csv('C:/settles.csv')


            await asyncio.sleep(t_period)  # wait this many seconds before looping again
            c_time = time.time()  # update the current time

            clear_output(wait = True)
            print('current time is', c_time)

            print(df.iloc[0])

# Jupyter cannot use the usual Python main syntax, so the loop form below is used.
loop = asyncio.get_event_loop()
loop.create_task(call_api(msg))
asyncio.run_coroutine_threadsafe(call_api(msg), loop)

# asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msg)))
eyllanesc
Kiann
