
I am a PhD student studying market microstructure, and I need to deal with very large datasets (millisecond-stamped data running to hundreds of GB). I have been using SAS, which is quite good at processing big data in data-frame form, but it is costly, so I would like to use Python for my research instead. I have some, but not advanced, Python skills. I have heard of pandas, which is quite efficient at processing data frames, but it is limited by RAM, which does not suit my purpose.

What I have tried: iterating over the data line by line, processing each row, and storing the results in dictionaries, but this hits a memory constraint. I got a MemoryError and could watch Python chew up all the RAM (I have 32 GB), even though this dataset is still very small (500 MB) compared to what I will be dealing with later (50-100 GB). There are also things that are difficult to do line by line, such as regressions and graphs. So my question is: how should I process and store such data?

The input data looks like this:

#RIC    Date[L]     Time[L] Type    Price   Volume  Bid Price Ask Price
TPI.AX  20140820    00:11.7 Quote                             0.91
TPI.AX  20140820    00:11.7 Trade   0.91    10000       
TPI.AX  20140820    00:21.5 Quote                             0.91
TPI.AX  20140820    00:22.1 Quote                   0.905   
TPI.AX  20140820    00:42.2 Quote                   0.905   
TPI.AX  20140820    00:42.6 Trade   0.9075  117     
TPI.AX  20140820    00:43.1 Trade   0.9075  495     
TPI.AX  20140820    00:49.6 Quote                   0.905   
TPI.AX  20140820    00:57.6 Quote                   0.905   
TPI.AX  20140820    00:57.6 Quote                   0.905   
TPI.AX  20140820    00:58.3 Quote                   0.905   
TPI.AX  20140820    01:02.6 Quote                             0.91
TPI.AX  20140820    01:02.6 Quote                             0.91
TPI.AX  20140820    01:02.6 Quote                   0.905   
TPI.AX  20140820    01:02.6 Trade   0.91    9365        
TPI.AX  20140820    01:02.6 Trade   0.91    9041        

This is my code:

import csv
from collections import defaultdict
from math import ceil

# NOTE: time_to_milli() is called below but is not shown here; from the bounds
# used, it is assumed to convert a Time[L] string into microseconds since
# midnight.


def spread_calculation(input_file_list, output_file):
    """This function calculates the spreads for securities in input_file_list
    input: trade and quote data from TRTH
    2 parameters: 1. list of file names, 2.output file name
    output: csv file contains spreads"""
    # Set variables:
    date = None
    exchange_bbo = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float)))))
    effective_spread = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float)))))
    # 16 hours of 100-millisecond buckets, expressed in microseconds
    # (integer division, so that range() receives an int on Python 3)
    time_bucket = [i * 100000.0 for i in range(0, (16 * 60 * 60 * 1000 * 1000) // 100000)]
    for file in input_file_list:
        file_to_open = '%s.csv' % file
        reader = csv.DictReader(open(file_to_open, 'r', newline=''))  # text mode for Python 3's csv module
        for i in reader:
            # Derive the date from each row (input files may span several dates).
            d = i['Date[L]']
            date = d[0:4] + "-" + d[4:6] + "-" + d[6:8]
            if i['Type'] == 'Quote' and (time_to_milli(i['Time[L]']) <= (16*60*60*1000)*1000):
                security = i['#RIC'].split('.')[0]
                exchange = i['#RIC'].split('.')[1]
                timestamp = float(time_to_milli(i['Time[L]']))
                bucket = ceil(float(time_to_milli(i['Time[L]'])) / 100000.0) * 100000.0
                if i['Bid Price'] == "":
                    bid = 0.0
                else:
                    bid = float(i['Bid Price'])
                if i['Ask Price'] == "":
                    ask = 0.0
                else:
                    ask = float(i['Ask Price'])
                if bid < ask < 199999.99:
                    if not bool(exchange_bbo[security][exchange][date][bucket]['ask']):
                        exchange_bbo[security][exchange][date][bucket]['ask'] = ask
                        exchange_bbo[security][exchange][date][bucket]['diff_ask'] = bucket - timestamp
                    elif exchange_bbo[security][exchange][date][bucket]['diff_ask'] > bucket - timestamp:
                        exchange_bbo[security][exchange][date][bucket]['ask'] = ask
                        exchange_bbo[security][exchange][date][bucket]['diff_ask'] = bucket - timestamp
                    if not bool(exchange_bbo[security][exchange][date][bucket]['bid']):
                        exchange_bbo[security][exchange][date][bucket]['bid'] = bid
                        exchange_bbo[security][exchange][date][bucket]['diff_bid'] = bucket - timestamp
                    elif exchange_bbo[security][exchange][date][bucket]['diff_bid'] > bucket - timestamp:
                        exchange_bbo[security][exchange][date][bucket]['bid'] = bid
                        exchange_bbo[security][exchange][date][bucket]['diff_bid'] = bucket - timestamp
            # i['Price'] is a string, so convert it before comparing with zero
            if i['Type'] == 'Trade' and i['Price'] != "" and float(i['Price']) != 0.0:
                timestamp = float(time_to_milli(i['Time[L]']))
                bucket = ceil(float(time_to_milli(i['Time[L]'])) / 100000.0) * 100000.0
                security = i['#RIC'].split('.')[0]
                exchange = i['#RIC'].split('.')[1]
                price = float(i['Price'])
                volume= float(i['Volume'])
                if not bool(exchange_bbo[security][exchange][date][bucket]['price']):
                    exchange_bbo[security][exchange][date][bucket]['price'] = price
                    exchange_bbo[security][exchange][date][bucket]['volume'] = volume
                    exchange_bbo[security][exchange][date][bucket]['time_diff'] = bucket - timestamp
                elif exchange_bbo[security][exchange][date][bucket]['time_diff'] > bucket - timestamp and price != 0.0:
                    exchange_bbo[security][exchange][date][bucket]['price'] = price
                    exchange_bbo[security][exchange][date][bucket]['volume'] = volume
                    exchange_bbo[security][exchange][date][bucket]['time_diff'] = bucket - timestamp

    # Fill the empty buckets - exchange level (aggregate once, over all the
    # files read above, rather than once per input file)
    for security in exchange_bbo:
        for exchange in exchange_bbo[security]:
            for date in exchange_bbo[security][exchange]:
                for bucket in time_bucket:
                    previous = bucket - 100000.0
                    if previous < 0:
                        continue  # the first bucket has no predecessor
                    # best offer: carry the last known quote forward
                    bo_t = exchange_bbo[security][exchange][date][bucket]['ask']
                    bo_t1 = exchange_bbo[security][exchange][date][previous]['ask']
                    if bo_t == 0.0 and bo_t1 != 0.0:
                        exchange_bbo[security][exchange][date][bucket]['ask'] = bo_t1
                    # best bid
                    bb_t = exchange_bbo[security][exchange][date][bucket]['bid']
                    bb_t1 = exchange_bbo[security][exchange][date][previous]['bid']
                    if bb_t == 0.0 and bb_t1 != 0.0:
                        exchange_bbo[security][exchange][date][bucket]['bid'] = bb_t1

    for security in exchange_bbo:
        for exchange in exchange_bbo[security]:
            for date in exchange_bbo[security][exchange]:
                for bucket in exchange_bbo[security][exchange][date]:
                    # compute a spread only for buckets that contain a trade
                    if exchange_bbo[security][exchange][date][bucket]['price']:
                        nbo = exchange_bbo[security][exchange][date][bucket]['ask']
                        nbb = exchange_bbo[security][exchange][date][bucket]['bid']
                        midpoint = (nbo + nbb) / 2.0
                        price = exchange_bbo[security][exchange][date][bucket]['price']
                        volume = exchange_bbo[security][exchange][date][bucket]['volume']
                        if price > 0.0 and midpoint != 0.0:
                            effective_spread[security][exchange][date][bucket]['espread_bps'] = 2.0 * abs(price - midpoint) / midpoint
                            effective_spread[security][exchange][date][bucket]['volume'] = volume
                            # keep the count per bucket so the writer below can read it back
                            effective_spread[security][exchange][date][bucket]['count'] += 1.0

    with open(output_file, 'w', newline='') as out:
        data_writer = csv.DictWriter(out, fieldnames=['security', 'exchange', 'date', 'bucket',
                                                      'espread_bps', 'volume', 'count'])
        data_writer.writeheader()
        for security in effective_spread:
            for exchange in effective_spread[security]:
                for date in effective_spread[security][exchange]:
                    for bucket in effective_spread[security][exchange][date]:
                        row = effective_spread[security][exchange][date][bucket]
                        data_writer.writerow({'security': security, 'exchange': exchange,
                                              'date': date, 'bucket': bucket,
                                              'espread_bps': row['espread_bps'],
                                              'volume': row['volume'],
                                              'count': row['count']})

input_files = ['ScandinavianTAQ']
spread_calculation(input_files, 'spreads.csv')  # output file name is an assumption

Thank you so much

2 Answers


100 GB isn't that much data. A SQL database and Pandas should be all you need. You'll need to learn how to write SQL queries, and I'd recommend grabbing a copy of Wes McKinney's book, Python for Data Analysis. I haven't looked at your code closely, but it appears to me that the biggest problem is that you're doing everything line by line rather than grouping your operations.
Also, check out Dask
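
For example, here is a minimal sketch of that approach: ingest the CSV in RAM-sized chunks into SQLite, then query back only the rows each computation needs. The file, table, and query names here are illustrative, not taken from your question:

import sqlite3
import pandas as pd

conn = sqlite3.connect('taq.db')

# Stream the large CSV into the database one chunk at a time,
# so memory use stays bounded regardless of file size.
for chunk in pd.read_csv('ScandinavianTAQ.csv', chunksize=1000000):
    chunk.to_sql('ticks', conn, if_exists='append', index=False)

# Pull back just what a given calculation needs, already filtered.
quotes = pd.read_sql_query(
    "SELECT * FROM ticks WHERE Type = 'Quote' AND [#RIC] = 'TPI.AX'", conn)
conn.close()

Dask's dask.dataframe.read_csv gives you something similar with less ceremony: a pandas-like interface that evaluates lazily and out of core, and can parallelize across many files.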

Batman

I'd check out Elasticsearch if you're going to be dealing with a lot of dicts for external storage. It works well with big data and has a moderate learning curve.
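
A minimal indexing sketch, assuming a local node and the official elasticsearch Python client (the index name and row source are made up, and the exact constructor arguments vary a little between client versions):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch('http://localhost:9200')

def actions(rows):
    # rows: an iterable of dicts, e.g. those produced by csv.DictReader
    for row in rows:
        yield {'_index': 'taq', '_source': row}

# Bulk-index instead of making one round trip per row.
helpers.bulk(es, actions(my_rows))  # my_rows is a placeholder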

For larger-than-memory files you could look at memmap and lazy reading for cases where line-by-line processing is acceptable. Generally, iterating is the accepted method.
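
For instance, with numpy's memmap the file is mapped rather than loaded, so only the slices you actually touch are read from disk. The filename, dtype, and shape below are assumptions:

import numpy as np

# Map a large existing binary file of float64 prices without loading it.
prices = np.memmap('prices.dat', dtype='float64', mode='r', shape=(500000000,))

# Only this slice is actually pulled from disk.
print(prices[:1000000].mean())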

Grouping operations also helps in your context: think about whether there are independent operations that can be executed in parallel (see the sketch below). For that, check out some example SO posts like this one. It would also benefit you to talk to domain experts in your field about optimizing computations.
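
Since daily TAQ files are typically independent of one another, the per-file work parallelizes naturally with the standard library. A sketch using your own spread_calculation, with hypothetical file stems:

from multiprocessing import Pool

def process_one(stem):
    # Each worker handles one input file and writes its own output,
    # so there is no contention on a shared file.
    spread_calculation([stem], '%s_spreads.csv' % stem)

if __name__ == '__main__':
    stems = ['ScandinavianTAQ_day1', 'ScandinavianTAQ_day2']  # placeholders
    with Pool() as pool:
        pool.map(process_one, stems)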

Also, do you have access to an external server? If you do, and it's a distributed system, your options are even broader.

themistoklik