0

I'm trying to speed up calculations for extensive real time object detection and doing computation on it.

I'm using OpenCV with thread pool and producer, consumer for the video capture. But the execution speed is the same as the serial one.

How would I improve the speed of the execution ?

if __name__ == "__main__":
    video_name = '2016-11-18_07-30-01.h264'

    cap = cv2.VideoCapture(video_name)

    det = detector.CarDetector()
    car_tracker = Sort_Algorithm.Sort()
    ped_tracker = Sort_Algorithm.Sort()
    df_region, df_line = load_filter()
    region = Region(df_region)
    distance = compute_max_polygon_diagonal(df_region) * 0.1
    region_buffered = region.buffer(distance)

    threadn = cv2.getNumberOfCPUs()
    pool = ThreadPool(processes = 2)
    pending = deque()
    threaded_mode = True
    lock = threading.Lock()
    while True:
        while len(pending) > 0 and pending[0].ready():
            res = pending.popleft().get()
            cv2.imshow('video ', res)

        if  len(pending) < threadn:
            ret, frame = cap.read()
            if threaded_mode:
                t1 = time.time()
                H = [-2.01134074616, -16.6502442427, -1314.05715739, -3.35391526592, -22.3546973012, 2683.63584335,
                     -0.00130731963137, -0.0396207582264, 1]
                matrix = np.reshape(H, (3, 3))
                dst = cv2.warpPerspective(frame.copy(), matrix, (frame.shape[1], frame.shape[0]))
                task = pool.apply_async(pipeline, (lock, frame.copy(),car_tracker, ped_tracker,df_region,region_buffered, df_line, det, dst, matrix))

                cv2.imshow('dst', dst)
            else:
                task = DummyTask(pipeline,(lock, frame.copy(),car_tracker, ped_tracker,df_region, region_buffered, df_line, det, dst, matrix))

            pending.append(task)
        ch = cv2.waitKey(1)
        if ch == ord(' '):
            threaded_mode = not threaded_mode
        if ch == 27:
            break

The code for pipeline:

