1

I'm writing a python script to perform camera calibration with OpenCV.

I found that the cv2.findChessboardCorners function may take very long to run on certain images.

I would therefore like to be able to stop the function after a certain period of time has passed, and move on to the next image.

How do I do that?

for fname in images:        
    img = cv2.imread(fname)                            
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    ret, corners = cv2.findChessboardCorners(gray, (20, 17), None)   
Employee
  • 3,109
  • 5
  • 31
  • 50
  • 1
    This answer might have some possible solutions: [break the function after certain time](https://stackoverflow.com/questions/25027122/break-the-function-after-certain-time) – G. Anderson Dec 12 '18 at 23:12
  • 1
    Possibly related, helpful or dupe of [Timeout function if it takes too long to finish \[duplicate\]](https://stackoverflow.com/questions/2281850/timeout-function-if-it-takes-too-long-to-finish) or [Timeout on a function call](https://stackoverflow.com/questions/492519/timeout-on-a-function-call) – chickity china chinese chicken Dec 12 '18 at 23:12

1 Answers1

0

You could use Pebble as a multiprocessing library, which allows scheduling tasks. The code below also uses multiple threads for the processing:

from pebble import ProcessPool
from concurrent.futures import TimeoutError
import cv2
import glob
import os

CALIB_FOLDER = "path/to/folder"
# chessboard size
W = 10
H = 7


def f(fname):
    img = cv2.imread(fname)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Find the chessboard corners
    ret, corners = cv2.findChessboardCorners(gray, (W-1, H-1), None)
    return ret, corners


calib_imgs = glob.glob(os.path.join(CALIB_FOLDER, "*.jpg"))
calib_imgs = sorted(calib_imgs)

futures = []
with ProcessPool(max_workers=6) as pool:
    for fname in calib_imgs:          
        future = pool.schedule(f, args=[fname], timeout=10)
        futures.append(future)

for idx, future in enumerate(futures):
    try:
        ret, corners = future.result()  # blocks until results are ready
        print(ret)
    except TimeoutError as error:
        print("{} skipped. Took longer than {} seconds".format(calib_imgs[idx], error.args[1]))
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function
Daniel
  • 391
  • 4
  • 17