Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1a39d41
switch to per thread tasks list
kumaraditya303 Jan 11, 2025
28735e7
traverse linked lists of all threads
kumaraditya303 Jan 12, 2025
3052330
cleanup
kumaraditya303 Jan 14, 2025
acef821
add comments
kumaraditya303 Jan 15, 2025
237a089
add a per interp tasks list
kumaraditya303 Jan 16, 2025
c12f271
fixup for regular builds
kumaraditya303 Jan 16, 2025
d36270f
fix missing start world
kumaraditya303 Jan 16, 2025
f863cd5
Merge branch 'main' of https://github.com/python/cpython into per-thr…
kumaraditya303 Jan 21, 2025
d0fbbc2
add interp llist
kumaraditya303 Jan 21, 2025
5f5b95e
add more tests
kumaraditya303 Jan 21, 2025
708b410
fix test
kumaraditya303 Jan 21, 2025
2c1e93e
try fix compilation
kumaraditya303 Jan 21, 2025
c0ce361
remove supp
kumaraditya303 Jan 21, 2025
aa4c2d1
Merge branch 'main' of https://github.com/python/cpython into per-thr…
kumaraditya303 Jan 24, 2025
fd3fc0b
Merge branch 'main' of https://github.com/python/cpython into per-thr…
kumaraditya303 Jan 24, 2025
c45c9d7
fix merge
kumaraditya303 Jan 24, 2025
f0a9e53
Merge branch 'main' into per-thread-tasks
kumaraditya303 Jan 25, 2025
5b33c45
code review
kumaraditya303 Jan 29, 2025
1d73843
cleanup
kumaraditya303 Jan 29, 2025
4014e70
add comment for asyncio_tasks_lock
kumaraditya303 Jan 29, 2025
83aec26
Reword explanation
ambv Feb 6, 2025
e52741f
Merge branch 'main' into per-thread-tasks
ambv Feb 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Include/internal/pycore_interp.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ struct _is {
PyMutex weakref_locks[NUM_WEAKREF_LIST_LOCKS];
_PyIndexPool tlbc_indices;
#endif

// Per-interpreter list of tasks, any lingering tasks from thread
// states gets added here and removed from the corresponding
// thread state's list.
struct llist_node asyncio_tasks_head;
// Per-interpreter state for the obmalloc allocator. For the main
// interpreter and for all interpreters that don't have their
// own obmalloc state, this points to the static structure in
Expand Down
2 changes: 1 addition & 1 deletion Include/internal/pycore_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ typedef enum _PyLockFlags {

// Lock a mutex with an optional timeout and additional options. See
// _PyLockFlags for details.
extern PyLockStatus
extern PyAPI_FUNC(PyLockStatus)
_PyMutex_LockTimed(PyMutex *m, PyTime_t timeout_ns, _PyLockFlags flags);

// Lock a mutex with additional options. See _PyLockFlags for details.
Expand Down
4 changes: 2 additions & 2 deletions Include/internal/pycore_pystate.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ extern void _PyEval_StartTheWorldAll(_PyRuntimeState *runtime);
// Perform a stop-the-world pause for threads in the specified interpreter.
//
// NOTE: This is a no-op outside of Py_GIL_DISABLED builds.
extern void _PyEval_StopTheWorld(PyInterpreterState *interp);
extern void _PyEval_StartTheWorld(PyInterpreterState *interp);
extern PyAPI_FUNC(void) _PyEval_StopTheWorld(PyInterpreterState *interp);
extern PyAPI_FUNC(void) _PyEval_StartTheWorld(PyInterpreterState *interp);


static inline void
Expand Down
5 changes: 5 additions & 0 deletions Include/internal/pycore_tstate.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ typedef struct _PyThreadStateImpl {
PyObject *asyncio_running_loop; // Strong reference
PyObject *asyncio_running_task; // Strong reference

/* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
or subclasses of it used in `asyncio.all_tasks`.
*/
struct llist_node asyncio_tasks_head;
struct _qsbr_thread_state *qsbr; // only used by free-threaded build
struct llist_node mem_free_queue; // delayed free queue


#ifdef Py_GIL_DISABLED
struct _gc_thread_state gc;
struct _mimalloc_thread_state mimalloc;
Expand Down
19 changes: 18 additions & 1 deletion Lib/test/test_asyncio/test_free_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import unittest
from threading import Thread
from unittest import TestCase

import weakref
from test import support
from test.support import threading_helper

threading_helper.requires_working_threading(module=True)
Expand Down Expand Up @@ -95,6 +96,22 @@ def check():
done.set()
runner.join()

def test_task_different_thread_finalized(self) -> None:
task = None
async def func():
nonlocal task
task = asyncio.current_task()

thread = Thread(target=lambda: asyncio.run(func()))
thread.start()
thread.join()
wr = weakref.ref(task)
del thread
del task
# task finalization in different thread shouldn't crash
support.gc_collect()
self.assertIsNone(wr())

def test_run_coroutine_threadsafe(self) -> None:
results = []

Expand Down
153 changes: 100 additions & 53 deletions Modules/_asynciomodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ typedef struct TaskObj {
PyObject *task_name;
PyObject *task_context;
struct llist_node task_node;
#ifdef Py_GIL_DISABLED
// thread id of the thread where this task was created
uintptr_t task_tid;
#endif
} TaskObj;

typedef struct {
Expand Down Expand Up @@ -94,14 +98,6 @@ typedef struct {
|| PyObject_TypeCheck(obj, state->FutureType) \
|| PyObject_TypeCheck(obj, state->TaskType))

#ifdef Py_GIL_DISABLED
# define ASYNCIO_STATE_LOCK(state) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex)
# define ASYNCIO_STATE_UNLOCK(state) Py_END_CRITICAL_SECTION()
#else
# define ASYNCIO_STATE_LOCK(state) ((void)state)
# define ASYNCIO_STATE_UNLOCK(state) ((void)state)
#endif

typedef struct _Py_AsyncioModuleDebugOffsets {
struct _asyncio_task_object {
uint64_t size;
Expand Down Expand Up @@ -135,9 +131,6 @@ GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets AsyncioDebug)

/* State of the _asyncio module */
typedef struct {
#ifdef Py_GIL_DISABLED
PyMutex mutex;
#endif
PyTypeObject *FutureIterType;
PyTypeObject *TaskStepMethWrapper_Type;
PyTypeObject *FutureType;
Expand Down Expand Up @@ -184,11 +177,6 @@ typedef struct {
/* Counter for autogenerated Task names */
uint64_t task_name_counter;

/* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
or subclasses of it. Third party tasks implementations which don't inherit from
`asyncio.Task` are tracked separately using the `non_asyncio_tasks` WeakSet.
*/
struct llist_node asyncio_tasks_head;
} asyncio_state;

static inline asyncio_state *
Expand Down Expand Up @@ -2180,16 +2168,15 @@ static PyMethodDef TaskWakeupDef = {
static void
register_task(asyncio_state *state, TaskObj *task)
{
ASYNCIO_STATE_LOCK(state);
assert(Task_Check(state, task));
if (task->task_node.next != NULL) {
// already registered
assert(task->task_node.prev != NULL);
goto exit;
return;
}
llist_insert_tail(&state->asyncio_tasks_head, &task->task_node);
exit:
ASYNCIO_STATE_UNLOCK(state);
_PyThreadStateImpl *tstate = (_PyThreadStateImpl *) _PyThreadState_GET();
struct llist_node *head = &tstate->asyncio_tasks_head;
llist_insert_tail(head, &task->task_node);
}

static int
Expand All @@ -2198,19 +2185,38 @@ register_eager_task(asyncio_state *state, PyObject *task)
return PySet_Add(state->eager_tasks, task);
}

static void
unregister_task(asyncio_state *state, TaskObj *task)
static inline void
unregister_task_safe(TaskObj *task)
{
ASYNCIO_STATE_LOCK(state);
assert(Task_Check(state, task));
if (task->task_node.next == NULL) {
// not registered
assert(task->task_node.prev == NULL);
goto exit;
return;
}
llist_remove(&task->task_node);
exit:
ASYNCIO_STATE_UNLOCK(state);
}

static void
unregister_task(asyncio_state *state, TaskObj *task)
{
assert(Task_Check(state, task));
#ifdef Py_GIL_DISABLED
// check if we are in the same thread
// if so, we can avoid locking
if (task->task_tid == _Py_ThreadId()) {
unregister_task_safe(task);
}
else {
// we are in a different thread
// stop the world then check and remove the task
PyThreadState *tstate = _PyThreadState_GET();
_PyEval_StopTheWorld(tstate->interp);
unregister_task_safe(task);
_PyEval_StartTheWorld(tstate->interp);
}
#else
unregister_task_safe(task);
#endif
}

static int
Expand Down Expand Up @@ -2424,6 +2430,9 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
}

Py_CLEAR(self->task_fut_waiter);
#ifdef Py_GIL_DISABLED
self->task_tid = _Py_ThreadId();
#endif
self->task_must_cancel = 0;
self->task_log_destroy_pending = 1;
self->task_num_cancels_requested = 0;
Expand Down Expand Up @@ -3966,6 +3975,7 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop)
static inline int
add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop)
{
assert(PySet_CheckExact(tasks));
PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done));
if (done == NULL) {
return -1;
Expand All @@ -3988,6 +3998,43 @@ add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *lo
return 0;
}

static inline int
add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks)
{
#ifdef Py_GIL_DISABLED
assert(interp->stoptheworld.world_stopped);
#endif
// Start traversing from interpreter's linked list
struct llist_node *head = &interp->asyncio_tasks_head;
_PyThreadStateImpl *thead = (_PyThreadStateImpl *)interp->threads.head;

struct llist_node *node;
traverse:
llist_for_each_safe(node, head) {
TaskObj *task = llist_data(node, TaskObj, task_node);
// The linked list holds borrowed references to task
// as such it is possible that the task is concurrently
// deallocated while added to this list.
// To protect against concurrent deallocations,
// we first try to incref the task which would fail
// if it is concurrently getting deallocated in another thread,
// otherwise it gets added to the list.
if (_Py_TryIncref((PyObject *)task)) {
if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) {
// do not call any escaping calls here while holding the runtime lock.
return -1;
}
}
}
// traverse the linked lists of thread states
if (thead != NULL) {
head = &thead->asyncio_tasks_head;
thead = (_PyThreadStateImpl *)thead->base.next;
goto traverse;
}
return 0;
}