def pipeline(lock, img, car_tracker, ped_tracker, df_region, region_buffered, df_line, det, dst, H):
    lock.acquire()
    global point_lists
    global df_car_lists
    global frame_idx
    global counter
    global  data_peds
    global  data_cars
    global  genera_data_pd_cars
    global  genera_data_pd_peds

    car_box, ped_box = det.get_localization(img)
    car_detections = car_tracker.update(np.array(car_box))
    ped_detections = ped_tracker.update(np.array(ped_box))
    saved_region = df_region.values
    saved_region = np.delete(saved_region, 2, 1)
    frame_idx+=1
    cv2.warpPerspective(np.array(df_line, dtype=np.float32), H, (df_line.shape[1], df_line.shape[0]))

    cv2.polylines(dst, np.int32([[saved_region]]), False, color=(255, 0, 0))
    cv2.polylines(dst, np.int32([np.array(df_line, dtype=np.float32)]), False, color=(255, 0, 0))

    for trk in car_detections:
        trk = trk.astype(np.int32)
        helpers.draw_box_label(img, trk, trk[4])  # Draw the bounding boxes on the

    for other in ped_detections:
            other = other.astype(np.int32)
            helpers.draw_box_label(img, other, other[4])  # Draw the bounding boxes on the

    for trk in car_detections:
        trk = trk.astype(np.int32)
        p = np.array([[((trk[1] + trk[3]) / 2,  (trk[0] + trk[2]) / 2)]], dtype=np.float32)
        center_pt = cv2.perspectiveTransform(p, H)
        ptx = center_pt.T.item(0)
        pty = center_pt.T.item(1)
        df_cars = compute(trk[4], ptx, pty, frame_idx, df_region, region_buffered, df_line)
        genera_data_pd_cars = genera_data_pd_cars.append(df_cars)
        for other in ped_detections:
            other = other.astype(np.int32)
            p = np.array([[((other[1] + other[3]) / 2, (other[0] + other[2]) / 2)]], dtype=np.float32)
            center_pt = cv2.perspectiveTransform(p, H)
            ptx = center_pt.T.item(0)
            pty = center_pt.T.item(1)
            df_peds = compute(other[4], ptx, pty, frame_idx, df_region, region_buffered, df_line)
            genera_data_pd_peds = genera_data_pd_cars.append(df_peds)
            query = "is_in_region == True and is_in_region_now == True"
            df_peds = genera_data_pd_peds.query(query)
            query = " is_in_region == True"
            df_cars = genera_data_pd_cars.query(query)
            if len(df_cars)> 1 and len(df_peds) > 1:

                df_car_in_t_range_ped = select_slice(df_cars, df_peds)
                df_ped_in_t_range_car = select_slice(df_peds, df_cars)
                t_abs_crossing_car = df_cars['t_abs_at_crossing'].iloc[0]
                t_abs_crossing_ped = df_peds['t_abs_at_crossing'].iloc[0]

                dt_crossing = t_abs_crossing_car - t_abs_crossing_ped

                is_able_to_pass_before_ped = \
                    ((df_car_in_t_range_ped['t_abs_at_crossing_estimated'] -
                      t_abs_crossing_ped) > 0).any()

                behavior = Behavior(  # is_passed_before_ped
                    dt_crossing < 0,
                    # is_able_to_stop
                    df_car_in_t_range_ped['is_able_to_halt'].any(),
                    # is_too_fast
                    df_car_in_t_range_ped['is_too_fast'].any(),
                    # is_close_enough
                    df_car_in_t_range_ped['is_close_enough'].any(),
                    # is_able_to_pass_before_ped
                    is_able_to_pass_before_ped)

                interaction = Interaction(trk[4], other[4])
                interaction = interaction.assess_behavior(behavior)
                code, res, msg = interaction.code, interaction.res, interaction.msg
                print(msg)
                genera_data_pd_cars = genera_data_pd_cars.iloc[0:0]
                genera_data_pd_peds = genera_data_pd_peds.iloc[0:0]
    lock.release()
    return img

2 Answers2

0

Threads aren't executed in parallel in cpython. Try using the ProcessPoolExecutor instead.

seenorth
  • 17
  • 1
  • 9
  • I have problems with picklable objects I couldn't use it – Waleed Saleh Mar 20 '19 at 11:09
  • Do you know how to modify the code to work with ProcessPoolExecutor ? What needs to be changed ? – Waleed Saleh Mar 20 '19 at 11:12
  • I'm not 100% sure myself, there seem to be other answers to the pickle problem https://stackoverflow.com/a/1816969/7062162 . But you're also using a lock to access shared data, which I think is also not allowed with processes, because processes can't share data. If the processes don't work out for you, you might also consider asyncio. – seenorth Mar 20 '19 at 11:26
  • I use tensorflow so it has an issue too – Waleed Saleh Mar 20 '19 at 11:27
  • @seenorth your answer is partially true, and does not hold for IO bound tasks, you only need `GIL` for executing python code, so if you have multiple IO bound threads, they don't need the GIL and don't need to execute python code and hence can run in parallel. – Rohit Mar 20 '19 at 12:00
0

Multi-threading in python for CPU bound tasks is limited by GIL and effectively makes single thread run a time.

Ofcourse if you launch multiple threads for CPU bound tasks the performance is going to be even degraded because there is lot of overhead for both for kernel and python interpreter to manage these threads.

Kernel wants to schedule these threads and python wants to restrict these threads from running simultaneous and this results lot of context switches happening which degrades the performance.

If you are using just numpy in the threads then you would be fine as numpy isn't impacted by GIL since it uses atomic operations, but I am not sure if that is true for OpenCV as well.

Threads in python arn't meant for computation tasks.

This is classic problem of threads with python, consider using multiprocessing and there are number of articles on this topic, you might want to check few of them.

Rohit
  • 3,659
  • 3
  • 35
  • 57