CPython の threading を読み解く

2020.06.29

概要

Python の threading モジュールは、Python インタプリタのスレッド操作の高レベルな API を提供する:

threading - Thread-based parallelism - Python 3.8.3 documentation

この文章では、 threading モジュールでのスレッド操作を行った際のインタプリタの動きを、 CPython の実装を追うことで確認する。

準備

対象

Python 3.8.3 としてリリースされた CPython のソースコードを対象に行う:

python/cpython

Tag: v3.8.3

Commit SHA: 6f8c8320e9eac9bc7a7f653b43506e75916ce8e8

環境

POSIX Threads がサポートされた Linux x64 上での動きを追う

前提知識

アプリケーションコード

以下の Python コードをインタプリタにて実行した場合の動きを確認する:

import threading
from time import sleep

def countdown():
    count = 10
    while (count > 0):
        print(f"{threading.get_ident()}: {count}")
        count = count - 1
        sleep(1)

th1 = threading.Thread(target=countdown)
th2 = threading.Thread(target=countdown)
th1.start()
th2.start()
th1.join()
th2.join()
  • 実行結果

    $ ./python test_thread.py
    140150431667968: 10
    140150352901888: 10
    140150431667968: 9
    140150352901888: 9
    140150431667968: 8
    140150352901888: 8
    140150431667968: 7
    140150352901888: 7
    140150431667968: 6
    140150352901888: 6
    140150431667968: 5
    140150352901888: 5
    140150431667968: 4
    140150352901888: 4
    140150431667968: 3
    140150352901888: 3
    140150431667968: 2
    140150352901888: 2
    140150431667968: 1
    140150352901888: 1
    

検証

threading.Thread のインスタンス化

th1 = threading.Thread(target=countdown)
th2 = threading.Thread(target=countdown)

threading.Thread.__init__() から確認する。

  • __init__()

    # Lib/threading.py:761
    def __init__(self, group=None, target=None, name=None,
                     args=(), kwargs=None, *, daemon=None):
            """This constructor should always be called with keyword arguments. Arguments are:
    
            *group* should be None; reserved for future extension when a ThreadGroup
            class is implemented.
    
            *target* is the callable object to be invoked by the run()
            method. Defaults to None, meaning nothing is called.
    
            *name* is the thread name. By default, a unique name is constructed of
            the form "Thread-N" where N is a small decimal number.
    
            *args* is the argument tuple for the target invocation. Defaults to ().
    
            *kwargs* is a dictionary of keyword arguments for the target
            invocation. Defaults to {}.
    
            If a subclass overrides the constructor, it must make sure to invoke
            the base class constructor (Thread.__init__()) before doing anything
            else to the thread.
    
            """
            assert group is None, "group argument must be None for now"
            if kwargs is None:
                kwargs = {}
            self._target = target
            self._name = str(name or _newname())
            self._args = args
            self._kwargs = kwargs
            if daemon is not None:
                self._daemonic = daemon
            else:
                self._daemonic = current_thread().daemon
            self._ident = None
            if _HAVE_THREAD_NATIVE_ID:
                self._native_id = None
            self._tstate_lock = None
            self._started = Event()
            self._is_stopped = False
            self._initialized = True
            # Copy of sys.stderr used by self._invoke_excepthook()
            self._stderr = _sys.stderr
            self._invoke_excepthook = _make_invoke_excepthook()
            # For debugging and _after_fork()
            _dangling.add(self)
    

引数 target として渡される Callable は self._target に格納される。

スレッドの実行開始

th1.start()
th2.start()
  • start()

    # Lib/threading.py:834
    def start(self):
            """Start the thread's activity.
    
            It must be called at most once per thread object. It arranges for the
            object's run() method to be invoked in a separate thread of control.
    
            This method will raise a RuntimeError if called more than once on the
            same thread object.
    
            """
            if not self._initialized:
                raise RuntimeError("thread.__init__() not called")
    
            if self._started.is_set():
                raise RuntimeError("threads can only be started once")
            with _active_limbo_lock:
                _limbo[self] = self
            try:
                _start_new_thread(self._bootstrap, ())
            except Exception:
                with _active_limbo_lock:
                    del _limbo[self]
                raise
            self._started.wait()
    

既に開始されたスレッドかどうか確認した後に、 _start_new_thread() によって実際にスレッドを開始する。

  • _start_new_thread

    # Lib/threading.py:32
    # Rename some stuff so "from threading import *" is safe
    _start_new_thread = _thread.start_new_thread
    _allocate_lock = _thread.allocate_lock
    _set_sentinel = _thread._set_sentinel
    get_ident = _thread.get_ident
    

_start_new_thread_thread.start_new_thread のエイリアスであった。 _thread は、 CPython インタプリタの一部であり、Native Extension として C で記述されている。実体は Modules/_threadmodule.c にある。

