Added patch to reimplement RtlQueueWorkItem on top of new threadpool API.

This commit is contained in:
Sebastian Lackner 2015-07-26 20:22:38 +02:00
parent 1b23958eb3
commit 7603fe0bc9
3 changed files with 315 additions and 0 deletions

View File

@ -0,0 +1,77 @@
From 7871b72dcbf44cd8bb63ace950aa69fcfa8adcbe Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Sun, 26 Jul 2015 23:23:57 +0200
Subject: ntdll/tests: Add basic tests for RtlQueueWorkItem.
---
dlls/ntdll/tests/threadpool.c | 47 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 47 insertions(+)
diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c
index c69902b..7be3f0d 100644
--- a/dlls/ntdll/tests/threadpool.c
+++ b/dlls/ntdll/tests/threadpool.c
@@ -98,6 +98,51 @@ static BOOL init_threadpool(void)
#undef NTDLL_GET_PROC
+static DWORD CALLBACK rtl_work_cb(void *userdata)
+{
+ HANDLE semaphore = userdata;
+ trace("Running rtl_work callback\n");
+ ReleaseSemaphore(semaphore, 1, NULL);
+ return 0;
+}
+
+static void test_RtlQueueWorkItem(void)
+{
+ HANDLE semaphore;
+ NTSTATUS status;
+ DWORD result;
+
+ semaphore = CreateSemaphoreA(NULL, 0, 1, NULL);
+ ok(semaphore != NULL, "CreateSemaphoreA failed %u\n", GetLastError());
+
+ status = RtlQueueWorkItem(rtl_work_cb, semaphore, WT_EXECUTEDEFAULT);
+ ok(!status, "RtlQueueWorkItem failed with status %x\n", status);
+ result = WaitForSingleObject(semaphore, 1000);
+ ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
+
+ status = RtlQueueWorkItem(rtl_work_cb, semaphore, WT_EXECUTEINIOTHREAD);
+ ok(!status, "RtlQueueWorkItem failed with status %x\n", status);
+ result = WaitForSingleObject(semaphore, 1000);
+ ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
+
+ status = RtlQueueWorkItem(rtl_work_cb, semaphore, WT_EXECUTEINPERSISTENTTHREAD);
+ ok(!status, "RtlQueueWorkItem failed with status %x\n", status);
+ result = WaitForSingleObject(semaphore, 1000);
+ ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
+
+ status = RtlQueueWorkItem(rtl_work_cb, semaphore, WT_EXECUTELONGFUNCTION);
+ ok(!status, "RtlQueueWorkItem failed with status %x\n", status);
+ result = WaitForSingleObject(semaphore, 1000);
+ ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
+
+ status = RtlQueueWorkItem(rtl_work_cb, semaphore, WT_TRANSFER_IMPERSONATION);
+ ok(!status, "RtlQueueWorkItem failed with status %x\n", status);
+ result = WaitForSingleObject(semaphore, 1000);
+ ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
+
+ CloseHandle(semaphore);
+}
+
static void CALLBACK simple_cb(TP_CALLBACK_INSTANCE *instance, void *userdata)
{
HANDLE semaphore = userdata;
@@ -1292,6 +1337,8 @@ static void test_tp_multi_wait(void)
START_TEST(threadpool)
{
+ test_RtlQueueWorkItem();
+
if (!init_threadpool())
return;
--
2.4.5

View File

@ -0,0 +1,220 @@
From 9c243efb0149efa8d6b46555e36ab2c3d379769f Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Sun, 26 Jul 2015 23:24:06 +0200
Subject: ntdll: Reimplement RtlQueueWorkItem on top of new threadpool API.
---
dlls/ntdll/threadpool.c | 143 +++++++++---------------------------------------
1 file changed, 27 insertions(+), 116 deletions(-)
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index 23091f3..ad4951f 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -42,44 +42,28 @@ WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
* Old thread pooling API
*/
-#define OLD_WORKER_TIMEOUT 30000 /* 30 seconds */
+struct rtl_work_item
+{
+ PRTL_WORK_ITEM_ROUTINE function;
+ PVOID context;
+};
+
#define EXPIRE_NEVER (~(ULONGLONG)0)
#define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
-static RTL_CRITICAL_SECTION_DEBUG critsect_debug;
static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
static struct
{
- /* threadpool_cs must be held while modifying the following four elements */
- struct list work_item_list;
- LONG num_workers;
- LONG num_busy_workers;
- LONG num_items_processed;
- RTL_CONDITION_VARIABLE threadpool_cond;
- RTL_CRITICAL_SECTION threadpool_cs;
HANDLE compl_port;
RTL_CRITICAL_SECTION threadpool_compl_cs;
}
old_threadpool =
{
- LIST_INIT(old_threadpool.work_item_list), /* work_item_list */
- 0, /* num_workers */
- 0, /* num_busy_workers */
- 0, /* num_items_processed */
- RTL_CONDITION_VARIABLE_INIT, /* threadpool_cond */
- { &critsect_debug, -1, 0, 0, 0, 0 }, /* threadpool_cs */
NULL, /* compl_port */
{ &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
};
-static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
-{
- 0, 0, &old_threadpool.threadpool_cs,
- { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
- 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
-};
-
static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
{
0, 0, &old_threadpool.threadpool_compl_cs,
@@ -87,13 +71,6 @@ static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
};
-struct work_item
-{
- struct list entry;
- PRTL_WORK_ITEM_ROUTINE function;
- PVOID context;
-};
-
struct wait_work_item
{
HANDLE Object;
@@ -364,47 +341,14 @@ static inline LONG interlocked_dec( PLONG dest )
return interlocked_xchg_add( dest, -1 ) - 1;
}
-static void WINAPI worker_thread_proc(void * param)
+static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
{
- struct list *item;
- struct work_item *work_item_ptr, work_item;
- LARGE_INTEGER timeout;
- timeout.QuadPart = -(OLD_WORKER_TIMEOUT * (ULONGLONG)10000);
-
- RtlEnterCriticalSection( &old_threadpool.threadpool_cs );
- old_threadpool.num_workers++;
-
- for (;;)
- {
- if ((item = list_head( &old_threadpool.work_item_list )))
- {
- work_item_ptr = LIST_ENTRY( item, struct work_item, entry );
- list_remove( &work_item_ptr->entry );
- old_threadpool.num_busy_workers++;
- old_threadpool.num_items_processed++;
- RtlLeaveCriticalSection( &old_threadpool.threadpool_cs );
-
- /* copy item to stack and do the work */
- work_item = *work_item_ptr;
- RtlFreeHeap( GetProcessHeap(), 0, work_item_ptr );
- TRACE("executing %p(%p)\n", work_item.function, work_item.context);
- work_item.function( work_item.context );
-
- RtlEnterCriticalSection( &old_threadpool.threadpool_cs );
- old_threadpool.num_busy_workers--;
- }
- else if (RtlSleepConditionVariableCS( &old_threadpool.threadpool_cond,
- &old_threadpool.threadpool_cs, &timeout ) != STATUS_SUCCESS)
- {
- break;
- }
- }
+ struct rtl_work_item *item = userdata;
- old_threadpool.num_workers--;
- RtlLeaveCriticalSection( &old_threadpool.threadpool_cs );
- RtlExitUserThread( 0 );
+ TRACE("executing %p(%p)\n", item->function, item->context);
+ item->function( item->context );
- /* never reached */
+ RtlFreeHeap( GetProcessHeap(), 0, item );
}
/***********************************************************************
@@ -413,9 +357,9 @@ static void WINAPI worker_thread_proc(void * param)
* Queues a work item into a thread in the thread pool.
*
* PARAMS
- * Function [I] Work function to execute.
- * Context [I] Context to pass to the work function when it is executed.
- * Flags [I] Flags. See notes.
+ * function [I] Work function to execute.
+ * context [I] Context to pass to the work function when it is executed.
+ * flags [I] Flags. See notes.
*
* RETURNS
* Success: STATUS_SUCCESS.
@@ -429,59 +373,26 @@ static void WINAPI worker_thread_proc(void * param)
*|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
*|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
*/
-NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, ULONG Flags)
+NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
{
- HANDLE thread;
+ TP_CALLBACK_ENVIRON environment;
+ struct rtl_work_item *item;
NTSTATUS status;
- LONG items_processed;
- struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item));
- if (!work_item)
+ item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
+ if (!item)
return STATUS_NO_MEMORY;
- work_item->function = Function;
- work_item->context = Context;
-
- if (Flags & ~WT_EXECUTELONGFUNCTION)
- FIXME("Flags 0x%x not supported\n", Flags);
-
- RtlEnterCriticalSection( &old_threadpool.threadpool_cs );
- list_add_tail( &old_threadpool.work_item_list, &work_item->entry );
- status = (old_threadpool.num_workers > old_threadpool.num_busy_workers) ?
- STATUS_SUCCESS : STATUS_UNSUCCESSFUL;
- items_processed = old_threadpool.num_items_processed;
- RtlLeaveCriticalSection( &old_threadpool.threadpool_cs );
-
- /* FIXME: tune this algorithm to not be as aggressive with creating threads
- * if WT_EXECUTELONGFUNCTION isn't specified */
- if (status == STATUS_SUCCESS)
- RtlWakeConditionVariable( &old_threadpool.threadpool_cond );
- else
- {
- status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
- worker_thread_proc, NULL, &thread, NULL );
+ memset( &environment, 0, sizeof(environment) );
+ environment.Version = 1;
+ environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
+ environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
- /* NOTE: we don't care if we couldn't create the thread if there is at
- * least one other available to process the request */
- if (status == STATUS_SUCCESS)
- NtClose( thread );
- else
- {
- RtlEnterCriticalSection( &old_threadpool.threadpool_cs );
- if (old_threadpool.num_workers > 0 ||
- old_threadpool.num_items_processed != items_processed)
- {
- status = STATUS_SUCCESS;
- }
- else
- list_remove( &work_item->entry );
- RtlLeaveCriticalSection( &old_threadpool.threadpool_cs );
-
- if (status != STATUS_SUCCESS)
- RtlFreeHeap( GetProcessHeap(), 0, work_item );
- }
- }
+ item->function = function;
+ item->context = context;
+ status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
+ if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
return status;
}
--
2.4.5

