I am running a streaming task inside a celery worker which is giving me 1 minute candle stick data and then I'm converting that 1 minute incoming data to 5 minute data but this task always starts giving some wierd output after sometime but works fine when I run it on jupyter notebook separately.
df = pd.DataFrame(columns=['time', 'open', 'high','low','close'])
df.set_index('time', inplace=True)
cache.set('df', df)
df_5min=pd.DataFrame()
cache.set('df_5min',df_5min)
max_strategy_run=2
cache.set('max_strategy_run', max_strategy_run)
@shared_task(name="strategy_1",queue='strategy_1_queue')
def strategy_1():
client = easy_client(
api_key='',
redirect_uri='',
token_path='')
stream_client = StreamClient(client)
strategy=Strategy.objects.filter(name="strategy_1").first()
stock_symbol=strategy.stock.symbol
async def read_stream():
await stream_client.login()
await stream_client.quality_of_service(StreamClient.QOSLevel.FAST)
async def print_message(message):
#global df
#global df_5min
#global max_strategy_run
#global place_order_data
if message['service']=='CHART_EQUITY':
df=cache.get('df')
df_5min=cache.get('df_5min')
max_strategy_run=cache.get('max_strategy_run')
if max_strategy_run==0:
await stream_client.chart_equity_unsubs([stock_symbol])
await stream_client.level_one_equity_unsubs([stock_symbol])
if df.shape[0]==0 and datetime.datetime.fromtimestamp(message['content'][0]['CHART_TIME']/1000).minute%5!=0:
pass
else:
df = df.append({
'time': datetime.datetime.fromtimestamp(message['content'][0]['CHART_TIME']/1000),
'open': message['content'][0]['OPEN_PRICE'],
'high': message['content'][0]['HIGH_PRICE'],
'low': message['content'][0]['LOW_PRICE'],
'close': message['content'][0]['CLOSE_PRICE'],
},ignore_index=True)
print(json.dumps(message, indent=4))
if df.shape[0]==5:
df.set_index('time', inplace=True)
df = df.resample('5T').agg({'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last'})
df_5min=copy.deepcopy(df)
print('New DataFrame (5 minute):\n', df_5min)
if df_5min.loc[df_5min.index[0],'open']>df_5min.loc[df_5min.index[0],'close']:
max_strategy_run=max_strategy_run-1
option=await getOptionSymbol(client,message['content'][0]['key'])
print(option)
place_order_data = await placeOrderDataStrategy1(option,df_5min)
print(place_order_data)
df.drop(index=df.index, inplace=True)
await stream_client.chart_equity_unsubs([stock_symbol])
await stream_client.level_one_equity_subs([stock_symbol])
else:
df.drop(index=df.index, inplace=True)
cache.set('df',df)
cache.set('df_5min',df_5min)
cache.set('max_strategy_run', max_strategy_run)
elif message['service']=='QUOTE':
df=cache.get('df')
max_strategy_run=cache.get('max_strategy_run')
df_5min=cache.get('df_5min')
print(message)w
if max_strategy_run==0:
message['content'][0]['LAST_PRICE']=12345678
if ((datetime.datetime.now() - df_5min.index[0]).total_seconds() / 60)<20:
try:
if message['content'][0]['LAST_PRICE']<df_5min.loc[df_5min.index[0],'low']:
# here we'll place order
max_strategy_run=0
trade_replication.apply_async(args=place_order_data)
print("ORDER PLACED !!!!!!!!!!!>>>>><<<<<<<<")
await stream_client.level_one_equity_unsubs([stock_symbol])
except KeyError:
print('Error: Could not retrieve LAST_PRICE from message')
cache.set('df',df)
cache.set('df_5min',df_5min)
cache.set('max_strategy_run', max_strategy_run)
else:
await stream_client.level_one_equity_unsubs([stock_symbol])
# Always add handlers before subscribing because many streams start sending
# data immediately after success, and messages with no handlers are dropped.
stream_client.add_chart_equity_handler(print_message)
stream_client.add_level_one_equity_handler(print_message)
await stream_client.chart_equity_subs([stock_symbol])
while True:
await stream_client.handle_message()
asyncio.run(read_stream())
I'm expecting a 5minute data frame as an output