ネイティブモジュールを追う

では、ネイティブモジュール内を読む。まずは、 Python に露出している API とネイティブ (C) で記述された関数とのマッピングを確認する。先立って、モジュール定義からメソッドテーブルを確認する。

  • threadmodule

    // Modules/_threadmodule.c:1497
    static struct PyModuleDef threadmodule = {
        PyModuleDef_HEAD_INIT,
        "_thread",
        thread_doc,
        -1,
        thread_methods,
        NULL,
        NULL,
        NULL,
        NULL
    };
    

メソッドテーブルは thread_methods である。

  • thread_methods

    // Modules/_threadmodule.c:1446
    static PyMethodDef thread_methods[] = {
        {"start_new_thread",        (PyCFunction)thread_PyThread_start_new_thread,
         METH_VARARGS, start_new_doc},
        {"start_new",               (PyCFunction)thread_PyThread_start_new_thread,
         METH_VARARGS, start_new_doc},
        {"allocate_lock",           thread_PyThread_allocate_lock,
         METH_NOARGS, allocate_doc},
        {"allocate",                thread_PyThread_allocate_lock,
         METH_NOARGS, allocate_doc},
        {"exit_thread",             thread_PyThread_exit_thread,
         METH_NOARGS, exit_doc},
        {"exit",                    thread_PyThread_exit_thread,
         METH_NOARGS, exit_doc},
        {"interrupt_main",          thread_PyThread_interrupt_main,
         METH_NOARGS, interrupt_doc},
        {"get_ident",               thread_get_ident,
         METH_NOARGS, get_ident_doc},
    #ifdef PY_HAVE_THREAD_NATIVE_ID
        {"get_native_id",           thread_get_native_id,
         METH_NOARGS, get_native_id_doc},
    #endif
        {"_count",                  thread__count,
         METH_NOARGS, _count_doc},
        {"stack_size",              (PyCFunction)thread_stack_size,
         METH_VARARGS, stack_size_doc},
        {"_set_sentinel",           thread__set_sentinel,
         METH_NOARGS, _set_sentinel_doc},
        {"_excepthook",              thread_excepthook,
         METH_O, excepthook_doc},
        {NULL,                      NULL}           /* sentinel */
    };
    

Python 上の _thread.start_new_thread() の実体は thread_PyThread_start_new_thread である。

  • thread_PyThread_start_new_thread()

    // Modules/_threadmodule.c:1024
    static PyObject *
    thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
    {
        PyObject *func, *args, *keyw = NULL;
        struct bootstate *boot;
        unsigned long ident;
    
        if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
                               &func, &args, &keyw))
            return NULL;
        if (!PyCallable_Check(func)) {
            PyErr_SetString(PyExc_TypeError,
                            "first arg must be callable");
            return NULL;
        }
        if (!PyTuple_Check(args)) {
            PyErr_SetString(PyExc_TypeError,
                            "2nd arg must be a tuple");
            return NULL;
        }
        if (keyw != NULL && !PyDict_Check(keyw)) {
            PyErr_SetString(PyExc_TypeError,
                            "optional 3rd arg must be a dictionary");
            return NULL;
        }
        boot = PyMem_NEW(struct bootstate, 1);
        if (boot == NULL)
            return PyErr_NoMemory();
        boot->interp = _PyInterpreterState_Get();
        boot->func = func;
        boot->args = args;
        boot->keyw = keyw;
        boot->tstate = _PyThreadState_Prealloc(boot->interp);
        if (boot->tstate == NULL) {
            PyMem_DEL(boot);
            return PyErr_NoMemory();
        }
        Py_INCREF(func);
        Py_INCREF(args);
        Py_XINCREF(keyw);
        PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
        ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
        if (ident == PYTHREAD_INVALID_THREAD_ID) {
            PyErr_SetString(ThreadError, "can't start new thread");
            Py_DECREF(func);
            Py_DECREF(args);
            Py_XDECREF(keyw);
            PyThreadState_Clear(boot->tstate);
            PyMem_DEL(boot);
            return NULL;
        }
        return PyLong_FromUnsignedLong(ident);
    }
    
    PyDoc_STRVAR(start_new_doc,
    "start_new_thread(function, args[, kwargs])\n\
    (start_new() is an obsolete synonym)\n\
    \n\
    Start a new thread and return its identifier.  The thread will call the\n\
    function with positional arguments from the tuple args and keyword arguments\n\
    taken from the optional dictionary kwargs.  The thread exits when the\n\
    function returns; the return value is ignored.  The thread will also exit\n\
    when the function raises an unhandled exception; a stack trace will be\n\
    printed unless the exception is SystemExit.\n");
    

