We are running quite a large Python code to randomly scan the parameter space of some physics models (so it is very difficult to give a minimal example, sorry). Evaluating one parameter point takes about 300 ms, but sometimes (I don't know why) the evaluation suddenly takes several hours, which kills the CPU budget we have on a computing cluster.

So my idea was to use threading to give each evaluation of a parameter point a maximum running time. If the evaluation takes longer, I can discard the point as unphysical. However, this does not seem to work: I start the calculation in a new thread and join it to the main thread with a timeout of, say, 1 second, but the main thread still keeps waiting for the calculation to terminate (which takes significantly longer than 1 second).

How is this possible? How does threading measure how long the new thread has been running? I should add that during the evaluation of one parameter point I make heavy use of nlopt, numpy and scipy. Most of these, I assume, are not written directly in Python but rely on compiled binaries to speed up the calculation. Does this affect threading (since those functions are "black boxes" to it)?

Thanks!

Toeffel
  • Did you read the [documentation for `join()`](https://docs.python.org/3.5/library/threading.html#threading.Thread.join)? Quote: *As `join()` always returns `None`, **you must call `is_alive()` after `join()` to decide whether a timeout happened** – if the thread is still alive, the `join()` call timed out.* – Bakuriu Sep 17 '16 at 16:56
  • Also: the standard interface does **not** provide any way of killing a thread. You are better off using *multiprocessing*: processes are much easier to kill. – Bakuriu Sep 17 '16 at 16:59

1 Answer

Short answer:

I don't think `Thread.join()` tells you whether the timeout expired. You have to check that yourself after the call.

Either way, a minimal code snippet would help in getting to a working solution. This is mostly a guess, but if the main thread never checks whether the timeout expired, it will just carry on regardless.
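To make the check concrete, here is a minimal sketch of `join()` followed by `is_alive()`. The function `slow_evaluation` is a hypothetical stand-in for one parameter-point evaluation (it only sleeps), so this shows the pattern rather than your actual code:

```python
import threading
import time


def slow_evaluation():
    # Hypothetical stand-in for evaluating one parameter point.
    time.sleep(0.5)


worker = threading.Thread(target=slow_evaluation, daemon=True)
worker.start()

# join() returns None whether or not the thread finished in time.
worker.join(timeout=0.1)

# The only way to learn that the timeout expired is to ask the thread.
timed_out = worker.is_alive()
print("timed out:", timed_out)

# The thread is not stopped by the timeout; it keeps running until it is done.
worker.join()
finished = not worker.is_alive()
print("finished:", finished)
```

Note that the timeout only limits how long the caller waits. It does nothing to the worker thread itself.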

Longer answer:

Let's see where the timeout parameter goes:

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L1060

self._wait_for_tstate_lock(timeout=max(timeout, 0))

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L1062-L1074

def _wait_for_tstate_lock(self, block=True, timeout=-1):
    # Issue #18808: wait for the thread state to be gone.
    # At the end of the thread's life, after all knowledge of the thread
    # is removed from C data structures, C code releases our _tstate_lock.
    # This method passes its arguments to _tstate_lock.acquire().
    # If the lock is acquired, the C code is done, and self._stop() is
    # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
    lock = self._tstate_lock
    if lock is None:  # already determined that the C code is done
        assert self._is_stopped
    elif lock.acquire(block, timeout):
        lock.release()
        self._stop()

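If there is no lock, the thread is already known to have stopped. Otherwise the method tries to acquire the lock with the given `block` and `timeout` parameters. If the timeout expires first, `acquire` returns false, nothing else happens, and `join()` returns while the thread is still running.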
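The behaviour of `acquire` with a timeout can be seen with an ordinary `threading.Lock`. This is only an illustration under the assumption that the thread state lock behaves like a plain lock here; the lock below is held by the same thread simply to simulate "the worker has not finished yet":

```python
import threading

lock = threading.Lock()
lock.acquire()  # simulate a lock that is still held by a running thread

# Block for at most 0.1 seconds; a plain Lock is not reentrant,
# so this cannot succeed while the lock is held.
got_it = lock.acquire(True, 0.1)
print("acquired while held:", got_it)

lock.release()  # simulate the worker finishing

got_it_after_release = lock.acquire(True, 0.1)
print("acquired after release:", got_it_after_release)
lock.release()
```

The return value is the only signal that the timeout elapsed, and `join()` does not pass it on to the caller.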

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L117

def acquire(self, blocking=True, timeout=-1):
    """Acquire a lock, blocking or non-blocking.
    When invoked without arguments: if this thread already owns the lock,
    increment the recursion level by one, and return immediately. Otherwise,
    if another thread owns the lock, block until the lock is unlocked. Once
    the lock is unlocked (not owned by any thread), then grab ownership, set
    the recursion level to one, and return. If more than one thread is
    blocked waiting until the lock is unlocked, only one at a time will be
    able to grab ownership of the lock. There is no return value in this
    case.
    When invoked with the blocking argument set to true, do the same thing
    as when called without arguments, and return true.
    When invoked with the blocking argument set to false, do not block. If a
    call without an argument would block, return false immediately;
    otherwise, do the same thing as when called without arguments, and
    return true.
    When invoked with the floating-point timeout argument set to a positive
    value, block for at most the number of seconds specified by timeout
    and as long as the lock cannot be acquired.  Return true if the lock has
    been acquired, false if the timeout has elapsed.
    """
    me = get_ident()
    if self._owner == me:
        self._count += 1
        return 1
    rc = self._block.acquire(blocking, timeout)
    if rc:
        self._owner = me
        self._count = 1
    return rc

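This `acquire` first gets the calling thread's identity. If that thread already owns the lock, it only increments the recursion count and returns.

Otherwise it really acquires the underlying lock, `self._block`, passing `blocking` and `timeout` straight through.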
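The two branches of that `acquire` can be exercised with `threading.RLock`. A small sketch (the helper `other_thread` and the `results` dict are made up for the demonstration):

```python
import threading

rlock = threading.RLock()
results = {}


def other_thread():
    # A different thread cannot take the lock while the main thread owns it,
    # so this call gives up after the timeout.
    results["other"] = rlock.acquire(timeout=0.1)
    if results["other"]:
        rlock.release()


first = rlock.acquire()               # main thread becomes the owner
second = rlock.acquire(timeout=0.1)   # same owner: recursion count goes up, no waiting

t = threading.Thread(target=other_thread)
t.start()
t.join()

# Release once per successful acquire by the owner.
rlock.release()
rlock.release()

print(first, second, results["other"])
```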

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L98

self._block = _allocate_lock()

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L33

_allocate_lock = _thread.allocate_lock

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L4

import _thread

https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L1300-L1301

static PyMethodDef thread_methods[] = {
    {"start_new_thread",        (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"start_new",               (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"allocate_lock",           (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"allocate",                (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"exit_thread",             (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"exit",                    (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"interrupt_main",          (PyCFunction)thread_PyThread_interrupt_main,
     METH_NOARGS, interrupt_doc},
    {"get_ident",               (PyCFunction)thread_get_ident,
     METH_NOARGS, get_ident_doc},
    {"_count",                  (PyCFunction)thread__count,
     METH_NOARGS, _count_doc},
    {"stack_size",              (PyCFunction)thread_stack_size,
     METH_VARARGS, stack_size_doc},
    {"_set_sentinel",           (PyCFunction)thread__set_sentinel,
     METH_NOARGS, _set_sentinel_doc},
    {NULL,                      NULL}           /* sentinel */
};

The method table maps the Python-level name `allocate_lock` to the C function `thread_PyThread_allocate_lock`, cast to `PyCFunction`.

https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L1128-L1131

static PyObject *
thread_PyThread_allocate_lock(PyObject *self)
{
    return (PyObject *) newlockobject();
}

https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L538-L553

static lockobject *
newlockobject(void)
{
    lockobject *self;
    self = PyObject_New(lockobject, &Locktype);
    if (self == NULL)
        return NULL;
    self->lock_lock = PyThread_allocate_lock();
    self->locked = 0;
    self->in_weakreflist = NULL;
    if (self->lock_lock == NULL) {
        Py_DECREF(self);
        PyErr_SetString(ThreadError, "can't allocate lock");
        return NULL;
    }
    return self;
}

`newlockobject()` creates the Python-level lock object and asks `PyThread_allocate_lock()` for the underlying platform lock.

https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread_pthread.h#L276

PyThread_type_lock
PyThread_allocate_lock(void)
{
    sem_t *lock;
    int status, error = 0;

    dprintf(("PyThread_allocate_lock called\n"));
    if (!initialized)
        PyThread_init_thread();

    lock = (sem_t *)PyMem_RawMalloc(sizeof(sem_t));

    if (lock) {
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");

        if (error) {
            PyMem_RawFree((void *)lock);
            lock = NULL;
        }
    }

    dprintf(("PyThread_allocate_lock() -> %p\n", lock));
    return (PyThread_type_lock)lock;
}

https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread.c#L60-L77

void
PyThread_init_thread(void)
{
#ifdef Py_DEBUG
    char *p = Py_GETENV("PYTHONTHREADDEBUG");

    if (p) {
        if (*p)
            thread_debug = atoi(p);
        else
            thread_debug = 1;
    }
#endif /* Py_DEBUG */
    if (initialized)
        return;
    initialized = 1;
    dprintf(("PyThread_init_thread called\n"));
    PyThread__init_thread();
}

https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread_pthread.h#L170-L176

static void
PyThread__init_thread(void)
{
#if defined(_AIX) && defined(__GNUC__)
    extern void pthread_init(void);
    pthread_init();
#endif
}

https://github.com/python/cpython/blob/f243de2bc8d940316ce8da778ec02a7bbe594de1/configure.ac#L3416

AC_CHECK_FUNCS(alarm accept4 setitimer getitimer bind_textdomain_codeset chown \
 clock confstr ctermid dup3 execv faccessat fchmod fchmodat fchown fchownat \
 fexecve fdopendir fork fpathconf fstatat ftime ftruncate futimesat \
 futimens futimes gai_strerror getentropy \
 getgrouplist getgroups getlogin getloadavg getpeername getpgid getpid \
 getpriority getresuid getresgid getpwent getspnam getspent getsid getwd \
 if_nameindex \
 initgroups kill killpg lchmod lchown lockf linkat lstat lutimes mmap \
 memrchr mbrtowc mkdirat mkfifo \
 mkfifoat mknod mknodat mktime mremap nice openat pathconf pause pipe2 plock poll \
 posix_fallocate posix_fadvise pread \
 pthread_init pthread_kill putenv pwrite readlink readlinkat readv realpath renameat \
 select sem_open sem_timedwait sem_getvalue sem_unlink sendfile setegid seteuid \
 setgid sethostname \
 setlocale setregid setreuid setresuid setresgid setsid setpgid setpgrp setpriority setuid setvbuf \
 sched_get_priority_max sched_setaffinity sched_setscheduler sched_setparam \
 sched_rr_get_interval \
 sigaction sigaltstack siginterrupt sigpending sigrelse \
 sigtimedwait sigwait sigwaitinfo snprintf strftime strlcpy symlinkat sync \
 sysconf tcgetpgrp tcsetpgrp tempnam timegm times tmpfile tmpnam tmpnam_r \
 truncate uname unlinkat unsetenv utimensat utimes waitid waitpid wait3 wait4 \
 wcscoll wcsftime wcsxfrm wmemcmp writev _getpty)

http://man7.org/linux/man-pages/man7/pthreads.7.html

All of this to ask two things: is `timeout` a float, and are you calling `is_alive()` after `join()`? The documentation for `join()` says:

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
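Since a thread cannot be killed once the timeout is detected, the suggestion from the comments to use multiprocessing may be the more practical route. Below is a rough sketch, not a drop-in solution: `evaluate_point` and `run_with_timeout` are hypothetical names, the evaluation is faked with `time.sleep`, and I am assuming a POSIX system where the `fork` start method is available (which is likely on a cluster, but please check):

```python
import multiprocessing as mp
import queue
import time


def evaluate_point(delay, results):
    # Hypothetical stand-in for evaluating one parameter point.
    time.sleep(delay)
    results.put(delay)


def run_with_timeout(delay, timeout):
    """Run evaluate_point in a child process; return None if it takes too long."""
    ctx = mp.get_context("fork")  # assumption: POSIX system with fork available
    results = ctx.Queue()
    proc = ctx.Process(target=evaluate_point, args=(delay, results))
    proc.start()
    try:
        # Wait for the result instead of for the process, with a time limit.
        result = results.get(timeout=timeout)
    except queue.Empty:
        result = None
        proc.terminate()  # unlike a thread, a process can be killed
    proc.join()
    return result


fast = run_with_timeout(0.01, 5.0)   # finishes well within the limit
slow = run_with_timeout(30.0, 0.5)   # gets terminated after 0.5 seconds
print(fast, slow)
```

A point that returns `None` can then be discarded as unphysical. A separate process also sidesteps the question of whether compiled code in nlopt, numpy or scipy holds the interpreter lock while it runs, which I suspect (but have not verified for your code) could be why the main thread appears stuck.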

jmunsch