I am implementing a python module that takes a tuple with three lists (x
,y
,val
) and subsamples them according to a given ratio. Am I doing it the right way?
- Do I write to the disk asynchronously?
- Could I have many producers and consumers such that all of them generate and write data to the same output file?
- When I compare this code to a naive implementation of a single thread they perform similary with respect to their runtime.
import bisect
import numpy as np
import gzip
import asyncio
class SignalPDF:
def __init__(self, inputSignal):
self.x = inputSignal[0][:]
self.y = inputSignal[1][:]
self.vals = inputSignal[2][:]
self.valCumsum = np.cumsum(self.vals)
self.totalSum = np.sum(self.vals)
self.N = len(self.vals)
class SignalSampler:
def __init__(self, inputSignal, ratio=1.0):
self.signalPDF = SignalPDF(inputSignal)
self.Q = asyncio.Queue()
self.ratio = float(ratio)
self.N = int(self.signalPDF.N/self.ratio)
self.sampledN = 0
async def randRead(self):
while self.sampledN < self.N:
i = np.random.randint(self.signalPDF.totalSum, size=1, dtype=np.uint64)[0]
self.sampledN += 1
cell = bisect.bisect(self.signalPDF.valCumsum, i)
yield (self.signalPDF.x[cell], self.signalPDF.y[cell], int(self.signalPDF.vals[cell]))
async def readShortFormattedLine(self):
async for read in self.randRead():
x = read[0]; y = read[1]; val = read[2];
yield '{0} {1} {2}'.format(x,y,val)
async def populateQueue(self):
async for i in self.readShortFormattedLine():
await self.Q.put(i)
await self.Q.put(None)
async def hanldeGzip(self, filePath):
with gzip.open(filePath, 'wt') as f:
while True:
item = await self.Q.get()
if item is None:
break
f.write('{0}\n'.format(item))
f.flush()
async def hanldeFile(self, filePath):
with open(filePath, 'w+') as f:
while True:
item = await self.Q.get()
if item is None:
break
f.write('{0}\n'.format(item))
f.flush()
def main(gzip, outputFile):
x=[]; y=[];val=[]
for i in range(100):
for j in range(100):
x.append(i)
y.append(j)
val.append(np.random.randint(0,250))
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
mixer = SignalSampler(inputSignal=[x,y,val], ratio=2.0)
futures = []
if gzip:
futures = [mixer.hanldeGzip(outputFile), mixer.populateQueue()]
else:
futures = [mixer.hanldeFile(outputFile), mixer.populateQueue()]
tasks = asyncio.wait(futures, loop=loop)
results = loop.run_until_complete(tasks)
loop.close()
main(gzip=False, outputFile='/tmp/a.txt')
main(gzip=True, outputFile='/tmp/a.txt.gz')