PyEvel_InitThreads()

  • PyEvel_InitThreads()

    // Python/ceval.c:200
    void
    PyEval_InitThreads(void)
    {
        _PyRuntimeState *runtime = &_PyRuntime;
        struct _ceval_runtime_state *ceval = &runtime->ceval;
        struct _gil_runtime_state *gil = &ceval->gil;
        if (gil_created(gil)) {
            return;
        }
    
        PyThread_init_thread();
        create_gil(gil);
        PyThreadState *tstate = _PyRuntimeState_GetThreadState(runtime);
        take_gil(ceval, tstate);
    
        struct _pending_calls *pending = &ceval->pending;
        pending->lock = PyThread_allocate_lock();
        if (pending->lock == NULL) {
            Py_FatalError("Can't initialize threads for pending calls");
        }
    }
    

GIL が初期化されていなければ create_gil() で初期化し、続けて take_gil()GIL を獲得する。スレッドを建てようとするときに初めて GIL を初期化しているため、インタプリタがシングルスレッドで走っている時は GIL を利用しない。

PyThread_start_new_thread()

  • PyThread_start_new_thread()

    環境によって実装が異なる。 Windows なら Python/threda_nt.h 、 POSIX なら Python/thread_pthread.h が利用される。

    // Python/thread_pthread.h:236
    unsigned long
    PyThread_start_new_thread(void (*func)(void *), void *arg)
    {
        pthread_t th;
        int status;
    #if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
        pthread_attr_t attrs;
    #endif
    #if defined(THREAD_STACK_SIZE)
        size_t      tss;
    #endif
    
        dprintf(("PyThread_start_new_thread called\n"));
        if (!initialized)
            PyThread_init_thread();
    
    #if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
        if (pthread_attr_init(&attrs) != 0)
            return PYTHREAD_INVALID_THREAD_ID;
    #endif
    #if defined(THREAD_STACK_SIZE)
        PyThreadState *tstate = _PyThreadState_GET();
        size_t stacksize = tstate ? tstate->interp->pythread_stacksize : 0;
        tss = (stacksize != 0) ? stacksize : THREAD_STACK_SIZE;
        if (tss != 0) {
            if (pthread_attr_setstacksize(&attrs, tss) != 0) {
                pthread_attr_destroy(&attrs);
                return PYTHREAD_INVALID_THREAD_ID;
            }
        }
    #endif
    #if defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
        pthread_attr_setscope(&attrs, PTHREAD_SCOPE_SYSTEM);
    #endif
    
        pythread_callback *callback = PyMem_RawMalloc(sizeof(pythread_callback));
    
        if (callback == NULL) {
          return PYTHREAD_INVALID_THREAD_ID;
        }
    
        callback->func = func;
        callback->arg = arg;
    
        status = pthread_create(&th,
    #if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
                                 &attrs,
    #else
                                 (pthread_attr_t*)NULL,
    #endif
                                 pythread_wrapper, callback);
    
    #if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
        pthread_attr_destroy(&attrs);
    #endif
    
        if (status != 0) {
            PyMem_RawFree(callback);
            return PYTHREAD_INVALID_THREAD_ID;
        }
    
        pthread_detach(th);
    
    #if SIZEOF_PTHREAD_T <= SIZEOF_LONG
        return (unsigned long) th;
    #else
        return (unsigned long) *(unsigned long *) &th;
    #endif
    }
    

ようやく pthread_create() が登場する。 Python のスレッドは OS のネイティブスレッドであることがわかる。

PTHREAD_CREATE

start_routinepythread_wrapper である:

  • pythread_wrapper()

    // Python/thread_pthread.h:223
    static void *
    pythread_wrapper(void *arg)
    {
        /* copy func and func_arg and free the temporary structure */
        pythread_callback *callback = arg;
        void (*func)(void *) = callback->func;
        void *func_arg = callback->arg;
        PyMem_RawFree(arg);
    
        func(func_arg);
        return NULL;
    }
    

引数として渡された pythread_callbackcallback->func を、 callback->arg を引数として実行する。この funcarg は、 PyThread_start_new_thread へ渡された引数であり、 funct_bootstrapargstruct bootstate 型の boot である。

t_bootstrap

  • t_bootstrap()

    // Modules/_threadmodule.c:990
    static void
    t_bootstrap(void *boot_raw)
    {
        struct bootstate *boot = (struct bootstate *) boot_raw;
        PyThreadState *tstate;
        PyObject *res;
    
        tstate = boot->tstate;
        tstate->thread_id = PyThread_get_thread_ident();
        _PyThreadState_Init(&_PyRuntime, tstate);
        PyEval_AcquireThread(tstate);
        tstate->interp->num_threads++;
        res = PyObject_Call(boot->func, boot->args, boot->keyw);
        if (res == NULL) {
            if (PyErr_ExceptionMatches(PyExc_SystemExit))
                /* SystemExit is ignored silently */
                PyErr_Clear();
            else {
                _PyErr_WriteUnraisableMsg("in thread started by", boot->func);
            }
        }
        else {
            Py_DECREF(res);
        }
        Py_DECREF(boot->func);
        Py_DECREF(boot->args);
        Py_XDECREF(boot->keyw);
        PyMem_DEL(boot_raw);
        tstate->interp->num_threads--;
        PyThreadState_Clear(tstate);
        PyThreadState_DeleteCurrent();
        PyThread_exit_thread();
    }
    

