
I am implementing a Python module that takes a tuple of three lists (x, y, val) and subsamples them according to a given ratio. Am I doing it the right way?

  1. Do I write to the disk asynchronously?
  2. Could I have many producers and consumers such that all of them generate and write data to the same output file?
  3. When I compare this code to a naive single-threaded implementation, they perform similarly with respect to runtime. Why is that?

import bisect
import numpy as np
import gzip
import asyncio

class SignalPDF:
    def __init__(self, inputSignal):
        self.x         = inputSignal[0][:]
        self.y         = inputSignal[1][:]
        self.vals      = inputSignal[2][:]
        self.valCumsum = np.cumsum(self.vals)
        self.totalSum  = np.sum(self.vals)
        self.N         = len(self.vals)

class SignalSampler:
    def __init__(self, inputSignal, ratio=1.0):
        self.signalPDF = SignalPDF(inputSignal)
        self.Q         = asyncio.Queue()
        self.ratio     = float(ratio)
        self.N         = int(self.signalPDF.N/self.ratio)
        self.sampledN  = 0

    # Weighted sampling: draw a random integer in [0, totalSum) and locate
    # it in the cumulative sum with bisect; cells with larger vals span
    # wider intervals and are therefore drawn proportionally more often.
    async def randRead(self):
        while self.sampledN < self.N:
            i = np.random.randint(self.signalPDF.totalSum, size=1, dtype=np.uint64)[0]
            self.sampledN += 1 
            cell = bisect.bisect(self.signalPDF.valCumsum, i)
            yield (self.signalPDF.x[cell], self.signalPDF.y[cell], int(self.signalPDF.vals[cell]))

    async def readShortFormattedLine(self):
        async for read in self.randRead():
            x, y, val = read
            yield '{0} {1} {2}'.format(x, y, val)

    # Producer: pushes formatted lines into the queue; the final None is a
    # sentinel telling the consumer to stop.
    async def populateQueue(self):
        async for i in self.readShortFormattedLine():
            await self.Q.put(i)
        await self.Q.put(None)

    # Consumer: drains the queue and writes each line to a gzip file.
    async def handleGzip(self, filePath):
        with gzip.open(filePath, 'wt') as f:
            while True:
                item = await self.Q.get()
                if item is None:
                    break
                f.write('{0}\n'.format(item))
            f.flush()

    # Same consumer, but writing to a plain-text file.
    async def handleFile(self, filePath):
        with open(filePath, 'w+') as f:
            while True:
                item = await self.Q.get()
                if item is None:
                    break
                f.write('{0}\n'.format(item))
            f.flush()

def main(useGzip, outputFile):  # renamed from gzip to avoid shadowing the gzip module
    x=[]; y=[];val=[]
    for i in range(100):
        for j in range(100):
            x.append(i)
            y.append(j)
            val.append(np.random.randint(0,250))

    loop      = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    mixer = SignalSampler(inputSignal=[x,y,val], ratio=2.0)
    futures = []
    if useGzip:
        futures = [mixer.handleGzip(outputFile), mixer.populateQueue()]
    else:
        futures = [mixer.handleFile(outputFile), mixer.populateQueue()]
    tasks   = asyncio.gather(*futures)  # gather schedules both coroutines on the loop
    results = loop.run_until_complete(tasks)
    loop.close()

main(useGzip=False, outputFile='/tmp/a.txt')
main(useGzip=True, outputFile='/tmp/a.txt.gz')
0x90
  • IO, especially writing to disk, is nearly always the slowest part of any application; making your code asynchronous will not improve the write speed of whatever disk you are writing to – Iain Shelvington Jan 17 '20 at 22:51
  • @IainShelvington true, but I could write only once every 100 lines, so this code can still be optimized (a batched variant is sketched after these comments). Also note this is a toy problem for a case where the consumer consumes data from another file and not a local array. – 0x90 Jan 17 '20 at 22:53
  • An asyncio event loop is single-threaded, which is why you get similar speeds to another single-threaded application – Iain Shelvington Jan 17 '20 at 22:59
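
Regarding the batching idea in the comments above, here is a minimal, hypothetical sketch of a consumer method for SignalSampler that buffers queue items and writes once per batch instead of once per line (handleFileBatched and batchSize are not part of the original code):

    async def handleFileBatched(self, filePath, batchSize=100):
        # Hypothetical variant of handleFile: collect items and issue
        # one write per batch to cut down on write calls.
        buffer = []
        with open(filePath, 'w') as f:
            while True:
                item = await self.Q.get()
                if item is None:
                    break
                buffer.append(item)
                if len(buffer) >= batchSize:
                    f.write('\n'.join(buffer) + '\n')
                    buffer.clear()
            if buffer:  # flush whatever is left after the sentinel
                f.write('\n'.join(buffer) + '\n')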

1 Answer


How asyncio works

Let's consider a task of making two web requests.

Synchronous version:

  1. Send request 1
  2. Wait for the answer for 1 sec.
  3. Send request 2
  4. Wait for the answer for 1 sec.
  5. Both requests finished in 2 sec.

Asynchronous version:

  1. Send request 1
  2. Instead of waiting, immediately send request 2
  3. Wait for the answers for over 1 sec.
  4. Both requests finished in 1 sec.

asyncio allows you to write a program that actually works like the second (asynchronous) version, while your code reads very much like the intuitive first version.

Note the important thing here: the only reason the asynchronous version is faster is that it starts another concurrent operation immediately instead of waiting until the first one has fully finished. It has nothing to do with threads; asyncio works in a single main thread.
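
Here is a minimal runnable sketch of the two-requests example (not from the original answer; asyncio.sleep stands in for a 1-second network wait):

import asyncio
import time

async def fake_request(name):
    # Pretend this is a network call: while one coroutine awaits the
    # sleep, the event loop is free to run the other one.
    await asyncio.sleep(1)
    return name

async def main():
    start = time.monotonic()
    # gather starts both "requests" before waiting on either,
    # so total wall time is ~1 s instead of ~2 s.
    results = await asyncio.gather(fake_request('req1'), fake_request('req2'))
    print(results, 'in', round(time.monotonic() - start, 2), 's')

asyncio.run(main())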


What about disk I/O?

Can your hardware read/write two files in parallel?

If you have one physical HDD then probably not: it has one physical "needle" that can read/write a single piece of data at a time. An asynchronous approach won't help you there.

The situation may differ if you have multiple disks, although I have no idea whether the OS/asyncio can handle working with multiple disks in parallel (probably not).

Let's presume your hardware and OS do support parallel disk I/O. It will probably only work when you use multiple threads or processes for the operations:

  • The aiofiles module uses threads to work with files – you can give it a try
  • To work with processes via ProcessPoolExecutor and asyncio, you can use run_in_executor as shown here (a minimal sketch follows this list)
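
A minimal sketch of the run_in_executor approach (the file path, sample data, and the write_gzip helper are placeholders, not part of the original answer):

import asyncio
import gzip
from concurrent.futures import ProcessPoolExecutor

def write_gzip(filePath, lines):
    # Ordinary blocking code; it runs in a worker process, so the event
    # loop in the main process is never blocked by the disk write.
    with gzip.open(filePath, 'wt') as f:
        f.write('\n'.join(lines) + '\n')

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # run_in_executor wraps the blocking call in a future the event
        # loop can await while it keeps servicing other coroutines.
        await loop.run_in_executor(pool, write_gzip, '/tmp/a.txt.gz', ['0 0 42', '0 1 17'])

if __name__ == '__main__':  # guard required for process pools on some platforms
    asyncio.run(main())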

There's also some chance that using processes or even threads will speed up disk I/O purely by parallelizing the related CPU-bound operations, but I have no idea whether that's the case here or how beneficial it can be (probably not much compared to the disk I/O itself).

Mikhail Gerasimov
  • I am not sure how to use aiofiles to write a .gzip file. – 0x90 Jan 19 '20 at 01:30
  • @0x90 you can read/write raw file content in chunks into a buffer using `aiofiles` and then pass it to some library that works with `gzip`. Google "python gzip streaming". Take a look [at this question](https://stackoverflow.com/q/37944801/1113207), for example. Or, as an alternative, you can avoid `aiofiles` and use the `gzip` module, calling its functions in a thread via `run_in_executor` (a sketch combining these ideas appears after these comments). – Mikhail Gerasimov Jan 19 '20 at 06:01
  • in fact it’s a more general question. Why aiofiles is needed? Why can’t I have a async function that simply using the blocking python api to write to a file? – 0x90 Jan 19 '20 at 06:04
  • @0x90 If some async function uses a blocking API and the operation takes a significant amount of time, it will block `asyncio`'s event loop and freeze all async operations in other parts of the code. Read [this answer](https://stackoverflow.com/a/33399896/1113207) and the discussion under it for a more detailed explanation. `aiofiles` was created to avoid this situation: not to improve total speed, but to avoid freezing the event loop when working with disk I/O (it's similar to how people [use](https://stackoverflow.com/a/22414756/1113207) threads with `requests` to make them async-compatible). – Mikhail Gerasimov Jan 19 '20 at 06:57
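
Following the suggestions in these comments, a minimal sketch that compresses in memory with the stdlib gzip module and writes the bytes with the third-party aiofiles library (write_gzip_async and the sample data are hypothetical):

import asyncio
import gzip

import aiofiles  # third-party: pip install aiofiles

async def write_gzip_async(filePath, text):
    # Compress in memory (CPU-bound, quick for small payloads), then let
    # aiofiles do the actual disk write in a thread so the event loop
    # stays responsive.
    data = gzip.compress(text.encode('utf-8'))
    async with aiofiles.open(filePath, 'wb') as f:
        await f.write(data)

asyncio.run(write_gzip_async('/tmp/a.txt.gz', '0 0 42\n0 1 17\n'))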