
I'm stuck on a variation of the sliding window problem!

Usually the window slides by a fixed number of elements, but in my case I want it to slide by time!

The goal I would like to reach is a function (a thread, in this case) that creates a "time" window whose length in seconds is given by the user.

Starting from the first element of the queue, in this case [datetime.time(7, 6, 14, 537370), 584], add 5 seconds to get the ending point 7:06:19.537370, and sum all elements in this interval:

 [datetime.time(7, 6, 14, 537370), 584]
 [datetime.time(7, 6, 18, 542798), 761]

Total: 584+761= 1345

Then create another window starting at the second element, and so on. IMPORTANT: one item can be part of several windows. The items are generated in the meantime, so a naive solution with a function that sleeps for n seconds and then flushes the queue does not work for my problem.
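A minimal offline sketch of the windowing rule above, assuming the whole list is already available and sorted by time (in the live case a window's sum can only be reported once its end time has passed). `window_sums` and `at` are hypothetical helper names. It uses two pointers, so each element enters and leaves the running sum once instead of every window being re-summed from scratch.

```python
import datetime


def window_sums(elements, window_seconds=5):
    """Return (start, end, total) for the window opened at each element.

    `elements` must be [timestamp, value] pairs sorted by timestamp. The window
    opened at element i contains element i and every later element whose
    timestamp is <= start + window_seconds, so one element can be counted in
    several windows.
    """
    width = datetime.timedelta(seconds=window_seconds)
    results = []
    j = 0        # first index not yet added to the running sum
    running = 0  # sum of the values of elements[i:j]
    for i, (start, _) in enumerate(elements):
        end = start + width
        # grow the right edge while the next element still falls inside this window
        while j < len(elements) and elements[j][0] <= end:
            running += elements[j][1]
            j += 1
        results.append((start, end, running))
        # slide: the next window starts at element i + 1
        running -= elements[i][1]
    return results


def at(h, m, s, us):
    # the question stores datetime.time values; attach an arbitrary date so that
    # adding a timedelta works (datetime.time does not support it)
    return datetime.datetime.combine(datetime.date(2017, 1, 1), datetime.time(h, m, s, us))


data = [
    [at(7, 6, 14, 537370), 584],
    [at(7, 6, 18, 542798), 761],
    [at(7, 6, 20, 546007), 848],
    [at(7, 6, 24, 550969), 20],
    [at(7, 6, 27, 554370), 478],
    [at(7, 6, 27, 554628), 12],
]
for start, end, total in window_sums(data):
    print(start.time(), end.time(), total)
```

With the first six elements of the question's example list, the totals are 1345 (the hand calculation above), 1609, 868, 510, 490 and 12.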

I think it's a variation of this post: Flexible sliding window (in Python)

But I still can't solve the problem! Any help or suggestions will be appreciated. Thanks!

Example list of elements:

 [datetime.time(7, 6, 14, 537370), 584]
 [datetime.time(7, 6, 18, 542798), 761]
 [datetime.time(7, 6, 20, 546007), 848]
 [datetime.time(7, 6, 24, 550969), 20]
 [datetime.time(7, 6, 27, 554370), 478]
 [datetime.time(7, 6, 27, 554628), 12]
 [datetime.time(7, 6, 31, 558919), 29]
 [datetime.time(7, 6, 31, 559562), 227]
 [datetime.time(7, 6, 32, 560863), 379]
 [datetime.time(7, 6, 35, 564863), 132]
 [datetime.time(7, 6, 37, 567276), 651]
 [datetime.time(7, 6, 38, 568652), 68]
 [datetime.time(7, 6, 40, 569861), 100]
 [datetime.time(7, 6, 41, 571459), 722]
 [datetime.time(7, 6, 44, 574802), 560]

...

Code:

 import random
 import time
 import threading
 import datetime
 import queue

 q = queue.Queue()  # thread-safe FIFO shared by the producer and consumer threads

 # this is a producer that puts [timestamp, value] elements in the queue

 def t1():
     while True:
         time.sleep(random.randint(0, 5))
         # build a new list each time instead of mutating and re-putting the same one;
         # use datetime.datetime (not .time()) so a timedelta can be added later
         element = [datetime.datetime.now(), random.randint(0, 1000)]
         q.put(element)


 # this is a consumer that sums the elements inside a window of n seconds
 # I need a sliding time window of n seconds (5 here) that sums all the elements inside it

 def t2():
     windowsize = 5  # size of the window: 5 seconds
     total = 0
     while True:
         e = q.get()  # blocks until an element is available
         start = e[0]  # the first element is the beginning point
         end = start + datetime.timedelta(seconds=windowsize)  # ending point
         total += e[1]
         # some code that solves the problem :)


 a = threading.Thread(target=t1)
 a.start()

 b = threading.Thread(target=t2)
 b.start()

 while True:
     time.sleep(1)
egariM
  • Can an element be part of several of your "windows" or do you want to consume an element from the queue as soon as it is summed to a window? If a element can be part of several windows, what is the mechanism that eventually removes elements from the queue ("this element is definitely not needed anymore, let's remove it to avoid filling up memory")? This is definitely solvable either way but just need to know what exactly it is that you want to achieve. – Hannu Feb 13 '17 at 09:15
  • One element can be part of several windows; I added this to the original post description, thanks. – egariM Feb 13 '17 at 09:26
  • So do I understand correctly that as soon as your "queue" has time difference of the first and last element of over 10 seconds, then it is ok to expire the oldest entries until everything fits again in 10 seconds frame (or whatever we decide to use as the maximum?) and then your "t2" can be used to query for a window of size 0-10 seconds starting from the first element? – Hannu Feb 13 '17 at 09:30
  • t0 is the time when the item arrived, and t0+5 seconds is the end of the window. When t0+5 has elapsed, the element goes out and the window slides to the next element! – egariM Feb 13 '17 at 09:41
  • The maximum is based on the window size, starting from the first element's datetime. – egariM Feb 13 '17 at 10:15

2 Answers


Would this do? This is how I understood your problem. It creates a class, TimeWindow, that keeps track of the elements. You add to it with tw.insert() and get a sum with tw.sum_window(start, end), which adds up the values whose timestamps lie strictly between start and end.

When you initialise TimeWindow, you can give it a max_size parameter in seconds; the default is 10. Before every insert or sum operation it runs a clean-up (expire()) that removes every element that is max_size seconds old or older, measured from the current time, so the deque only holds the last 10 seconds of data. A "poller" thread keeps track of your requests.
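The clean-up rule can be checked without sleeping by passing the clock in explicitly. This sketch restates TimeWindow.expire as a standalone function; `expire` with a `now` argument is a hypothetical variant, not part of the answer's class. It relies on the deque being in arrival order, as insert() keeps it. Because the cut-off is measured from the current time, after a quiet period the deque can end up empty even if its elements were close to each other.

```python
import datetime
from collections import deque


def expire(window, now, max_size=10):
    """Drop [timestamp, value] pairs from the left of `window` (oldest first)
    that are max_size seconds old or older at time `now`."""
    cutoff = now - datetime.timedelta(seconds=max_size)
    while window and window[0][0] <= cutoff:
        window.popleft()
    return window


t0 = datetime.datetime(2017, 1, 1, 12, 0, 0)  # arbitrary reference time
w = deque([
    [t0, 1],
    [t0 + datetime.timedelta(seconds=4), 2],
    [t0 + datetime.timedelta(seconds=9), 3],
])
expire(w, now=t0 + datetime.timedelta(seconds=12))
print([v for _, v in w])  # -> [2, 3]
```

At t0 + 12 s the cut-off is t0 + 2 s, so only the element stamped t0 is removed.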

I have added two queues because I do not know what you intend to do with the results. If you want the sum from now until 5 seconds in the future, you create a request and put it in the input queue. Each request has a random id (uuid4) so that you can match it to its result. Your main thread needs to monitor the result queue: five seconds after a request is sent, it comes back on the result queue with the same id and the sum.

If this is not what you want to do, then I just don't understand what it is that you are trying to achieve. Even this is already rather complicated, and there may be a much simpler way to do what you intend.

import random
import time
import threading
import datetime
import queue
import uuid

from collections import deque

q_lock = threading.RLock()  # guards tw, which both t1 and the poller touch


class TimeWindow(object):
    def __init__(self, max_size=10):
        self.max_size = max_size  # seconds of history to keep
        self.q = deque()          # [timestamp, value] pairs, oldest first

    def expire(self):
        # drop elements that are max_size seconds old or older, relative to now
        cutoff = datetime.datetime.now() - datetime.timedelta(seconds=self.max_size)
        while self.q and self.q[0][0] <= cutoff:
            self.q.popleft()

    def insert(self, elm):
        self.expire()
        self.q.append(elm)

    def sum_window(self, start, end):
        # sum the values whose timestamps fall strictly between start and end
        self.expire()
        return sum(f[1] for f in self.q if start < f[0] < end)


tw = TimeWindow()


def t1():
    # producer: insert a random value every 0-3 seconds
    while True:
        time.sleep(random.randint(0, 3))
        element = [datetime.datetime.now(), random.randint(0, 1000)]
        with q_lock:
            tw.insert(element)


def poller(in_q, out_q):
    # answers each request once its window [start, start + frame] has closed
    pending = []
    while True:
        try:
            # timeout=0.1 waits at most 0.1 s; get(0.1) would set block and wait forever
            new_request = in_q.get(timeout=0.1)
            new_request["end"] = new_request["start"] + datetime.timedelta(seconds=new_request["frame"])
            pending.append(new_request)
        except queue.Empty:
            pass

        new_pending = []
        for a in pending:
            if a["end"] < datetime.datetime.now():
                with q_lock:
                    r_sum = tw.sum_window(a["start"], a["end"])
                out_q.put({"id": a["id"], "result": r_sum})
            else:
                new_pending.append(a)
        pending = new_pending


a = threading.Thread(target=t1)
a.daemon = True
a.start()
in_queue = queue.Queue()
result_queue = queue.Queue()

po = threading.Thread(target=poller, args=(in_queue, result_queue))
po.daemon = True
po.start()

while True:
    time.sleep(1)
    # every second, ask for the sum of the next 5 seconds
    newr = {"id": uuid.uuid4(), "frame": 5, "start": datetime.datetime.now()}
    in_queue.put(newr)
    try:
        ready = result_queue.get_nowait()
        print(ready)
    except queue.Empty:
        pass
Hannu
  • First of all, a big thanks for helping! Mmm, the problem is that tw.sum_window(5) should be executed at the end of the 5 seconds, when the time window ends! – egariM Feb 13 '17 at 11:01
  • I am a bit puzzled how you want this to be calculated. Do you mean you would just like a "peek in history" so that if you executed sum_window now, it would pick the time most recently inserted and count back five seconds from that and sum them up? So basically the same thing I have done but reversed? – Hannu Feb 13 '17 at 11:22
  • An example could be more self-explanatory :) The first element arrives at 12:02:50.164829 with value 8, then the others ... ... The function "sum" should be executed at 12:02:55.164829 (t0+5 seconds), and then the iteration continues from the datetime of the next element. – egariM Feb 13 '17 at 12:04
  • sum_window and max_size should be the same in your example. In my case the trigger should be exactly at t0+n seconds :) – egariM Feb 13 '17 at 12:22
  • ok I think I understand now. Let me think about it for a sec. – Hannu Feb 13 '17 at 12:35
  • No, actually I don't. Let's assume for the sake of simplicity elements arrive every second and we name them 1,2,3,4,5,6,7,8,9,10. Can you then decide for example at t=3 that now I need the sum of 5 seconds of data, in which case something would be launched at t=3 and then at t=8 it would return a result when 5 seconds have passed? Or do you launch a query at t=8 and then expect it to return immediately with sum of 3-8? If of the former sort, how do you expect to handle results? Can several queries waiting for 5 seconds be running in parallel? All this can be done but need to know what to do – Hannu Feb 13 '17 at 12:51
  • I have modified my answer with another guess of what you want to do. – Hannu Feb 13 '17 at 13:18
  • I added an example of the output that should help! – egariM Feb 13 '17 at 21:25
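
Hannu's second reading (a query launched at t=8 that returns the sum of 3-8 immediately) needs no waiting at all. A minimal sketch, assuming timestamps arrive in non-decreasing order; `History` and `sum_last` are hypothetical names, and this class never discards old data. Binary search with the standard `bisect` module finds both edges of the window, so locating them costs O(log n) per query.

```python
import bisect
import datetime


class History:
    """Keeps [timestamp, value] pairs in arrival order and answers
    "sum of the last n seconds" queries immediately."""

    def __init__(self):
        self.times = []   # timestamps, assumed non-decreasing
        self.values = []

    def insert(self, ts, value):
        self.times.append(ts)
        self.values.append(value)

    def sum_last(self, seconds, now):
        # first index with timestamp >= now - seconds, and first index with timestamp > now
        lo = bisect.bisect_left(self.times, now - datetime.timedelta(seconds=seconds))
        hi = bisect.bisect_right(self.times, now)
        return sum(self.values[lo:hi])


t0 = datetime.datetime(2017, 1, 1)  # arbitrary reference time
h = History()
for k in range(1, 11):  # element k arrives at t0 + k seconds with value k
    h.insert(t0 + datetime.timedelta(seconds=k), k)
print(h.sum_last(5, now=t0 + datetime.timedelta(seconds=8)))  # -> 33
```

Both edges are inclusive here, so the query at t=8 sums elements 3 through 8.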
garim@wof:~$ python solution.py
1 t1 produce element:  16:09:30.472497   1
2 t1 produce element:  16:09:33.475714   9
3 t1 produce element:  16:09:34.476922   10
4 t1 produce element:  16:09:37.480100   7
solution:  16:09:37.481171   {'id': UUID('adff334f-a97a-459d-8dcc-f28309e25574'), 'result': 19}
5 t1 produce element:  16:09:38.481352   10
solution:  16:09:38.482687   {'id': UUID('0a7481e5-e993-439a-9f7e-2c5aeef86155'), 'result': 19}

It still doesn't work :( I added a counter to each element that t1 inserts. The goal is to do the sum (result_queue.get) at this time:

16:09:35.472497 ---> 16:09:30.472497 + 5 seconds

not before. Only then does the element go out. The next sum should be done at:

16:09:38.475714 ---> 16:09:33.475714 + 5 seconds
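
A minimal sketch of this timing, under stated assumptions: timestamps are datetime.datetime values that strictly increase, and the window is 5 seconds. `DeadlineWindows` and `consume` are hypothetical names, not part of the answer above. Because every window has the same length, deadlines close in arrival order, so a plain deque is enough and no timer per element is needed; each element is dropped as soon as its own window has been reported.

```python
import datetime
import queue
from collections import deque


class DeadlineWindows:
    """One window per element: [ts, ts + window_seconds]. A window's sum is
    reported only once its end time has passed. Assumes strictly increasing
    timestamps."""

    def __init__(self, window_seconds=5):
        self.width = datetime.timedelta(seconds=window_seconds)
        self.buf = deque()  # [timestamp, value] pairs, oldest first

    def add(self, ts, value):
        self.buf.append([ts, value])

    def next_deadline(self):
        # the oldest element's window always closes first
        return self.buf[0][0] + self.width if self.buf else None

    def due(self, now):
        """Return (start, end, total) for every window that has closed by `now`."""
        results = []
        while self.buf and self.buf[0][0] + self.width <= now:
            start = self.buf[0][0]
            end = start + self.width
            total = 0
            for ts, value in self.buf:  # buffer is sorted: stop at the first element past `end`
                if ts > end:
                    break
                total += value
            results.append((start, end, total))
            self.buf.popleft()  # every later window starts after this element
        return results


def consume(q, window_seconds=5, emit=print):
    """Possible replacement for the question's t2: wait for either a new
    element or the next deadline, whichever comes first."""
    w = DeadlineWindows(window_seconds)
    while True:
        deadline = w.next_deadline()
        if deadline is None:
            timeout = None  # nothing buffered: wait for the next element
        else:
            timeout = max(0.0, (deadline - datetime.datetime.now()).total_seconds())
        try:
            ts, value = q.get(timeout=timeout)
            w.add(ts, value)
        except queue.Empty:
            pass
        for result in w.due(datetime.datetime.now()):
            emit(result)


def at(s, us):
    # times taken from the output in the question; the date is arbitrary
    return datetime.datetime(2017, 1, 1, 16, 9, s, us)


w = DeadlineWindows(5)
for ts, v in [(at(30, 472497), 1), (at(33, 475714), 9), (at(34, 476922), 10), (at(37, 480100), 7)]:
    w.add(ts, v)
print(w.due(at(35, 0)))                          # -> []
print([t for _, _, t in w.due(at(35, 472497))])  # -> [20]
```

With the elements from the output above, the first window would close at 16:09:35.472497 with 1 + 9 + 10 = 20, and the second at 16:09:38.475714 with 9 + 10 + 7 = 26. To use consume as the target of the question's consumer thread, the producer has to store datetime.datetime timestamps rather than .time() values.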

I understand that it's hard to explain. With both of your solutions the time window slides, so I can consider the problem solved :) I will try to improve when the sum function is executed; that time trigger is important. I have acquired a lot of useful knowledge. Thanks for helping.

egariM