bootstate 構造体 boot に詰められて渡された boot->func , boot->args , boot->keywPyObject_Call() に渡し、Python コードとして実行する。

Object Protocol - Python 3.8.3 documentation

boot , struct bootstate

bootstruct bootstate 型の変数であり、スレッド生成後に実行される Python コードや引数が格納されている。

  • struct bootstate

    // Modules/_threadmodule.c:982
    struct bootstate {
        PyInterpreterState *interp;
        PyObject *func;
        PyObject *args;
        PyObject *keyw;
        PyThreadState *tstate;
    };
    

実際に boot に詰められる中身を確認する。

threading モジュールの start() から辿ると:

  • _bootstrap() , _bootstrap_inner()

    # Lib/threading.py:852
                _start_new_thread(self._bootstrap, ())
    
    # Lib/threading.py:876
    def _bootstrap(self):
            # Wrapper around the real bootstrap code that ignores
            # exceptions during interpreter cleanup.  Those typically
            # happen when a daemon thread wakes up at an unfortunate
            # moment, finds the world around it destroyed, and raises some
            # random exception *** while trying to report the exception in
            # _bootstrap_inner() below ***.  Those random exceptions
            # don't help anybody, and they confuse users, so we suppress
            # them.  We suppress them only when it appears that the world
            # indeed has already been destroyed, so that exceptions in
            # _bootstrap_inner() during normal business hours are properly
            # reported.  Also, we only suppress them for daemonic threads;
            # if a non-daemonic encounters this, something else is wrong.
            try:
                self._bootstrap_inner()
            except:
                if self._daemonic and _sys is None:
                    return
                raise
    
    # Lib/threading.py:915
    def _bootstrap_inner(self):
            try:
                self._set_ident()
                self._set_tstate_lock()
                if _HAVE_THREAD_NATIVE_ID:
                    self._set_native_id()
                self._started.set()
                with _active_limbo_lock:
                    _active[self._ident] = self
                    del _limbo[self]
    
                if _trace_hook:
                    _sys.settrace(_trace_hook)
                if _profile_hook:
                    _sys.setprofile(_profile_hook)
    
                try:
                    self.run()
                except:
                    self._invoke_excepthook(self)
            finally:
                with _active_limbo_lock:
                    try:
                        # We don't call self._delete() because it also
                        # grabs _active_limbo_lock.
                        del _active[get_ident()]
                    except:
                        pass
    

self._bootstrap()self.run() をラップしているだけであった。

  • self.run()

    def run(self):
            """Method representing the thread's activity.
    
            You may override this method in a subclass. The standard run() method
            invokes the callable object passed to the object's constructor as the
            target argument, if any, with sequential and keyword arguments taken
            from the args and kwargs arguments, respectively.
    
            """
            try:
                if self._target:
                    self._target(*self._args, **self._kwargs)
            finally:
                # Avoid a refcycle if the thread is running a function with
                # an argument that has a member that points to the thread.
                del self._target, self._args, self._kwargs
    

Thread.run() はデフォルトでは self._target() を実行する他、オーバーライドすることもできる。

まとめ

  • CPython のスレッドは OS のネイティブスレッド。POSIX なら pthread
  • 別スレッドの生成時に初めて GIL の初期化が行われる。インタプリタがメインスレッドのみで走っている場合は GIL は初期化されず、用いられない。

スレッドの完了待ち合わせ

th1.join()
th2.join()
  • join()

    # Lib/threading.py:979
    def join(self, timeout=None):
            """Wait until the thread terminates.
    
            This blocks the calling thread until the thread whose join() method is
            called terminates -- either normally or through an unhandled exception
            or until the optional timeout occurs.
    
            When the timeout argument is present and not None, it should be a
            floating point number specifying a timeout for the operation in seconds
            (or fractions thereof). As join() always returns None, you must call
            is_alive() after join() to decide whether a timeout happened -- if the
            thread is still alive, the join() call timed out.
    
            When the timeout argument is not present or None, the operation will
            block until the thread terminates.
    
            A thread can be join()ed many times.
    
            join() raises a RuntimeError if an attempt is made to join the current
            thread as that would cause a deadlock. It is also an error to join() a
            thread before it has been started and attempts to do so raises the same
            exception.
    
            """
            if not self._initialized:
                raise RuntimeError("Thread.__init__() not called")
            if not self._started.is_set():
                raise RuntimeError("cannot join thread before it is started")
            if self is current_thread():
                raise RuntimeError("cannot join current thread")
    
            if timeout is None:
                self._wait_for_tstate_lock()
            else:
                # the behavior of a negative timeout isn't documented, but
                # historically .join(timeout=x) for x<0 has acted as if timeout=0
                self._wait_for_tstate_lock(timeout=max(timeout, 0))
    

