
I have a sampling application that acquires 250,000 samples per second, buffers them in memory and eventually appends to an HDFStore provided by pandas. In general, this is great. However, I have a thread that runs and continually empties the data acquisition device (DAQ) and it needs to run on a somewhat regular basis. A deviation of about a second tends to break things. Below is an extreme case of the timings observed. Start indicates a DAQ read starting, Finish is when it finishes, and IO indicates an HDF write (Both DAQ and IO occur in separate threads).

Start        : 2016-04-07 12:28:22.241303
IO (1)       : 2016-04-07 12:28:22.241303
Finish       : 2016-04-07 12:28:46.573440 (0.16 Hz, 24331.26 ms)
IO Done (1)  : 2016-04-07 12:28:46.573440 (24332.39 ms)

As you can see, it takes 24 seconds to perform this write (a typical write is about 40 ms). The HDD that I'm writing to is not under load (about 7% utilisation while running), so this delay shouldn't be caused by contention. I have disabled indexing on my HDFStore writes. My application runs numerous other threads, all of which print status strings, and from their output it seems like the IO task is blocking all other threads. I've spent quite a bit of time stepping through code to figure out where things are slowing down, and it's always within a method provided by a C extension, which leads to my questions:

  1. Can Python (I'm using 3.5) preempt execution in a C extension? The question "Concurrency: Are Python extensions written in C/C++ affected by the Global Interpreter Lock?" seems to indicate that it can't unless the extension specifically yields.
  2. Does pandas' HDF5 C code implement any yielding for I/O? If so, does this mean that the delay is due to a CPU-bound task? I have disabled indexing.
  3. Any suggestions for how I can get somewhat consistent timings? I'm thinking of moving the HDF5 code into another process. This only helps to a certain extent, though, as I can't really tolerate ~20 second writes anyway, especially when they're unpredictable.

Here's an example you can run to see the issue:

import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import threading
import time

def write_samples(store, samples, overwrite):
    frame = pd.DataFrame(samples, dtype='float64')

    if not overwrite:
        store.append("df", frame, format='table', index=False)
    else:
        store.put("df", frame, format='table', index=False)

def begin_io():
    store = pd.HDFStore("D:\\slow\\test" + str(random.randint(0,100)) + ".h5", mode='w', complevel=0)

    counter = 0
    while True:
        data = np.random.rand(50000, 1)
        start_time = timer()
        write_samples(store, data, counter == 0)
        end_time = timer()

        print("IO Done      : %s (%.2f ms, %d)" % (datetime.datetime.now(), (end_time - start_time) * 1000, counter))

        counter += 1

    store.close()

def dummy_thread():
    previous = timer()
    while True:
        now = timer()
        print("Dummy Thread  : %s (%d ms)" % (datetime.datetime.now(), (now - previous) * 1000))
        previous = now
        time.sleep(0.01)


if __name__ == '__main__':
    threading.Thread(target=dummy_thread).start()
    begin_io()

You will get output similar to:

IO Done      : 2016-04-08 10:51:14.100479 (3.63 ms, 470)
Dummy Thread  : 2016-04-08 10:51:14.101484 (12 ms)
IO Done      : 2016-04-08 10:51:14.104475 (3.01 ms, 471)
Dummy Thread  : 2016-04-08 10:51:14.576640 (475 ms)
IO Done      : 2016-04-08 10:51:14.576640 (472.00 ms, 472)
Dummy Thread  : 2016-04-08 10:51:14.897756 (321 ms)
IO Done      : 2016-04-08 10:51:14.898782 (320.79 ms, 473)
IO Done      : 2016-04-08 10:51:14.901772 (3.29 ms, 474)
IO Done      : 2016-04-08 10:51:14.905773 (2.84 ms, 475)
IO Done      : 2016-04-08 10:51:14.908775 (2.96 ms, 476)
Dummy Thread  : 2016-04-08 10:51:14.909777 (11 ms)
  • Any code? What about narrowing the problem down to a short, simple script that writes the expected amount of dummy data? You would see whether it still suffers from the same problem or works well. – Jan Vlcinsky Apr 07 '16 at 22:31
  • @JanVlcinsky I've replicated it in a script that just constantly appends to an HDFStore. I'll simplify it and then post it here. – user3870920 Apr 07 '16 at 22:42
  • @JanVlcinsky Added a code example – user3870920 Apr 07 '16 at 22:54

2 Answers


The answer is no, these writers do not release the GIL. See the documentation here. I know you are not actually trying to write with multiple threads, but this should clue you in. Strong locks are held while writes happen, precisely to prevent multiple writers. Both PyTables and h5py do this, as it's part of the HDF5 standard.

You can look at SWMR, though it is not directly supported by pandas. PyTables docs here and here point to solutions. These generally involve having a separate process pulling data off of queues and writing it.

This is generally a much more scalable pattern in any event.
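
A minimal sketch of that pattern, under stated assumptions: one process owns the file and drains a queue, while the acquisition side only enqueues. The plain binary append is a stand-in for the pandas `store.append` call, and `writer_loop`, `SENTINEL` and the file name are hypothetical names chosen for illustration; none of them come from pandas or PyTables.

```python
import multiprocessing as mp
import os
import tempfile

SENTINEL = None  # hypothetical marker that tells the writer to stop


def writer_loop(q, path):
    """Drain q and append each chunk to path until SENTINEL arrives.

    The binary append stands in for the HDFStore write; only this
    process ever touches the file. Returns the number of bytes written.
    """
    written = 0
    with open(path, "ab") as sink:
        while True:
            chunk = q.get()  # blocks until the producer sends something
            if chunk is SENTINEL:
                break
            sink.write(chunk)
            written += len(chunk)
    return written


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "capture.bin")
    q = mp.Queue()
    writer = mp.Process(target=writer_loop, args=(q, path))
    writer.start()
    for i in range(5):
        q.put(bytes([i]) * 1000)  # acquisition side: enqueue and move on
    q.put(SENTINEL)
    writer.join()
    print("bytes on disk:", os.path.getsize(path))
```

With this layout a slow flush delays only the queue consumer. The queue is unbounded by default, so it simply grows until the writer catches up, and the threads in the acquisition process keep running.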

Jeff
  • Yeah, pulling it into a separate process is the only way I can think of to avoid blocking other threads. However, I'm still not really sure _why_ there is a large delay every now and then. I guess it's growing the file in some way, but I'm not sure what it is specifically doing. I've tried setting a large number for `expectedrows` when creating, but that doesn't help. Any ideas on where to look? – user3870920 Apr 08 '16 at 00:10
  • ``PyTables`` chunks writes (computed relative to ``expectedrows``), but I think the actual write size (the flush) is implementation dependent (i.e. you don't know). If you are time sensitive, then it's best to shuffle the writing off to another process, or maybe use something like ``msgpack``, which just dumps straight to disk (and is appendable). Generally a real-time capture is just that. You post-process later. – Jeff Apr 08 '16 at 00:13
  • I thought HDF5 would be a good bet because it's a live data capture, but we let the user scroll back through its history, so we need to be able to read chunks, too (sessions can be hours long, so 250k/sec adds up pretty quickly) . I guess if I can find a way to make it flush more often and move it to a different process, things should be okay.. I think it's a little disappointing that the GIL isn't released for IO, but there's probably some logic behind the decision. – user3870920 Apr 08 '16 at 00:38
  • you are trying to engineer an already heavily engineered soln. you are looking at SWMR, which is pretty new, but works nicely. – Jeff Apr 08 '16 at 00:42

Thanks for providing working code. I modified it to get some insight and later created a version that uses multiprocessing.

Modified threading version

All the modifications are just to get more information out; there are no conceptual changes. Everything goes into one file, mthread.py, and is commented on part by part.

Imports as usual:

import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import threading
import logging

write_samples got some logging:

def write_samples(store, samples, overwrite):
    wslog = logging.getLogger("write_samples")
    wslog.info("starting")
    frame = pd.DataFrame(samples, dtype='float64')

    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)
    wslog.info("finished")

begin_io got a maximum duration; exceeding it results in a WARNING log entry:

def begin_io(maxduration=500):
    iolog = logging.getLogger("begin_io")
    iolog.info("starting")
    try:
        fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
        iolog.debug("opening store %s", fname)
        with pd.HDFStore(fname, mode='w', complevel=0) as store:
            iolog.debug("store %s open", fname)

            counter = 0
            while True:
                data = np.random.rand(50000, 1)
                start_time = timer()
                write_samples(store, data, counter == 0)
                end_time = timer()
                duration = (end_time - start_time) * 1000
                iolog.debug("IO Done      : %s (%.2f ms, %d)",
                            datetime.datetime.now(),
                            duration,
                            counter)
                if duration > maxduration:
                    iolog.warning("Long duration %s", duration)
                counter += 1
    except Exception:
        iolog.exception("oops")
    finally:
        iolog.info("finished")

dummy_thread was modified to stop properly, and it also emits a WARNING if an iteration takes too long:

def dummy_thread(pill2kill, maxduration=500):
    dtlog = logging.getLogger("dummy_thread")
    dtlog.info("starting")
    try:
        previous = timer()
        while not pill2kill.wait(0.01):
            now = timer()
            duration = (now - previous) * 1000
            dtlog.info("Dummy Thread  : %s (%d ms)",
                       datetime.datetime.now(),
                       duration)
            if duration > maxduration:
                dtlog.warning("Long duration %s", duration)
            previous = now
        dtlog.debug("stopped looping.")
    except Exception:
        dtlog.exception("oops")
    finally:
        dtlog.info("finished")

Finally, we call it all. Feel free to change the log level: WARNING shows only the excessive times, while INFO and DEBUG tell much more.

if __name__ == '__main__':
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
    logging.basicConfig(format=logformat,
                        level=logging.WARNING)

    pill2kill = threading.Event()
    t = threading.Thread(target=dummy_thread, args=(pill2kill, 500))
    t.start()
    try:
        begin_io(500)
    finally:
        pill2kill.set()
        t.join()

Running the code I get results as you described:

2016-04-08 15:29:11,428 [WARNING] - begin_io: Long duration 5169.03591156
2016-04-08 15:29:11,429 [WARNING] - dummy_thread: Long duration 5161.45706177
2016-04-08 15:29:27,305 [WARNING] - begin_io: Long duration 1447.40581512
2016-04-08 15:29:27,306 [WARNING] - dummy_thread: Long duration 1450.75201988
2016-04-08 15:29:32,893 [WARNING] - begin_io: Long duration 1610.98194122
2016-04-08 15:29:32,894 [WARNING] - dummy_thread: Long duration 1612.98394203
2016-04-08 15:29:34,930 [WARNING] - begin_io: Long duration 823.182821274
2016-04-08 15:29:34,930 [WARNING] - dummy_thread: Long duration 815.275907516
2016-04-08 15:29:43,640 [WARNING] - begin_io: Long duration 510.369062424
2016-04-08 15:29:43,640 [WARNING] - dummy_thread: Long duration 511.776924133

From the values it is clear that while begin_io is busy and delayed (probably while data is being written to the disk), dummy_thread is also delayed for almost the same amount of time.
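
The same comparison can be packaged as a small helper. This is only a sketch: `max_heartbeat_gap` is a hypothetical name, and the `time.sleep` call in the demo is a placeholder for the real write. A background thread wakes every `interval` seconds and records how long each wake-up actually took while a callable runs in the calling thread.

```python
import threading
import time


def max_heartbeat_gap(work, interval=0.01):
    """Run work() in the calling thread and return the largest gap, in
    seconds, between wake-ups of a background heartbeat thread meanwhile."""
    stop = threading.Event()
    gaps = []

    def heartbeat():
        previous = time.perf_counter()
        while not stop.wait(interval):
            now = time.perf_counter()
            gaps.append(now - previous)
            previous = now
        # Record the last interval too, so a stall that ends just before
        # the stop request is not lost.
        gaps.append(time.perf_counter() - previous)

    beat = threading.Thread(target=heartbeat)
    beat.start()
    try:
        work()
    finally:
        stop.set()
        beat.join()
    return max(gaps)


if __name__ == "__main__":
    # time.sleep releases the GIL, so the heartbeat should keep its rhythm.
    gap = max_heartbeat_gap(lambda: time.sleep(0.3))
    print("largest gap while sleeping: %.1f ms" % (gap * 1000))
```

Passing a callable that performs one `write_samples(store, data, False)` call instead of the sleep should give the stall of the other thread as a single number, which is easier to compare across runs than paired log lines.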

Version with multiprocessing - works well

I modified the code to run in multiple processes, and with that change the write no longer blocks dummy_thread. Only begin_io still reports long durations:

2016-04-08 15:38:12,487 [WARNING] - begin_io: Long duration 755.397796631
2016-04-08 15:38:14,127 [WARNING] - begin_io: Long duration 1434.60512161
2016-04-08 15:38:15,725 [WARNING] - begin_io: Long duration 848.396062851
2016-04-08 15:38:24,290 [WARNING] - begin_io: Long duration 1129.17089462
2016-04-08 15:38:25,609 [WARNING] - begin_io: Long duration 1059.08918381
2016-04-08 15:38:31,165 [WARNING] - begin_io: Long duration 646.969079971
2016-04-08 15:38:37,273 [WARNING] - begin_io: Long duration 1699.17201996
2016-04-08 15:38:43,788 [WARNING] - begin_io: Long duration 1555.341959
2016-04-08 15:38:47,765 [WARNING] - begin_io: Long duration 639.196872711
2016-04-08 15:38:54,269 [WARNING] - begin_io: Long duration 1690.57011604
2016-04-08 15:39:06,397 [WARNING] - begin_io: Long duration 1998.33416939
2016-04-08 15:39:16,980 [WARNING] - begin_io: Long duration 2558.51006508
2016-04-08 15:39:21,688 [WARNING] - begin_io: Long duration 1132.73501396
2016-04-08 15:39:26,450 [WARNING] - begin_io: Long duration 876.784801483
2016-04-08 15:39:29,809 [WARNING] - begin_io: Long duration 709.135055542
2016-04-08 15:39:31,748 [WARNING] - begin_io: Long duration 677.506923676
2016-04-08 15:39:41,854 [WARNING] - begin_io: Long duration 770.184993744

The code with multiprocessing is here:

import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import multiprocessing
import time
import logging


def write_samples(store, samples, overwrite):
    wslog = logging.getLogger("write_samples")
    wslog.info("starting")
    frame = pd.DataFrame(samples, dtype='float64')

    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)
    wslog.info("finished")


def begin_io(pill2kill, maxduration=500):
    iolog = logging.getLogger("begin_io")
    iolog.info("starting")
    try:
        fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
        iolog.debug("opening store %s", fname)
        with pd.HDFStore(fname, mode='w', complevel=0) as store:
            iolog.debug("store %s open", fname)

            counter = 0
            while not pill2kill.wait(0):
                data = np.random.rand(50000, 1)
                start_time = timer()
                write_samples(store, data, counter == 0)
                end_time = timer()
                duration = (end_time - start_time) * 1000
                iolog.debug( "IO Done      : %s (%.2f ms, %d)",
                            datetime.datetime.now(),
                            duration,
                            counter)
                if duration > maxduration:
                    iolog.warning("Long duration %s", duration)
                counter += 1
    except Exception:
        iolog.exception("oops")
    finally:
        iolog.info("finished")


def dummy_thread(pill2kill, maxduration=500):
    dtlog = logging.getLogger("dummy_thread")
    dtlog.info("starting")
    try:
        previous = timer()
        while not pill2kill.wait(0.01):
            now = timer()
            duration = (now - previous) * 1000
            dtlog.info( "Dummy Thread  : %s (%d ms)",
                       datetime.datetime.now(),
                       duration)
            if duration > maxduration:
                dtlog.warning("Long duration %s", duration)
            previous = now
        dtlog.debug("stopped looping.")
    except Exception:
        dtlog.exception("oops")
    finally:
        dtlog.info("finished")


if __name__ == '__main__':
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
    logging.basicConfig(format=logformat,
                        level=logging.WARNING)
    pill2kill = multiprocessing.Event()
    dp = multiprocessing.Process(target=dummy_thread, args=(pill2kill, 500,))
    dp.start()
    try:
        p = multiprocessing.Process(target=begin_io, args=(pill2kill, 500,))
        p.start()
        time.sleep(100)
    finally:
        pill2kill.set()
        dp.join()
        p.join()

Conclusions

Writing data to an HDF5 file really does block other threads, so a multiprocessing version is required.

If you expect dummy_thread to do some real work (like collecting data to store) and you want to send data from there to the HDF5 serializer, you will have to use some sort of messaging: either multiprocessing.Queue or Pipe, or possibly ZeroMQ (e.g. a PUSH - PULL socket pair). With ZeroMQ you could even save the data on another computer.

EDIT/WARNING: The provided code can sometimes fail to save the data; I wrote it to measure performance and did not make it robust. When I press Ctrl-C during processing, I sometimes get a corrupted file. I consider this problem out of scope for this question (it should be resolved by carefully stopping the running process).
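
One possible shape for such a careful stop, as a sketch under assumptions rather than a tested fix for the corruption: the child ignores Ctrl-C and leaves its loop only when the shared event is set, so a surrounding `with pd.HDFStore(...)` block would get the chance to close the file. `worker` and `run` are hypothetical names, and the counter stands in for the real `store.append` call.

```python
import multiprocessing
import signal
import time


def worker(stop_event):
    # Ignore Ctrl-C in the child; the parent decides when to stop.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    writes = 0
    while not stop_event.wait(0.01):
        writes += 1  # placeholder for one store.append(...) call
    return writes    # reached only when the loop exits via the event


def run(duration):
    """Start the worker, wait, then stop it cleanly; return its exit code."""
    stop_event = multiprocessing.Event()
    p = multiprocessing.Process(target=worker, args=(stop_event,))
    p.start()
    try:
        time.sleep(duration)  # Ctrl-C here raises KeyboardInterrupt in the parent
    except KeyboardInterrupt:
        pass                  # fall through to the orderly shutdown below
    finally:
        stop_event.set()
        p.join()
    return p.exitcode


if __name__ == "__main__":
    print("writer exit code:", run(0.2))
```

The design choice is that only the parent reacts to the interrupt, and it reacts by setting the event rather than by killing the child.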

Jan Vlcinsky
  • Thanks for adding more useful logging, you did need to hunt through output with my code :) Think we've come to the same conclusion -- need to use multiprocessing. – user3870920 Apr 14 '16 at 21:13
  • @user3870920 Your question helped me to better understand GIL. Thanks for that. Note, that my logging prints only times longer than 500 ms, so the short ones (typically around 10 ms) are not seen. – Jan Vlcinsky Apr 16 '16 at 19:47