2022-07-18 12:09:19 +02:00
/**************************************************************************/
/* worker_thread_pool.cpp */
/**************************************************************************/
/* This file is part of: */
/* GODOT ENGINE */
/* https://godotengine.org */
/**************************************************************************/
/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */
/* */
/* Permission is hereby granted, free of charge, to any person obtaining */
/* a copy of this software and associated documentation files (the */
/* "Software"), to deal in the Software without restriction, including */
/* without limitation the rights to use, copy, modify, merge, publish, */
/* distribute, sublicense, and/or sell copies of the Software, and to */
/* permit persons to whom the Software is furnished to do so, subject to */
/* the following conditions: */
/* */
/* The above copyright notice and this permission notice shall be */
/* included in all copies or substantial portions of the Software. */
/* */
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
/**************************************************************************/
# include "worker_thread_pool.h"
2023-11-09 12:29:47 +01:00
# include "core/object/script_language.h"
2022-07-18 12:09:19 +02:00
# include "core/os/os.h"
2024-07-18 14:54:58 +02:00
# include "core/os/safe_binary_mutex.h"
2023-06-08 11:33:35 +02:00
# include "core/os/thread_safe.h"
2022-07-18 12:09:19 +02:00
2024-04-09 17:26:45 +02:00
// Sentinel pointer (never dereferenced) passed to _wait_collaboratively() to mean
// "wait until yield_is_over is flagged" rather than "wait for a specific task".
WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1;
2022-07-23 19:12:41 +02:00
void WorkerThreadPool : : Task : : free_template_userdata ( ) {
2023-09-09 16:11:33 +02:00
ERR_FAIL_NULL ( template_userdata ) ;
ERR_FAIL_NULL ( native_func_userdata ) ;
2022-07-23 19:12:41 +02:00
BaseTemplateUserdata * btu = ( BaseTemplateUserdata * ) native_func_userdata ;
memdelete ( btu ) ;
}
2022-07-18 12:09:19 +02:00
// Global singleton instance; set in the constructor.
WorkerThreadPool *WorkerThreadPool::singleton = nullptr;
2024-06-19 08:09:59 +02:00
# ifdef THREADS_ENABLED
2024-07-18 14:54:58 +02:00
thread_local WorkerThreadPool : : UnlockableLocks WorkerThreadPool : : unlockable_locks [ MAX_UNLOCKABLE_LOCKS ] ;
2024-06-19 08:09:59 +02:00
# endif
2023-12-29 01:27:17 +01:00
2022-07-18 12:09:19 +02:00
// Runs one task (or one worker's share of a group task) on the calling thread.
// May be called recursively: a pool thread that waits collaboratively can pick
// up and process another task while its current one is parked (prev_task).
// NOTE(review): assumes the caller does NOT hold task_mutex on entry.
void WorkerThreadPool::_process_task(Task *p_task) {
#ifdef THREADS_ENABLED
	int pool_thread_index = thread_ids[Thread::get_caller_id()];
	ThreadData &curr_thread = threads[pool_thread_index];
	Task *prev_task = nullptr; // In case this is recursively called.

	// Back up per-thread state so it can be restored after the task runs.
	bool safe_for_nodes_backup = is_current_thread_safe_for_nodes();
	CallQueue *call_queue_backup = MessageQueue::get_singleton() != MessageQueue::get_main_singleton() ? MessageQueue::get_singleton() : nullptr;

	{
		// Tasks must start with these at default values. They are free to set-and-forget otherwise.
		set_current_thread_safe_for_nodes(false);
		MessageQueue::set_thread_singleton_override(nullptr);

		// Since the WorkerThreadPool is started before the script server,
		// its pre-created threads can't have ScriptServer::thread_enter() called on them early.
		// Therefore, we do it late at the first opportunity, so in case the task
		// about to be run uses scripting, guarantees are held.
		ScriptServer::thread_enter();

		task_mutex.lock();
		p_task->pool_thread_index = pool_thread_index;
		prev_task = curr_thread.current_task;
		curr_thread.current_task = p_task;
		if (p_task->pending_notify_yield_over) {
			// notify_yield_over() raced with this task's start; honor it now.
			curr_thread.yield_is_over = true;
		}
		task_mutex.unlock();
	}
#endif

#ifdef THREADS_ENABLED
	// Read before running: the task may be freed below (group path).
	bool low_priority = p_task->low_priority;
#endif

	if (p_task->group) {
		// Handling a group
		bool do_post = false;
		while (true) {
			// Atomically claim the next unprocessed element index.
			uint32_t work_index = p_task->group->index.postincrement();
			if (work_index >= p_task->group->max) {
				break;
			}
			if (p_task->native_group_func) {
				p_task->native_group_func(p_task->native_func_userdata, work_index);
			} else if (p_task->template_userdata) {
				p_task->template_userdata->callback_indexed(work_index);
			} else {
				p_task->callable.call(work_index);
			}
			// This is the only way to ensure posting is done when all tasks are really complete.
			uint32_t completed_amount = p_task->group->completed_index.increment();
			if (completed_amount == p_task->group->max) {
				do_post = true;
			}
		}
		if (do_post && p_task->template_userdata) {
			memdelete(p_task->template_userdata); // This is no longer needed at this point, so get rid of it.
		}
		if (do_post) {
			p_task->group->done_semaphore.post();
			p_task->group->completed.set_to(true);
		}
		uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
		uint32_t finished_users = p_task->group->finished.increment();

		if (finished_users == max_users) {
			// Get rid of the group, because nobody else is using it.
			MutexLock task_lock(task_mutex);
			group_allocator.free(p_task->group);
		}

		// For groups, tasks get rid of themselves.
		task_mutex.lock();
		task_allocator.free(p_task);
	} else {
		if (p_task->native_func) {
			p_task->native_func(p_task->native_func_userdata);
		} else if (p_task->template_userdata) {
			p_task->template_userdata->callback();
			memdelete(p_task->template_userdata);
		} else {
			p_task->callable.call();
		}

		task_mutex.lock();
		p_task->completed = true;
		p_task->pool_thread_index = -1;
		if (p_task->waiting_user) {
			// Wake every user thread blocked in wait_for_task_completion().
			p_task->done_semaphore.post(p_task->waiting_user);
		}
		// Let awaiters know.
		for (uint32_t i = 0; i < threads.size(); i++) {
			if (threads[i].awaited_task == p_task) {
				threads[i].cond_var.notify_one();
				threads[i].signaled = true;
			}
		}
	}

#ifdef THREADS_ENABLED
	// Still holding task_mutex from either branch above.
	{
		curr_thread.current_task = prev_task;
		if (low_priority) {
			low_priority_threads_used--;

			if (_try_promote_low_priority_task()) {
				if (prev_task) { // Otherwise, this thread will catch it.
					_notify_threads(&curr_thread, 1, 0);
				}
			}
		}

		task_mutex.unlock();
	}

	// Restore the per-thread state backed up at the top.
	set_current_thread_safe_for_nodes(safe_for_nodes_backup);
	MessageQueue::set_thread_singleton_override(call_queue_backup);
#endif
}
// Worker thread main loop. p_user is this thread's ThreadData slot.
// Loops: handle runlevel transitions, pop a task if available, otherwise
// sleep on this thread's condition variable until notified.
void WorkerThreadPool::_thread_function(void *p_user) {
	ThreadData *thread_data = (ThreadData *)p_user;

	while (true) {
		Task *task_to_process = nullptr;
		{
			MutexLock lock(singleton->task_mutex);

			// Runlevel bookkeeping; returns true when the pool is shutting down.
			bool exit = singleton->_handle_runlevel(thread_data, lock);
			if (unlikely(exit)) {
				break;
			}

			thread_data->signaled = false;

			if (singleton->task_queue.first()) {
				task_to_process = singleton->task_queue.first()->self();
				singleton->task_queue.remove(singleton->task_queue.first());
			} else {
				// Queue empty: wait for _notify_threads()/_switch_runlevel().
				thread_data->cond_var.wait(lock);
			}
		}

		// Run outside the lock.
		if (task_to_process) {
			singleton->_process_task(task_to_process);
		}
	}
}
2024-09-16 11:51:57 +02:00
// Enqueues p_count already-allocated tasks and wakes workers as needed.
// p_lock is the caller's held task_mutex lock; it is temporarily released
// when falling back to inline processing (no worker threads).
void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock) {
	// Fall back to processing on the calling thread if there are no worker threads.
	// Separated into its own variable to make it easier to extend this logic
	// in custom builds.
	bool process_on_calling_thread = threads.size() == 0;
	if (process_on_calling_thread) {
		p_lock.temp_unlock();
		for (uint32_t i = 0; i < p_count; i++) {
			_process_task(p_tasks[i]);
		}
		p_lock.temp_relock();
		return;
	}

	// While languages are being torn down, no new tasks may be queued.
	while (runlevel == RUNLEVEL_EXIT_LANGUAGES) {
		control_cond_var.wait(p_lock);
	}

	uint32_t to_process = 0;
	uint32_t to_promote = 0;

	ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr;

	for (uint32_t i = 0; i < p_count; i++) {
		p_tasks[i]->low_priority = !p_high_priority;
		if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {
			task_queue.add_last(&p_tasks[i]->task_elem);
			if (!p_high_priority) {
				low_priority_threads_used++;
			}
			to_process++;
		} else {
			// Too many threads using low priority, must go to queue.
			low_priority_task_queue.add_last(&p_tasks[i]->task_elem);
			to_promote++;
		}
	}

	_notify_threads(caller_pool_thread, to_process, to_promote);
}
// Wakes up to p_process_count threads to process queued tasks and up to
// p_promote_count threads able to promote low-priority tasks.
// NOTE(review): assumes task_mutex is held by the caller (all call sites
// visible in this file hold it).
void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
	uint32_t to_process = p_process_count;
	uint32_t to_promote = p_promote_count;

	// This is where it's decided which threads are awakened according to the workload.
	// Threads that will anyway have a chance to check the situation and process/promote tasks
	// are excluded from being notified. Others will be tried anyway to try to distribute load.
	// The current thread, if is a pool thread, is also excluded depending on the promoting/processing
	// needs because it will anyway loop again. However, it will contribute to decreasing the count,
	// which helps reducing sync traffic.

	uint32_t thread_count = threads.size();

	// First round:
	// 1. For processing: notify threads that are not running tasks, to keep the stacks as shallow as possible.
	// 2. For promoting: since it's exclusive with processing, we find threads able to promote low-prio tasks now.
	// notify_index rotates across calls to spread wakeups over all threads.
	for (uint32_t i = 0;
			i < thread_count && (to_process || to_promote);
			i++, notify_index = (notify_index + 1) % thread_count) {
		ThreadData &th = threads[notify_index];

		if (th.signaled) {
			continue;
		}
		if (th.current_task) {
			// Good thread for promoting low-prio?
			if (to_promote && th.awaited_task && th.current_task->low_priority) {
				if (likely(&th != p_current_thread_data)) {
					th.cond_var.notify_one();
				}
				th.signaled = true;
				to_promote--;
			}
		} else {
			if (to_process) {
				if (likely(&th != p_current_thread_data)) {
					th.cond_var.notify_one();
				}
				th.signaled = true;
				to_process--;
			}
		}
	}

	// Second round:
	// For processing: if the first round wasn't enough, let's try now with threads processing tasks but currently awaiting.
	for (uint32_t i = 0;
			i < thread_count && to_process;
			i++, notify_index = (notify_index + 1) % thread_count) {
		ThreadData &th = threads[notify_index];

		if (th.signaled) {
			continue;
		}
		if (th.awaited_task) {
			if (likely(&th != p_current_thread_data)) {
				th.cond_var.notify_one();
			}
			th.signaled = true;
			to_process--;
		}
	}
}
2023-05-17 00:00:45 +02:00
bool WorkerThreadPool : : _try_promote_low_priority_task ( ) {
if ( low_priority_task_queue . first ( ) ) {
Task * low_prio_task = low_priority_task_queue . first ( ) - > self ( ) ;
low_priority_task_queue . remove ( low_priority_task_queue . first ( ) ) ;
task_queue . add_last ( & low_prio_task - > task_elem ) ;
low_priority_threads_used + + ;
return true ;
} else {
return false ;
}
}
2022-07-18 12:09:19 +02:00
// Queues a plain C-function task; thin forwarder to _add_task().
WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
	return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
}
// Common implementation behind add_task()/add_native_task()/add_template_task():
// allocates a Task, registers it under a fresh TaskID, and posts it.
// Exactly one of p_callable / p_func / p_template_userdata is the payload.
WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
	MutexLock<BinaryMutex> lock(task_mutex);

	// Get a free task
	Task *task = task_allocator.alloc();
	TaskID id = last_task++;
	task->self = id;
	task->callable = p_callable;
	task->native_func = p_func;
	task->native_func_userdata = p_userdata;
	task->description = p_description;
	task->template_userdata = p_template_userdata;
	tasks.insert(id, task);

	_post_tasks(&task, 1, p_high_priority, lock);

	return id;
}
// Queues a Callable-based task; thin forwarder to _add_task().
WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description) {
	return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description);
}
// Returns whether the given task has finished running.
// Fails (returning false) if the ID is unknown — e.g. already waited on and freed.
bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {
	MutexLock task_lock(task_mutex);
	const Task *const *taskp = tasks.getptr(p_task_id);
	if (!taskp) {
		ERR_FAIL_V_MSG(false, "Invalid Task ID"); // Invalid task
	}

	return (*taskp)->completed;
}
2023-05-17 00:00:45 +02:00
// Blocks until the given task completes.
// - User (non-pool) threads sleep on the task's semaphore.
// - Pool threads wait collaboratively, processing other tasks meanwhile.
// Returns ERR_INVALID_PARAMETER for unknown IDs and ERR_BUSY when a pool
// thread tries to await an older task (deadlock prevention; see below).
// The last waiter to detach frees the task.
Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
	task_mutex.lock();
	Task **taskp = tasks.getptr(p_task_id);
	if (!taskp) {
		task_mutex.unlock();
		ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Invalid Task ID"); // Invalid task
	}
	Task *task = *taskp;

	if (task->completed) {
		// Already done: free the task if nobody else is still waiting on it.
		if (task->waiting_pool == 0 && task->waiting_user == 0) {
			tasks.erase(p_task_id);
			task_allocator.free(task);
		}
		task_mutex.unlock();
		return OK;
	}

	ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr;
	if (caller_pool_thread && p_task_id <= caller_pool_thread->current_task->self) {
		// Deadlock prevention:
		// When a pool thread wants to wait for an older task, the following situations can happen:
		// 1. Awaited task is deep in the stack of the awaiter.
		// 2. A group of awaiter threads end up depending on some tasks buried in the stack
		//    of their worker threads in such a way that progress can't be made.
		// Both would entail a deadlock. Some may be handled here in the WorkerThreadPool
		// with some extra logic and bookkeeping. However, there would still be unavoidable
		// cases of deadlock because of the way waiting threads process outstanding tasks.
		// Taking into account there's no feasible solution for every possible case
		// with the current design, we just simply reject attempts to await on older tasks,
		// with a specific error code that signals the situation so the caller can handle it.
		task_mutex.unlock();
		return ERR_BUSY;
	}

	// Register as a waiter before releasing the lock so completion can't miss us.
	if (caller_pool_thread) {
		task->waiting_pool++;
	} else {
		task->waiting_user++;
	}

	if (caller_pool_thread) {
		task_mutex.unlock();
		_wait_collaboratively(caller_pool_thread, task);
		task_mutex.lock();
		task->waiting_pool--;
		if (task->waiting_pool == 0 && task->waiting_user == 0) {
			// Last waiter detaching frees the task.
			tasks.erase(p_task_id);
			task_allocator.free(task);
		}
	} else {
		task_mutex.unlock();
		task->done_semaphore.wait();
		task_mutex.lock();
		task->waiting_user--;
		if (task->waiting_pool == 0 && task->waiting_user == 0) {
			tasks.erase(p_task_id);
			task_allocator.free(task);
		}
	}

	task_mutex.unlock();
	return OK;
}
2024-06-19 08:09:59 +02:00
void WorkerThreadPool : : _lock_unlockable_mutexes ( ) {
# ifdef THREADS_ENABLED
2024-07-18 14:54:58 +02:00
for ( uint32_t i = 0 ; i < MAX_UNLOCKABLE_LOCKS ; i + + ) {
if ( unlockable_locks [ i ] . ulock ) {
unlockable_locks [ i ] . ulock - > lock ( ) ;
2024-06-19 08:09:59 +02:00
}
}
# endif
}
void WorkerThreadPool : : _unlock_unlockable_mutexes ( ) {
# ifdef THREADS_ENABLED
2024-07-18 14:54:58 +02:00
for ( uint32_t i = 0 ; i < MAX_UNLOCKABLE_LOCKS ; i + + ) {
if ( unlockable_locks [ i ] . ulock ) {
unlockable_locks [ i ] . ulock - > unlock ( ) ;
2024-06-19 08:09:59 +02:00
}
}
# endif
}
2024-04-09 17:26:45 +02:00
// Collaborative wait for pool threads: instead of blocking idly, the caller
// keeps processing other queued tasks until p_task completes, or — when
// p_task is the ThreadData::YIELDING sentinel — until yield_is_over is set.
void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
	// Keep processing tasks until the condition to stop waiting is met.
	while (true) {
		Task *task_to_process = nullptr;
		bool relock_unlockables = false;
		{
			MutexLock lock(task_mutex);

			bool was_signaled = p_caller_pool_thread->signaled;
			p_caller_pool_thread->signaled = false;

			// Runlevel bookkeeping; returns true when the pool is shutting down.
			bool exit = _handle_runlevel(p_caller_pool_thread, lock);
			if (unlikely(exit)) {
				break;
			}

			bool wait_is_over = false;
			if (unlikely(p_task == ThreadData::YIELDING)) {
				if (p_caller_pool_thread->yield_is_over) {
					p_caller_pool_thread->yield_is_over = false;
					wait_is_over = true;
				}
			} else {
				if (p_task->completed) {
					wait_is_over = true;
				}
			}

			if (wait_is_over) {
				if (was_signaled) {
					// This thread was awakened for some additional reason, but it's about to exit.
					// Let's find out what may be pending and forward the requests.
					uint32_t to_process = task_queue.first() ? 1 : 0;
					uint32_t to_promote = p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0;
					if (to_process || to_promote) {
						// This thread must be left alone since it won't loop again.
						p_caller_pool_thread->signaled = true;
						_notify_threads(p_caller_pool_thread, to_process, to_promote);
					}
				}
				break;
			}

			// A low-priority task slot just became effectively available; promote.
			if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
				if (_try_promote_low_priority_task()) {
					_notify_threads(p_caller_pool_thread, 1, 0);
				}
			}

			if (singleton->task_queue.first()) {
				task_to_process = task_queue.first()->self();
				task_queue.remove(task_queue.first());
			}

			if (!task_to_process) {
				// Nothing to do: park on the condition variable, releasing any
				// allowance-zone locks so other threads aren't starved.
				p_caller_pool_thread->awaited_task = p_task;

				_unlock_unlockable_mutexes();
				relock_unlockables = true;

				p_caller_pool_thread->cond_var.wait(lock);

				p_caller_pool_thread->awaited_task = nullptr;
			}
		}

		if (relock_unlockables) {
			_lock_unlockable_mutexes();
		}

		if (task_to_process) {
			_process_task(task_to_process);
		}
	}
}
2024-09-13 14:20:11 +02:00
void WorkerThreadPool : : _switch_runlevel ( Runlevel p_runlevel ) {
DEV_ASSERT ( p_runlevel > runlevel ) ;
runlevel = p_runlevel ;
2024-09-16 11:51:57 +02:00
memset ( & runlevel_data , 0 , sizeof ( runlevel_data ) ) ;
2024-09-13 14:20:11 +02:00
for ( uint32_t i = 0 ; i < threads . size ( ) ; i + + ) {
threads [ i ] . cond_var . notify_one ( ) ;
threads [ i ] . signaled = true ;
}
2024-09-16 11:51:57 +02:00
control_cond_var . notify_all ( ) ;
2024-09-13 14:20:11 +02:00
}
// Returns whether threads have to exit. This may perform the check about handling needed.
// Called by worker threads with task_mutex held (via p_lock); performs the
// per-thread step required by the current runlevel:
// - PRE_EXIT_LANGUAGES: once the queues are drained, count this thread as idle.
// - EXIT_LANGUAGES: detach this thread from scripting languages (lock released
//   around the ScriptServer call, which may block).
// - EXIT: signal the thread to terminate.
bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock) {
	bool exit = false;
	switch (runlevel) {
		case RUNLEVEL_NORMAL: {
		} break;
		case RUNLEVEL_PRE_EXIT_LANGUAGES: {
			if (!p_thread_data->pre_exited_languages) {
				if (!task_queue.first() && !low_priority_task_queue.first()) {
					p_thread_data->pre_exited_languages = true;
					runlevel_data.pre_exit_languages.num_idle_threads++;
					control_cond_var.notify_all();
				}
			}
		} break;
		case RUNLEVEL_EXIT_LANGUAGES: {
			if (!p_thread_data->exited_languages) {
				p_lock.temp_unlock();
				ScriptServer::thread_exit();
				p_lock.temp_relock();
				p_thread_data->exited_languages = true;
				runlevel_data.exit_languages.num_exited_threads++;
				control_cond_var.notify_all();
			}
		} break;
		case RUNLEVEL_EXIT: {
			exit = true;
		} break;
	}
	return exit;
}
2024-04-09 17:26:45 +02:00
// Suspends the current worker's task until notify_yield_over() is called for it,
// processing other tasks collaboratively in the meantime.
// Must be called from a pool thread.
void WorkerThreadPool::yield() {
	int th_index = get_thread_index();
	ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
	_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);

	task_mutex.lock();
	if (runlevel < RUNLEVEL_EXIT_LANGUAGES) {
		// If this long-lived task started before the scripting server was initialized,
		// now is a good time to have scripting languages ready for the current thread.
		// Otherwise, such a piece of setup won't happen unless another task has been
		// run during the collaborative wait.
		task_mutex.unlock();
		ScriptServer::thread_enter();
	} else {
		task_mutex.unlock();
	}
}
// Resumes a task that called yield(). Safe to call before the task has even
// started running: the request is stored on the task and honored at start.
void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
	MutexLock task_lock(task_mutex);
	Task **taskp = tasks.getptr(p_task_id);
	if (!taskp) {
		ERR_FAIL_MSG("Invalid Task ID.");
	}
	Task *task = *taskp;
	if (task->pool_thread_index == -1) { // Completed or not started yet.
		if (!task->completed) {
			// This avoids a race condition where a task is created and yield-over called before it's processed.
			task->pending_notify_yield_over = true;
		}
		return;
	}

	// Task is currently running (and yielded) on a known pool thread; wake it.
	ThreadData &td = threads[task->pool_thread_index];
	td.yield_is_over = true;
	td.signaled = true;
	td.cond_var.notify_one();
}
2022-07-23 19:12:41 +02:00
// Common implementation behind add_group_task()/add_native_group_task():
// creates a Group of p_elements work items served by up to p_tasks pool tasks.
// p_tasks < 0 means "one task per pool thread" (at least one).
WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	ERR_FAIL_COND_V(p_elements < 0, INVALID_TASK_ID);
	if (p_tasks < 0) {
		p_tasks = MAX(1u, threads.size());
	}

	MutexLock<BinaryMutex> lock(task_mutex);
	Group *group = group_allocator.alloc();
	GroupID id = last_task++;
	group->max = p_elements;
	group->self = id;

	Task **tasks_posted = nullptr;
	if (p_elements == 0) {
		// Should really not call it with zero Elements, but at least it should work.
		// Mark the group done immediately so waiters don't block forever.
		group->completed.set_to(true);
		group->done_semaphore.post();
		group->tasks_used = 0;
		p_tasks = 0;
		if (p_template_userdata) {
			memdelete(p_template_userdata);
		}
	} else {
		group->tasks_used = p_tasks;
		// Stack allocation is fine: tasks_posted is consumed before returning.
		tasks_posted = (Task **)alloca(sizeof(Task *) * p_tasks);
		for (int i = 0; i < p_tasks; i++) {
			Task *task = task_allocator.alloc();
			task->native_group_func = p_func;
			task->native_func_userdata = p_userdata;
			task->description = p_description;
			task->group = group;
			task->callable = p_callable;
			task->template_userdata = p_template_userdata;
			tasks_posted[i] = task;
			// No task ID is used.
		}
	}

	groups[id] = group;

	_post_tasks(tasks_posted, p_tasks, p_high_priority, lock);

	return id;
}
2022-07-23 19:12:41 +02:00
// Queues a C-function group task; thin forwarder to _add_group_task().
WorkerThreadPool::GroupID WorkerThreadPool::add_native_group_task(void (*p_func)(void *, uint32_t), void *p_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	return _add_group_task(Callable(), p_func, p_userdata, nullptr, p_elements, p_tasks, p_high_priority, p_description);
}
2022-07-18 12:09:19 +02:00
// Queues a Callable-based group task; thin forwarder to _add_group_task().
WorkerThreadPool::GroupID WorkerThreadPool::add_group_task(const Callable &p_action, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	return _add_group_task(p_action, nullptr, nullptr, nullptr, p_elements, p_tasks, p_high_priority, p_description);
}
2022-07-23 19:12:41 +02:00
// Returns how many of the group's elements have finished processing so far.
// Fails (returning 0) if the Group ID is unknown.
uint32_t WorkerThreadPool::get_group_processed_element_count(GroupID p_group) const {
	MutexLock task_lock(task_mutex);
	const Group *const *groupp = groups.getptr(p_group);
	if (!groupp) {
		ERR_FAIL_V_MSG(0, "Invalid Group ID");
	}

	return (*groupp)->completed_index.get();
}
2022-07-18 12:09:19 +02:00
// Returns whether all elements of the group have been processed.
// Fails (returning false) if the Group ID is unknown.
bool WorkerThreadPool::is_group_task_completed(GroupID p_group) const {
	MutexLock task_lock(task_mutex);
	const Group *const *groupp = groups.getptr(p_group);
	if (!groupp) {
		ERR_FAIL_V_MSG(false, "Invalid Group ID");
	}

	return (*groupp)->completed.is_set();
}
// Blocks the calling thread until every element of the group has been
// processed, then unregisters the group. The last user (worker tasks + this
// waiter) to detach frees the Group object.
void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
#ifdef THREADS_ENABLED
	task_mutex.lock();
	Group **groupp = groups.getptr(p_group);
	task_mutex.unlock();
	if (!groupp) {
		ERR_FAIL_MSG("Invalid Group ID.");
	}

	{
		Group *group = *groupp;

		// Release allowance-zone locks while blocked so other threads progress.
		_unlock_unlockable_mutexes();
		group->done_semaphore.wait();
		_lock_unlockable_mutexes();

		uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
		uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later.

		if (finished_users == max_users) {
			// All tasks using this group are gone (finished before the group), so clear the group too.
			MutexLock task_lock(task_mutex);
			group_allocator.free(group);
		}
	}

	MutexLock task_lock(task_mutex); // This mutex is needed when Physics 2D and/or 3D is selected to run on a separate thread.
	groups.erase(p_group);
#endif
}
2023-05-23 11:05:32 +02:00
int WorkerThreadPool : : get_thread_index ( ) {
Thread : : ID tid = Thread : : get_caller_id ( ) ;
return singleton - > thread_ids . has ( tid ) ? singleton - > thread_ids [ tid ] : - 1 ;
}
2024-07-10 12:53:14 +02:00
// Returns the TaskID of the task currently running on the calling pool thread,
// or INVALID_TASK_ID when called from outside the pool or between tasks.
WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() {
	int pool_thread_index = get_thread_index();
	if (pool_thread_index == -1) {
		return INVALID_TASK_ID;
	}
	Task *running = singleton->threads[pool_thread_index].current_task;
	if (!running) {
		return INVALID_TASK_ID;
	}
	return running->self;
}
2024-06-19 08:09:59 +02:00
# ifdef THREADS_ENABLED
2024-07-18 14:54:58 +02:00
// Registers p_ulock in the calling thread's table of locks the pool may
// temporarily release during collaborative waits. Re-entrant: registering the
// same lock again just bumps its refcount. Returns the slot index (zone id)
// to pass to thread_exit_unlock_allowance_zone(), or UINT32_MAX on overflow.
uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &p_ulock) {
	for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) {
		// Invariant: a slot has a lock pointer iff its refcount is non-zero.
		DEV_ASSERT((bool)unlockable_locks[i].ulock == (bool)unlockable_locks[i].rc);
		if (unlockable_locks[i].ulock == &p_ulock) {
			// Already registered in the current thread.
			unlockable_locks[i].rc++;
			return i;
		} else if (!unlockable_locks[i].ulock) {
			// First free slot; claim it.
			unlockable_locks[i].ulock = &p_ulock;
			unlockable_locks[i].rc = 1;
			return i;
		}
	}
	ERR_FAIL_V_MSG(UINT32_MAX, "No more unlockable lock slots available. Engine bug.");
}
// Drops one reference on the allowance-zone slot returned by
// _thread_enter_unlock_allowance_zone(); frees the slot at refcount zero.
void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {
	DEV_ASSERT(unlockable_locks[p_zone_id].ulock && unlockable_locks[p_zone_id].rc);
	unlockable_locks[p_zone_id].rc--;
	if (unlockable_locks[p_zone_id].rc == 0) {
		unlockable_locks[p_zone_id].ulock = nullptr;
	}
}
# endif
2024-01-05 17:39:26 +01:00
// Starts the pool. p_thread_count < 0 means "use the OS default pool size".
// p_low_priority_task_ratio bounds how many threads may run low-priority
// tasks simultaneously (clamped to [1, thread_count - 1]).
// Must not be called while threads are already running.
void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
	ERR_FAIL_COND(threads.size() > 0);

	runlevel = RUNLEVEL_NORMAL;

	if (p_thread_count < 0) {
		p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
	}

	max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);

	print_verbose(vformat("WorkerThreadPool: %d threads, %d max low-priority.", p_thread_count, max_low_priority_threads));

	threads.resize(p_thread_count);

	for (uint32_t i = 0; i < threads.size(); i++) {
		threads[i].index = i;
		threads[i].thread.start(&WorkerThreadPool::_thread_function, &threads[i]);
		thread_ids.insert(threads[i].thread.get_id(), i);
	}
}
2024-09-16 11:51:57 +02:00
// Two-phase scripting-language detach, run before finish():
// first waits for all workers to drain their queues and go idle, then has
// each worker call ScriptServer::thread_exit() and waits for all to report in.
void WorkerThreadPool::exit_languages_threads() {
	if (threads.size() == 0) {
		return;
	}

	MutexLock lock(task_mutex);

	// Wait until all threads are idle.
	_switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES);
	while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) {
		control_cond_var.wait(lock);
	}

	// Wait until all threads have detached from scripting languages.
	_switch_runlevel(RUNLEVEL_EXIT_LANGUAGES);
	while (runlevel_data.exit_languages.num_exited_threads != threads.size()) {
		control_cond_var.wait(lock);
	}
}
2022-07-18 12:09:19 +02:00
// Shuts the pool down: reports still-queued low-priority tasks, signals all
// workers to exit, joins them, and frees any tasks never waited on.
// Idempotent when there are no threads (e.g. called again from the destructor).
void WorkerThreadPool::finish() {
	if (threads.size() == 0) {
		return;
	}

	{
		MutexLock lock(task_mutex);
		SelfList<Task> *E = low_priority_task_queue.first();
		while (E) {
			print_error("Task waiting was never re-claimed: " + E->self()->description);
			E = E->next();
		}

		_switch_runlevel(RUNLEVEL_EXIT);
	}

	for (ThreadData &data : threads) {
		data.thread.wait_to_finish();
	}

	{
		MutexLock lock(task_mutex);
		for (KeyValue<TaskID, Task *> &E : tasks) {
			task_allocator.free(E.value);
		}
	}

	threads.clear();
}
// Exposes the scripting-facing API (task and group-task management) to ClassDB.
void WorkerThreadPool::_bind_methods() {
	ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task, DEFVAL(false), DEFVAL(String()));
	ClassDB::bind_method(D_METHOD("is_task_completed", "task_id"), &WorkerThreadPool::is_task_completed);
	ClassDB::bind_method(D_METHOD("wait_for_task_completion", "task_id"), &WorkerThreadPool::wait_for_task_completion);

	ClassDB::bind_method(D_METHOD("add_group_task", "action", "elements", "tasks_needed", "high_priority", "description"), &WorkerThreadPool::add_group_task, DEFVAL(-1), DEFVAL(false), DEFVAL(String()));
	ClassDB::bind_method(D_METHOD("is_group_task_completed", "group_id"), &WorkerThreadPool::is_group_task_completed);
	ClassDB::bind_method(D_METHOD("get_group_processed_element_count", "group_id"), &WorkerThreadPool::get_group_processed_element_count);
	ClassDB::bind_method(D_METHOD("wait_for_group_task_completion", "group_id"), &WorkerThreadPool::wait_for_group_task_completion);
}
// Registers this instance as the global singleton. Threads are started later in init().
WorkerThreadPool::WorkerThreadPool() {
	singleton = this;
}
// Ensures workers are joined and tasks freed even if finish() wasn't called explicitly.
WorkerThreadPool::~WorkerThreadPool() {
	finish();
}