処理本体は self._wait_for_tstate_lock() .

  • self._wait_for_tstate_lock()

    # Lob/threading
    def _wait_for_tstate_lock(self, block=True, timeout=-1):
            # Issue #18808: wait for the thread state to be gone.
            # At the end of the thread's life, after all knowledge of the thread
            # is removed from C data structures, C code releases our _tstate_lock.
            # This method passes its arguments to _tstate_lock.acquire().
            # If the lock is acquired, the C code is done, and self._stop() is
            # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
            lock = self._tstate_lock
            if lock is None:  # already determined that the C code is done
                assert self._is_stopped
            elif lock.acquire(block, timeout):
                lock.release()
                self._stop()
    

self._tstate_lock をロックしてみて、ロックが獲得できたらスレッドが死んでいると判断しているらしい。 self._stop() はつまらない片付け処理だった。

_tstate_lock の初期化、獲得

self._tstate_lock がどこで初期化されているか確認する。

まず、実際に self._tstate_lock へ値を代入しているのは _set_tstate_lock() である:

  • _set_tstate_lock()

    # Lib/threading.py:903
    def _set_tstate_lock(self):
            """
            Set a lock object which will be released by the interpreter when
            the underlying thread state (see pystate.h) gets deleted.
            """
            self._tstate_lock = _set_sentinel()
            self._tstate_lock.acquire()
    
            if not self.daemon:
                with _shutdown_locks_lock:
                    _shutdown_locks.add(self._tstate_lock)
    

ロックに _set_sentinel() の返り値を代入した後に、ロックを獲得している。

_set_tstate_lock() の呼び出し元は _bootstrap_inner() であり、ネイティブスレッドの生成後、実際にアプリケーションコードを走らせ始める直前にロックを作成していることがわかる。

続いて、 _set_sentinel() を見ていく。実体は:

  • _set_sentinel

    # Lib/threading.py:32
    # Rename some stuff so "from threading import *" is safe
    _start_new_thread = _thread.start_new_thread
    _allocate_lock = _thread.allocate_lock
    _set_sentinel = _thread._set_sentinel
    get_ident = _thread.get_ident
    

_threading モジュールのメソッドテーブルを確認すると、実装は thread__set_sentinel である:

  • thread_set_sentinel()

    // Modules/_threadmodule.c:1211
    static PyObject *
    thread__set_sentinel(PyObject *self, PyObject *Py_UNUSED(ignored))
    {
        PyObject *wr;
        PyThreadState *tstate = PyThreadState_Get();
        lockobject *lock;
    
        if (tstate->on_delete_data != NULL) {
            /* We must support the re-creation of the lock from a
               fork()ed child. */
            assert(tstate->on_delete == &release_sentinel);
            wr = (PyObject *) tstate->on_delete_data;
            tstate->on_delete = NULL;
            tstate->on_delete_data = NULL;
            Py_DECREF(wr);
        }
        lock = newlockobject();
        if (lock == NULL)
            return NULL;
        /* The lock is owned by whoever called _set_sentinel(), but the weakref
           hangs to the thread state. */
        wr = PyWeakref_NewRef((PyObject *) lock, NULL);
        if (wr == NULL) {
            Py_DECREF(lock);
            return NULL;
        }
        tstate->on_delete_data = (void *) wr;
        tstate->on_delete = &release_sentinel;
        return (PyObject *) lock;
    }
    

Python から叩けるロックは lockobject 型のようである。まずは、この lockobject について確認する。

lockobject