/*********************** Module **************************/

/*[clinic input]
Expand Down Expand Up @@ -4026,30 +4073,31 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
Py_DECREF(loop);
return NULL;
}
int err = 0;
ASYNCIO_STATE_LOCK(state);
struct llist_node *node;

llist_for_each_safe(node, &state->asyncio_tasks_head) {
TaskObj *task = llist_data(node, TaskObj, task_node);
// The linked list holds borrowed references to task
// as such it is possible that the task is concurrently
// deallocated while added to this list.
// To protect against concurrent deallocations,
// we first try to incref the task which would fail
// if it is concurrently getting deallocated in another thread,
// otherwise it gets added to the list.
if (_Py_TryIncref((PyObject *)task)) {
if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) {
Py_DECREF(tasks);
Py_DECREF(loop);
err = 1;
break;
}
}
}
ASYNCIO_STATE_UNLOCK(state);
if (err) {
PyInterpreterState *interp = PyInterpreterState_Get();
// Stop the world and traverse the per-thread linked list
// of asyncio tasks of all threads and the interpreter's
// linked list and them to tasks list.
// The interpreter linked list is used for any lingering tasks
// whose thread state has been deallocated but the task is
// still alive. This can happen if task is referenced by a
// different thread, in which case the task is moved to the
// interpreter's linked list from the thread's linked list
// before deallocation.
// Stop the world pause is required so that no thread
// modifies it's linked list while being iterated here
// concurrently.
// This design allows for lock free register/unregister of tasks
// of loops running concurrently in different threads (general case).
_PyEval_StopTheWorld(interp);
HEAD_LOCK(interp->runtime);
int ret = add_tasks_interp(interp, (PyListObject *)tasks);
HEAD_UNLOCK(interp->runtime);
_PyEval_StartTheWorld(interp);
if (ret < 0) {
// call any escaping calls after releasing the runtime lock
// and starting the world to avoid any deadlocks.
Py_DECREF(tasks);
Py_DECREF(loop);
return NULL;
}
PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks);
Expand Down Expand Up @@ -4321,7 +4369,6 @@ module_exec(PyObject *mod)
{
asyncio_state *state = get_asyncio_state(mod);

llist_init(&state->asyncio_tasks_head);

#define CREATE_TYPE(m, tp, spec, base) \
do { \
Expand Down
11 changes: 10 additions & 1 deletion Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ init_interpreter(PyInterpreterState *interp,
_Py_brc_init_state(interp);
#endif
llist_init(&interp->mem_free_queue.head);
llist_init(&interp->asyncio_tasks_head);
for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) {
interp->monitors.tools[i] = 0;
}
Expand Down Expand Up @@ -1520,7 +1521,7 @@ init_threadstate(_PyThreadStateImpl *_tstate,
tstate->delete_later = NULL;

llist_init(&_tstate->mem_free_queue);

llist_init(&_tstate->asyncio_tasks_head);
if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) {
// Start in the suspended state if there is an ongoing stop-the-world.
tstate->state = _Py_THREAD_SUSPENDED;
Expand Down Expand Up @@ -1700,6 +1701,14 @@ PyThreadState_Clear(PyThreadState *tstate)
Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop);
Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_task);


_PyEval_StopTheWorld(tstate->interp);
// merge any lingering tasks from thread state to interpreter's
// tasks list
llist_concat(&tstate->interp->asyncio_tasks_head,
&((_PyThreadStateImpl *)tstate)->asyncio_tasks_head);
_PyEval_StartTheWorld(tstate->interp);

Py_CLEAR(tstate->dict);
Py_CLEAR(tstate->async_exc);

Expand Down
Loading