View File

@ -173,6 +173,7 @@ patch_enable_all ()
enable_ntdll_ThreadQuerySetWin32StartAddress="$1"
enable_ntdll_ThreadTime="$1"
enable_ntdll_Threading="$1"
enable_ntdll_Threadpool_Cleanup="$1"
enable_ntdll_User_Shared_Data="$1"
enable_ntdll_WRITECOPY="$1"
enable_ntdll_WinSqm="$1"
@ -590,6 +591,9 @@ patch_enable ()
ntdll-Threading)
enable_ntdll_Threading="$2"
;;
ntdll-Threadpool_Cleanup)
enable_ntdll_Threadpool_Cleanup="$2"
;;
ntdll-User_Shared_Data)
enable_ntdll_User_Shared_Data="$2"
;;
@ -3664,6 +3668,20 @@ if test "$enable_ntdll_Threading" -eq 1; then
) >> "$patchlist"
fi
# Patchset ntdll-Threadpool_Cleanup
# |
# | Modified files:
# | * dlls/ntdll/tests/threadpool.c, dlls/ntdll/threadpool.c
# |
if test "$enable_ntdll_Threadpool_Cleanup" -eq 1; then
patch_apply ntdll-Threadpool_Cleanup/0001-ntdll-tests-Add-basic-tests-for-RtlQueueWorkItem.patch
patch_apply ntdll-Threadpool_Cleanup/0002-ntdll-Reimplement-RtlQueueWorkItem-on-top-of-new-thr.patch
(
echo '+ { "Sebastian Lackner", "ntdll/tests: Add basic tests for RtlQueueWorkItem.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Reimplement RtlQueueWorkItem on top of new threadpool API.", 1 },';
) >> "$patchlist"
fi
# Patchset ntdll-User_Shared_Data
# |
# | Modified files: