2

I try to start Binance websocket to collect candles data. It works well if there is no delay in the data processing function. But when some pauses occurs in the function processing one ticker data, it also delays the response for other ticker. Do anybody know how to run them independantly?

from binance.client import Client
from binance.websockets import BinanceSocketManager

api_key = ''
api_secret = ''
client = Client(api_key, api_secret)
bm = BinanceSocketManager(client, user_timeout=60)

def process(msg):
    print(msg['s'])
    if msg['s'] == 'ETHUSDT':
        time.sleep(5)

def socket_1():
     conn_key = bm.start_kline_socket('ETHUSDT', process, '1h')

def socket_2():
     conn_key = bm.start_kline_socket('BNBUSDT', process, '1h')

socket_1()
socket_2()

bm.start()

I tried to make the socket run two separate tasks with asyncio as @Mike Malyi suggested, but it did not eliminate the delay:

import asyncio

def process(msg):
    asyncio.run(main(msg))

async def main(msg):
    if msg['s'] == 'ETHUSDT':
        task1 = asyncio.create_task(process_message(msg))
        await task1
    else:
        task2 = asyncio.create_task(process_message(msg))
        await task2

async def process_message(msg):
    print(msg['s'])
    if msg['s'] == 'ETHUSDT':
        await asyncio.sleep(5)

eth_key = bm.start_kline_socket('ETHUSDT', process, '1h')
bnb_key = bm.start_kline_socket('BNBUSDT', process, '1h')

bm.start()

I also tried to make the function run independanly using Queue in threads, but it did not help, one function still delays the other:

from queue import Queue

def consumer(in_q):
    while True:
        msg = in_q.get()
        process_message(msg)
    
def producer(out_q):
    eth = bm.start_kline_socket('ETHUSDT', out_q.put, '1h')
    bnb = bm.start_kline_socket('BNBUSDT', out_q.put, '1h')

def process_message(msg):
    if msg['s'] == 'ETHUSDT':
        time.sleep(5)
        print(f"{msg['s']} with delay, {time.strftime('%X')}")
    else:
        print(f"{msg['s']} {time.strftime('%X')}")


q = Queue()
t1 = Thread(target = consumer, args =(q, )) 
t2 = Thread(target = producer, args =(q, )) 
t1.start() 
t2.start() 

bm.start() 

2 Answers2

2
from binance.client import Client
from binance.websockets import BinanceSocketManager
import _thread as thread
import time
import queue

api_key = ''
api_secret = ''
client = Client(api_key, api_secret)

def process_message(msg):
    if msg['s'] == 'ETHUSDT':
      print(f"{msg['s']} with delay, {time.strftime('%X')}")
      time.sleep(5)
      print('delay end')  
    else:
        print(f"{msg['s']} {time.strftime('%X')}")
  

def build_thread (symbol):
  print('start thread', symbol)
  q = queue.Queue()
  bm = BinanceSocketManager(client, user_timeout=60)
  conn_key = bm.start_kline_socket(symbol, q.put, '1h')
  bm.start()
  while(True):
    msg = q.get()
    process_message(msg)

thread.start_new_thread(build_thread, ('ETHUSDT', ))  
thread.start_new_thread(build_thread, ('BNBUSDT', ))  
Mike Malyi
  • 983
  • 8
  • 18
  • Thank you, @Mike Malyi! I tried this, but placing code in separate tasks did not eliminate the delay. I showed the code above. – Ruslan Asadullin Feb 16 '21 at 05:39
  • 1
    @RuslanAsadullin that's right. It cause you made `await task1`. Just remove this strings. Your script still waits on this string until `process_message` will be resolved. – Mike Malyi Feb 16 '21 at 06:20
  • Thank you, @MikeMalyi but, if I remove `await` expressions, websocket starts another object of `process` function even if the previous one was not finished yet. They work independently, but never finish completely – Ruslan Asadullin Feb 17 '21 at 06:23
  • 1
    Yes, it could start another instance of function even if the previous one was not. Is it not your question? So what do you want to achieve? What do you mean under "Do anybody know how to run them independantly?" – Mike Malyi Feb 17 '21 at 07:25
  • The `start_kline_socket` gives data every 2 sec. If `process_message` function works longer than 2 sec, I don't want its another object to run untill the previous one finishes the job. But the `process_message` function strated by another `start_kline_socket` must have no delays caused by other `start_kline_socket` object. So that one crypto pair algorithm has no effect on the other. This is the point. Thank you, @MikeMalyi, I guess I have to study asyncio lib more thoroughly. – Ruslan Asadullin Feb 17 '21 at 08:16
  • 1
    `eth_key = bm.start_kline_socket('ETHUSDT', lambda msg: asyncio.run(main(msg)), '1h')` try something like this. Lambda function should create independent instance for each run of main – Mike Malyi Feb 17 '21 at 08:19
  • I did as you suggested, the delay still remains common for all functions. Thank you for your time, @MikeMalyi – Ruslan Asadullin Feb 17 '21 at 09:05
  • 1
    I don't want to give up :-). Switch to threads https://stackoverflow.com/questions/2957116/make-2-functions-run-at-the-same-time – Mike Malyi Feb 17 '21 at 09:12
  • I did it in `threads` with `Queue` as shown above, @MikeMalyi, but the problem still persists – Ruslan Asadullin Feb 17 '21 at 11:22
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/228836/discussion-between-mike-malyi-and-ruslan-asadullin). – Mike Malyi Feb 17 '21 at 11:42
  • just start the callback of the websocket in a new thread https://github.com/saurabhrb/python-binance/commit/3e30e3b5a577873fa975fc9d8b8578abf788f5a0 – Saurabh Badenkal May 29 '22 at 07:39
0

This is setup up to get the pair and stop level from SQL (inline query for you so the code works though) and to then stop the socket when the stop level is below the close. Each pair runs in its own process so will scale to the number of CPU threads available.

import config
from binance import ThreadedWebsocketManager
from datetime import datetime
import pyodbc
from multiprocessing import Pool, cpu_count

KEY = config.binance_key
SECRET = config.binance_secret
BASE_URL = config.binance_base_url

''' ======  begin of functions ====== '''
def exec_sql (query) :
    cnxn_p = pyodbc.connect(config.sql_connection)
    cursor_p = cnxn_p.cursor()
    cursor_p.execute(query)
    cnxn_p.commit()
    cursor_p.close()
    cnxn_p.close()
    
def process_message(pair,stop):
    print(pair)
    print(stop)

    twm = ThreadedWebsocketManager(api_key=KEY, api_secret=SECRET)
    # start is required to initialise its internal loop
    twm.start()

    def handle_socket_message(msg):
        transactiontime = msg['k']['T'] / 1000
        transactiontime = datetime.fromtimestamp(transactiontime).strftime('%d %b %Y %H:%M:%S')

        if msg['e'] != 'error':
            # print("{} - {} - Interval {} - Open: {} - Close: {} - High: {} - Low: {} - Volume: {}".
            #      format(transactiontime,msg['s'],msg['k']['i'],msg['k']['o'],msg['k']['c'],msg['k']['h'],msg['k']['l'],msg['k']['v']))
            print("{} - {} - Interval {} - Close: {} - Stop: {}".
                 format(transactiontime,msg['s'],msg['k']['i'],msg['k']['c'], stop ))
        else:
            print(msg)

        Close = float(msg['k']['c'])
        if Close < stop:
            print(pair + ' close is below Stop')
            twm.stop()

    twm.start_kline_socket(callback=handle_socket_message, symbol=pair)
    twm.join()  

def main():
       
    print(f'starting computations on {cpu_count()} cores')

    # connect SQL server
    cnxn = pyodbc.connect(config.sql_connection)
    cursor = cnxn.cursor()
    sql = """select 'BNBBTC' as pair, 0.01086300 as stop
            union
            select 'BTCUSDT', 56234"""
    cursor.execute(sql)
    
    # iterate pairs
    rows = cursor.fetchall()
    pairs = []
    stops = []
    for row in rows:       
        pairs.append(row.pair)
        stops.append(row.stop)

    with Pool() as pool:
        pool.starmap(process_message, zip(pairs,stops))
    pool.close()

    print('pool done')    

    
if __name__ == '__main__':
    main()
DingoCC
  • 159
  • 1
  • 2
  • 12