タイプオブジェクト、メソッド定義を見る:

  • PyTypeObject Locktype

    // Modules/_threadmodule.c:228
    static PyTypeObject Locktype = {
        PyVarObject_HEAD_INIT(&PyType_Type, 0)
        "_thread.lock",                     /*tp_name*/
        sizeof(lockobject),                 /*tp_basicsize*/
        0,                                  /*tp_itemsize*/
        /* methods */
        (destructor)lock_dealloc,           /*tp_dealloc*/
        0,                                  /*tp_vectorcall_offset*/
        0,                                  /*tp_getattr*/
        0,                                  /*tp_setattr*/
        0,                                  /*tp_as_async*/
        (reprfunc)lock_repr,                /*tp_repr*/
        0,                                  /*tp_as_number*/
        0,                                  /*tp_as_sequence*/
        0,                                  /*tp_as_mapping*/
        0,                                  /*tp_hash*/
        0,                                  /*tp_call*/
        0,                                  /*tp_str*/
        0,                                  /*tp_getattro*/
        0,                                  /*tp_setattro*/
        0,                                  /*tp_as_buffer*/
        Py_TPFLAGS_DEFAULT,                 /*tp_flags*/
        0,                                  /*tp_doc*/
        0,                                  /*tp_traverse*/
        0,                                  /*tp_clear*/
        0,                                  /*tp_richcompare*/
        offsetof(lockobject, in_weakreflist), /*tp_weaklistoffset*/
        0,                                  /*tp_iter*/
        0,                                  /*tp_iternext*/
        lock_methods,                       /*tp_methods*/
    };
    
  • PyMethodDef lock_methods

    // Modules/_threadmodule.c:208
    static PyMethodDef lock_methods[] = {
        {"acquire_lock", (PyCFunction)(void(*)(void))lock_PyThread_acquire_lock,
         METH_VARARGS | METH_KEYWORDS, acquire_doc},
        {"acquire",      (PyCFunction)(void(*)(void))lock_PyThread_acquire_lock,
         METH_VARARGS | METH_KEYWORDS, acquire_doc},
        {"release_lock", (PyCFunction)lock_PyThread_release_lock,
         METH_NOARGS, release_doc},
        {"release",      (PyCFunction)lock_PyThread_release_lock,
         METH_NOARGS, release_doc},
        {"locked_lock",  (PyCFunction)lock_locked_lock,
         METH_NOARGS, locked_doc},
        {"locked",       (PyCFunction)lock_locked_lock,
         METH_NOARGS, locked_doc},
        {"__enter__",    (PyCFunction)(void(*)(void))lock_PyThread_acquire_lock,
         METH_VARARGS | METH_KEYWORDS, acquire_doc},
        {"__exit__",    (PyCFunction)lock_PyThread_release_lock,
         METH_VARARGS, release_doc},
        {NULL,           NULL}              /* sentinel */
    };
    
  • struct lockobject

    // Modules/_threadmodule.c:19
    typedef struct {
        PyObject_HEAD
        PyThread_type_lock lock_lock;
        PyObject *in_weakreflist;
        char locked; /* for sanity checking */
    } lockobject;
    

acquire, release された際の動作を見る。

  • lock_PyThread_acquire_lock()

    // Modules/_threadmodule.c:137
    static PyObject *
    lock_PyThread_acquire_lock(lockobject *self, PyObject *args, PyObject *kwds)
    {
        _PyTime_t timeout;
        PyLockStatus r;
    
        if (lock_acquire_parse_args(args, kwds, &timeout) < 0)
            return NULL;
    
        r = acquire_timed(self->lock_lock, timeout);
        if (r == PY_LOCK_INTR) {
            return NULL;
        }
    
        if (r == PY_LOCK_ACQUIRED)
            self->locked = 1;
        return PyBool_FromLong(r == PY_LOCK_ACQUIRED);
    }
    

本質は acquire_timed() ぽい。

  • acquire_timed()

    // Modules/_threadmodule.c:46
    static PyLockStatus
    acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
    {
        PyLockStatus r;
        _PyTime_t endtime = 0;
        _PyTime_t microseconds;
    
        if (timeout > 0)
            endtime = _PyTime_GetMonotonicClock() + timeout;
    
        do {
            microseconds = _PyTime_AsMicroseconds(timeout, _PyTime_ROUND_CEILING);
    
            /* first a simple non-blocking try without releasing the GIL */
            r = PyThread_acquire_lock_timed(lock, 0, 0);
            if (r == PY_LOCK_FAILURE && microseconds != 0) {
                Py_BEGIN_ALLOW_THREADS
                r = PyThread_acquire_lock_timed(lock, microseconds, 1);
                Py_END_ALLOW_THREADS
            }
    
            if (r == PY_LOCK_INTR) {
                /* Run signal handlers if we were interrupted.  Propagate
                 * exceptions from signal handlers, such as KeyboardInterrupt, by
                 * passing up PY_LOCK_INTR.  */
                if (Py_MakePendingCalls() < 0) {
                    return PY_LOCK_INTR;
                }
    
                /* If we're using a timeout, recompute the timeout after processing
                 * signals, since those can take time.  */
                if (timeout > 0) {
                    timeout = endtime - _PyTime_GetMonotonicClock();
    
                    /* Check for negative values, since those mean block forever.
                     */
                    if (timeout < 0) {
                        r = PY_LOCK_FAILURE;
                    }
                }
            }
        } while (r == PY_LOCK_INTR);  /* Retry if we were interrupted. */
    
        return r;
    }
    

