0

I am running a streaming task inside a celery worker which is giving me 1 minute candle stick data and then I'm converting that 1 minute incoming data to 5 minute data but this task always starts giving some wierd output after sometime but works fine when I run it on jupyter notebook separately.

df = pd.DataFrame(columns=['time', 'open', 'high','low','close'])
df.set_index('time', inplace=True)
cache.set('df', df)
df_5min=pd.DataFrame()
cache.set('df_5min',df_5min)
max_strategy_run=2
cache.set('max_strategy_run', max_strategy_run)
@shared_task(name="strategy_1",queue='strategy_1_queue')
def strategy_1():
    client = easy_client(
            api_key='',
            redirect_uri='',
            token_path='')
    stream_client = StreamClient(client)
    strategy=Strategy.objects.filter(name="strategy_1").first()
    stock_symbol=strategy.stock.symbol
    async def read_stream():
        await stream_client.login()
        await stream_client.quality_of_service(StreamClient.QOSLevel.FAST)
        async def print_message(message):
                #global df
                #global df_5min
                #global max_strategy_run
                #global place_order_data
            if message['service']=='CHART_EQUITY':
                df=cache.get('df')
                df_5min=cache.get('df_5min')
                max_strategy_run=cache.get('max_strategy_run')
                if  max_strategy_run==0:
                    await stream_client.chart_equity_unsubs([stock_symbol])
                    await stream_client.level_one_equity_unsubs([stock_symbol])
                if df.shape[0]==0 and datetime.datetime.fromtimestamp(message['content'][0]['CHART_TIME']/1000).minute%5!=0:
                    pass
                else:
                    df = df.append({
                    'time': datetime.datetime.fromtimestamp(message['content'][0]['CHART_TIME']/1000),
                    'open': message['content'][0]['OPEN_PRICE'],
                    'high': message['content'][0]['HIGH_PRICE'],
                    'low':  message['content'][0]['LOW_PRICE'],
                    'close': message['content'][0]['CLOSE_PRICE'],
                    },ignore_index=True)

                print(json.dumps(message, indent=4))
                if df.shape[0]==5:
                    df.set_index('time', inplace=True)
                    df = df.resample('5T').agg({'open': 'first', 
                                        'high': 'max', 
                                        'low': 'min', 
                                        'close': 'last'})
                    df_5min=copy.deepcopy(df)
                    print('New DataFrame (5 minute):\n', df_5min)
                    if df_5min.loc[df_5min.index[0],'open']>df_5min.loc[df_5min.index[0],'close']:
                        max_strategy_run=max_strategy_run-1
                        option=await getOptionSymbol(client,message['content'][0]['key'])
                        print(option)
                        place_order_data = await placeOrderDataStrategy1(option,df_5min)
                        print(place_order_data)
                        df.drop(index=df.index, inplace=True)
                        await stream_client.chart_equity_unsubs([stock_symbol])
                        await stream_client.level_one_equity_subs([stock_symbol])
                    else:
                        df.drop(index=df.index, inplace=True)
                cache.set('df',df)
                cache.set('df_5min',df_5min)
                cache.set('max_strategy_run', max_strategy_run)
            elif message['service']=='QUOTE':
                df=cache.get('df')
                max_strategy_run=cache.get('max_strategy_run')
                df_5min=cache.get('df_5min')
                print(message)w
                if max_strategy_run==0:
                    message['content'][0]['LAST_PRICE']=12345678
                if ((datetime.datetime.now() - df_5min.index[0]).total_seconds() / 60)<20:
                    try:
                        if message['content'][0]['LAST_PRICE']<df_5min.loc[df_5min.index[0],'low']:
                            # here we'll place order
                            max_strategy_run=0
                            trade_replication.apply_async(args=place_order_data)
                            print("ORDER   PLACED  !!!!!!!!!!!>>>>><<<<<<<<")
                            await stream_client.level_one_equity_unsubs([stock_symbol])
                    except KeyError:
                        print('Error: Could not retrieve LAST_PRICE from message')
                    cache.set('df',df)
                    cache.set('df_5min',df_5min)
                    cache.set('max_strategy_run', max_strategy_run)
                else:
                    await stream_client.level_one_equity_unsubs([stock_symbol])
                    
            # Always add handlers before subscribing because many streams start sending
            # data immediately after success, and messages with no handlers are dropped.
        stream_client.add_chart_equity_handler(print_message)
        stream_client.add_level_one_equity_handler(print_message)
        await stream_client.chart_equity_subs([stock_symbol])
        while True:
            await stream_client.handle_message()
    asyncio.run(read_stream())


I'm expecting a 5minute data frame as an output

0 Answers0