PyThread_acquire_lock_timed() は実装が2つあり、以下のマクロによってどちらの実装を利用するか決められる:

// Python/thread_pthread.h:77
/* Whether or not to use semaphores directly rather than emulating them with
 * mutexes and condition variables:
 */
#if (defined(_POSIX_SEMAPHORES) && !defined(HAVE_BROKEN_POSIX_SEMAPHORES) && \
     defined(HAVE_SEM_TIMEDWAIT))
#  define USE_SEMAPHORES
#else
#  undef USE_SEMAPHORES
#endif

今回の環境は POSIX SEMAPHORE が使えるのでそちらの実装を確認する。

  • PyThread_acquire_lock_timed()

    PyLockStatus
    PyThread_acquire_lock_timed(PyThread_type_lock lock, PY_TIMEOUT_T microseconds,
                                int intr_flag)
    {
        PyLockStatus success;
        sem_t *thelock = (sem_t *)lock;
        int status, error = 0;
        struct timespec ts;
        _PyTime_t deadline = 0;
    
        (void) error; /* silence unused-but-set-variable warning */
        dprintf(("PyThread_acquire_lock_timed(%p, %lld, %d) called\n",
                 lock, microseconds, intr_flag));
    
        if (microseconds > PY_TIMEOUT_MAX) {
            Py_FatalError("Timeout larger than PY_TIMEOUT_MAX");
        }
    
        if (microseconds > 0) {
            MICROSECONDS_TO_TIMESPEC(microseconds, ts);
    
            if (!intr_flag) {
                /* cannot overflow thanks to (microseconds > PY_TIMEOUT_MAX)
                   check done above */
                _PyTime_t timeout = _PyTime_FromNanoseconds(microseconds * 1000);
                deadline = _PyTime_GetMonotonicClock() + timeout;
            }
        }
    
        while (1) {
            if (microseconds > 0) {
                status = fix_status(sem_timedwait(thelock, &ts));
            }
            else if (microseconds == 0) {
                status = fix_status(sem_trywait(thelock));
            }
            else {
                status = fix_status(sem_wait(thelock));
            }
    
            /* Retry if interrupted by a signal, unless the caller wants to be
               notified.  */
            if (intr_flag || status != EINTR) {
                break;
            }
    
            if (microseconds > 0) {
                /* wait interrupted by a signal (EINTR): recompute the timeout */
                _PyTime_t dt = deadline - _PyTime_GetMonotonicClock();
                if (dt < 0) {
                    status = ETIMEDOUT;
                    break;
                }
                else if (dt > 0) {
                    _PyTime_t realtime_deadline = _PyTime_GetSystemClock() + dt;
                    if (_PyTime_AsTimespec(realtime_deadline, &ts) < 0) {
                        /* Cannot occur thanks to (microseconds > PY_TIMEOUT_MAX)
                           check done above */
                        Py_UNREACHABLE();
                    }
                    /* no need to update microseconds value, the code only care
                       if (microseconds > 0 or (microseconds == 0). */
                }
                else {
                    microseconds = 0;
                }
            }
        }
    

タイムアウトの残り時間に応じて sem_timedwait , sem_trywait , sem_wait を使い分ける。

いずれも、POSIX SEMAPHORE として提供されている:

SEMAPHORES

SEM_WAIT

  • sem_wait : セマフォ減算。ブロッキング。
  • sem_trywait : セマフォ減算。ノンブロッキング。
  • sem_timedwait : セマフォ減算。ブロックのタイムアウトを設定可能。

つまり、 lockobjectacquire は、セマフォを獲得する。

続いて、 release を見る。

  • lock_PyThread_release_lock()

    // Modules/_threadmodule.c:167
    static PyObject *
    lock_PyThread_release_lock(lockobject *self, PyObject *Py_UNUSED(ignored))
    {
        /* Sanity check: the lock must be locked */
        if (!self->locked) {
            PyErr_SetString(ThreadError, "release unlocked lock");
            return NULL;
        }
    
        PyThread_release_lock(self->lock_lock);
        self->locked = 0;
        Py_RETURN_NONE;
    }
    
    PyDoc_STRVAR(release_doc,
    "release()\n\
    (release_lock() is an obsolete synonym)\n\
    \n\
    Release the lock, allowing another thread that is blocked waiting for\n\
    the lock to acquire the lock.  The lock must be in the locked state,\n\
    but it needn't be locked by the same thread that unlocks it.");
    
  • PyThread_release_lock()

    void
    PyThread_release_lock(PyThread_type_lock lock)
    {
        sem_t *thelock = (sem_t *)lock;
        int status, error = 0;
    
        (void) error; /* silence unused-but-set-variable warning */
        dprintf(("PyThread_release_lock(%p) called\n", lock));
    
        status = sem_post(thelock);
        CHECK_STATUS("sem_post");
    }
    

sem_post も POSIX SEMAPHORE で提供されている機能である:

SEM_POST

  • sem_post : セマフォ加算。ブロックしない。

つまり、 lockobject は OS のロック / セマフォ機構のラッパーである。

_tstate_lock の呼び出し

では、 _tstate_lock の解放はどこで行われているのだろうか。

実は、 Thread._set_tstate_lock()Thread._set_sentinel()thread_set_sentinel() において、スレッド消滅時にロックを解放するようにイベントハンドラが登録されている:

// Modules/_threadmodule.c:1211
static PyObject *
thread__set_sentinel(PyObject *self, PyObject *Py_UNUSED(ignored))
{
		...
    tstate->on_delete_data = (void *) wr;
    tstate->on_delete = &release_sentinel;
    return (PyObject *) lock;
}
  • release_sentinel()

    // Modules/_threadmodule.c:1189
    static void
    release_sentinel(void *wr_raw)
    {
        PyObject *wr = _PyObject_CAST(wr_raw);
        /* Tricky: this function is called when the current thread state
           is being deleted.  Therefore, only simple C code can safely
           execute here. */
        PyObject *obj = PyWeakref_GET_OBJECT(wr);
        lockobject *lock;
        if (obj != Py_None) {
            assert(Py_TYPE(obj) == &Locktype);
            lock = (lockobject *) obj;
            if (lock->locked) {
                PyThread_release_lock(lock->lock_lock);
                lock->locked = 0;
            }
        }
        /* Deallocating a weakref with a NULL callback only calls
           PyObject_GC_Del(), which can't call any Python code. */
        Py_DECREF(wr);
    }
    

PyThread_release_lock() を呼び出し、ロックを解放している。

では、 tstate->on_delete ハンドラはどこで呼び出されているのだろうか。

これは、 t_bootstrap() の末尾のスレッド終了処理にて行われている。 t_bootstrap() は別スレッド生成時に真っ先に実行される関数であった:

// Modules/_threadmodule.c:990
static void
t_bootstrap(void *boot_raw)
{
    ...
    PyThreadState_Clear(tstate);
    PyThreadState_DeleteCurrent();
    PyThread_exit_thread();
}
  • PyThreadState_DeleteCurrent()

    // Python/pystate.c:875
    void
    PyThreadState_DeleteCurrent()
    {
        _PyThreadState_DeleteCurrent(&_PyRuntime);
    }
    
  • _PyThreadState_DeleteCurrent()

    // Python/pystate.c:857
    static void
    _PyThreadState_DeleteCurrent(_PyRuntimeState *runtime)
    {
        struct _gilstate_runtime_state *gilstate = &runtime->gilstate;
        PyThreadState *tstate = _PyRuntimeGILState_GetThreadState(gilstate);
        if (tstate == NULL)
            Py_FatalError(
                "PyThreadState_DeleteCurrent: no current tstate");
        tstate_delete_common(runtime, tstate);
        if (gilstate->autoInterpreterState &&
            PyThread_tss_get(&gilstate->autoTSSkey) == tstate)
        {
            PyThread_tss_set(&gilstate->autoTSSkey, NULL);
        }
        _PyRuntimeGILState_SetThreadState(gilstate, NULL);
        PyEval_ReleaseLock();
    }
    
  • tstate_delete_common()

    // Python/pystate.c:808
    /* Common code for PyThreadState_Delete() and PyThreadState_DeleteCurrent() */
    static void
    tstate_delete_common(_PyRuntimeState *runtime, PyThreadState *tstate)
    {
        if (tstate == NULL) {
            Py_FatalError("PyThreadState_Delete: NULL tstate");
        }
        PyInterpreterState *interp = tstate->interp;
        if (interp == NULL) {
            Py_FatalError("PyThreadState_Delete: NULL interp");
        }
        HEAD_LOCK(runtime);
        if (tstate->prev)
            tstate->prev->next = tstate->next;
        else
            interp->tstate_head = tstate->next;
        if (tstate->next)
            tstate->next->prev = tstate->prev;
        HEAD_UNLOCK(runtime);
        if (tstate->on_delete != NULL) {
            tstate->on_delete(tstate->on_delete_data);
        }
        PyMem_RawFree(tstate);
    }
    

ここで tstate->on_deletetstate->on_delete_data が渡されて実行されていることがわかる。

まとめ

  • 別スレッドが生成され、アプリケーションコードが走る直前にロックが獲得される
  • アプリケーションコードの実行が終わり、スレッドが死ぬ寸前にロックが解放される
  • Thread.join() は、このロックに対してブロッキングの獲得を試みることで、スレッドが終了するまで処理をブロックしている

参考

【CPython3.6源码分析】Python 多线程机制


[draft] threading internal