ntdll-Vista_Threadpool: Update threadpool patchset, implement additional wait object functions.

This commit is contained in:
Sebastian Lackner 2015-02-06 20:49:32 +01:00
parent dac872a3f7
commit 99fc70333a
13 changed files with 1355 additions and 409 deletions

View File

@ -38,7 +38,7 @@ Wine. All those differences are also documented on the
Included bug fixes and improvements
===================================
**Bugfixes and features included in the next upcoming release [9]:**
**Bugfixes and features included in the next upcoming release [10]:**
* Add implementation for CreateThreadpool ([Wine Bug #35192](https://bugs.winehq.org/show_bug.cgi?id=35192))
* Call DriverUnload function when unloading a device driver.
@ -47,6 +47,7 @@ Included bug fixes and improvements
* Fix mouse jittering in Planetside 2 ([Wine Bug #32913](https://bugs.winehq.org/show_bug.cgi?id=32913))
* Implement additional stubs for vcomp dlls ([Wine Bug #31640](https://bugs.winehq.org/show_bug.cgi?id=31640))
* Implement threadpool timers ([Wine Bug #37306](https://bugs.winehq.org/show_bug.cgi?id=37306))
* Implement threadpool wait objects
* Implement threadpool work items ([Wine Bug #32531](https://bugs.winehq.org/show_bug.cgi?id=32531))
* Jedi Knight: Dark Forces II crashes with winmm set to native ([Wine Bug #37983](https://bugs.winehq.org/show_bug.cgi?id=37983))

View File

@ -1,14 +1,17 @@
From ceedfdb35e4ee398e3a36aba16778a1fa0d3d58a Mon Sep 17 00:00:00 2001
From 45bd5d08b3663253eb51f167176f94111aa389bb Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Sun, 1 Feb 2015 17:38:16 +0100
Subject: ntdll: Implement threadpool, cleanup group and callback instance
functions.
functions. (rev 2)
Changes in try 2:
* Make sure that always at least one worker thread is present.
* Merge a fix for a race condition between tp_object_{submit,shutdown}.
---
dlls/ntdll/Makefile.in | 1 +
dlls/ntdll/ntdll.spec | 30 +-
dlls/ntdll/threadpool2.c | 966 +++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 982 insertions(+), 15 deletions(-)
dlls/ntdll/threadpool2.c | 978 +++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 994 insertions(+), 15 deletions(-)
create mode 100644 dlls/ntdll/threadpool2.c
diff --git a/dlls/ntdll/Makefile.in b/dlls/ntdll/Makefile.in
@ -102,10 +105,10 @@ index 771f669..256ec6d 100644
# @ stub TpTrimPools
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
new file mode 100644
index 0000000..c4f54af
index 0000000..c4e2d21
--- /dev/null
+++ b/dlls/ntdll/threadpool2.c
@@ -0,0 +1,966 @@
@@ -0,0 +1,978 @@
+/*
+ * Vista Threadpool implementation
+ *
@ -277,7 +280,7 @@ index 0000000..c4f54af
+static BOOL tp_threadpool_release( struct threadpool *pool );
+static void tp_threadpool_shutdown( struct threadpool *pool );
+
+static NTSTATUS tp_object_submit( struct threadpool_object *object );
+static void tp_object_submit( struct threadpool_object *object );
+static BOOL tp_object_release( struct threadpool_object *object );
+static void tp_object_shutdown( struct threadpool_object *object );
+
@ -426,12 +429,14 @@ index 0000000..c4f54af
+static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
+{
+ struct threadpool *pool;
+ NTSTATUS status;
+ HANDLE thread;
+
+ pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
+ if (!pool)
+ return STATUS_NO_MEMORY;
+
+ pool->refcount = 1;
+ pool->refcount = 2; /* this thread + worker proc */
+ pool->shutdown = FALSE;
+
+ RtlInitializeCriticalSection( &pool->cs );
@ -443,9 +448,20 @@ index 0000000..c4f54af
+ pool->max_workers = 500;
+ pool->min_workers = 1;
+
+ pool->num_workers = 0;
+ pool->num_workers = 1;
+ pool->num_busy_workers = 0;
+
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+ threadpool_worker_proc, pool, &thread, NULL );
+ if (status != STATUS_SUCCESS)
+ {
+ pool->cs.DebugInfo->Spare[0] = 0;
+ RtlDeleteCriticalSection( &pool->cs );
+ RtlFreeHeap( GetProcessHeap(), 0, pool );
+ return status;
+ }
+ NtClose( thread );
+
+ TRACE("allocated threadpool %p\n", pool);
+
+ *out = pool;
@ -553,10 +569,10 @@ index 0000000..c4f54af
+ if (pool->shutdown)
+ break;
+
+ /* Wait for new tasks or until timeout expires. */
+ /* Wait for new tasks or until timeout expires. Never terminate the last worker. */
+ timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
+ if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs,
+ &timeout ) == STATUS_TIMEOUT && !list_head( &pool->pool ))
+ &timeout ) == STATUS_TIMEOUT && !list_head( &pool->pool ) && pool->num_workers > 1)
+ {
+ break;
+ }
@ -571,7 +587,7 @@ index 0000000..c4f54af
+ ***********************************************************************/
+
+static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
+ PVOID userdata, TP_CALLBACK_ENVIRON *environment )
+ PVOID userdata, TP_CALLBACK_ENVIRON *environment, BOOL submit_and_release )
+{
+ object->refcount = 1;
+ object->shutdown = FALSE;
@ -623,6 +639,14 @@ index 0000000..c4f54af
+ /* Increase reference-count on the pool */
+ interlocked_inc( &pool->refcount );
+
+ TRACE("allocated object %p of type %u\n", object, object->type);
+
+ /* For simple callbacks we have to run tp_object_submit before adding this object
+ * to the cleanup group. As soon as the cleanup group members are released ->shutdown
+ * will be set, and tp_object_submit would fail with an assertion. */
+ if (submit_and_release)
+ tp_object_submit( object );
+
+ /* Assign this object to a specific group. Please note that this has to be done
+ * as the last step before returning a pointer to the application, otherwise
+ * there is a risk of having race-conditions. */
@ -635,10 +659,16 @@ index 0000000..c4f54af
+ list_add_tail( &group->members, &object->group_entry );
+ RtlLeaveCriticalSection( &group->cs );
+ }
+
+ if (submit_and_release)
+ {
+ tp_object_shutdown( object );
+ tp_object_release( object );
+ }
+}
+
+static NTSTATUS tp_object_alloc_simple( struct threadpool_object **out, PTP_SIMPLE_CALLBACK callback,
+ PVOID userdata, TP_CALLBACK_ENVIRON *environment )
+static NTSTATUS tp_object_submit_simple( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
+ TP_CALLBACK_ENVIRON *environment )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool;
@ -654,11 +684,8 @@ index 0000000..c4f54af
+
+ object->type = TP_OBJECT_TYPE_SIMPLE;
+ object->u.simple.callback = callback;
+ tp_object_initialize( object, pool, userdata, environment );
+ tp_object_initialize( object, pool, userdata, environment, TRUE );
+
+ TRACE("allocated object %p of type %u\n", object, object->type);
+
+ *out = object;
+ return STATUS_SUCCESS;
+}
+
@ -739,10 +766,9 @@ index 0000000..c4f54af
+ RtlLeaveCriticalSection( &pool->cs );
+}
+
+static NTSTATUS tp_object_submit( struct threadpool_object *object )
+static void tp_object_submit( struct threadpool_object *object )
+{
+ struct threadpool *pool = object->pool;
+ NTSTATUS status = STATUS_SUCCESS;
+
+ assert( !object->shutdown );
+ assert( !pool->shutdown );
@ -752,7 +778,9 @@ index 0000000..c4f54af
+ /* Start new worker threads if required (and allowed) */
+ if (pool->num_busy_workers >= pool->num_workers && pool->num_workers < pool->max_workers)
+ {
+ NTSTATUS status;
+ HANDLE thread;
+
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+ threadpool_worker_proc, pool, &thread, NULL );
+ if (status == STATUS_SUCCESS)
@ -761,25 +789,21 @@ index 0000000..c4f54af
+ pool->num_workers++;
+ NtClose( thread );
+ }
+ else if (pool->num_workers)
+ else
+ {
+ assert( pool->num_workers > 0 );
+ RtlWakeConditionVariable( &pool->update_event );
+ status = STATUS_SUCCESS;
+ }
+ }
+ else RtlWakeConditionVariable( &pool->update_event );
+
+ /* Queue work item into pool and increment refcount */
+ if (!status)
+ {
+ if (!object->num_pending_callbacks++)
+ list_add_tail( &pool->pool, &object->pool_entry );
+ if (!object->num_pending_callbacks++)
+ list_add_tail( &pool->pool, &object->pool_entry );
+
+ interlocked_inc(&object->refcount);
+ }
+ interlocked_inc( &object->refcount );
+
+ RtlLeaveCriticalSection( &pool->cs );
+ return status;
+}
+
+/***********************************************************************
@ -1060,17 +1084,8 @@ index 0000000..c4f54af
+ */
+NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment )
+{
+ struct threadpool_object *object;
+ NTSTATUS status;
+ TRACE("%p %p %p\n", callback, userdata, environment);
+ status = tp_object_alloc_simple( &object, callback, userdata, environment );
+ if (!status)
+ {
+ status = tp_object_submit( object );
+ tp_object_shutdown( object );
+ tp_object_release( object );
+ }
+ return status;
+ return tp_object_submit_simple( callback, userdata, environment );
+}
--
2.2.2

View File

@ -1,12 +1,12 @@
From 1dc1074ad196b6d869028d3dea283dbab8a8a707 Mon Sep 17 00:00:00 2001
From f0c8d1f64679e4777439ffc1b9d74d47287d72eb Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Sun, 1 Feb 2015 18:06:08 +0100
Subject: ntdll: Implement additional threadpool work item functions.
---
dlls/ntdll/ntdll.spec | 8 ++--
dlls/ntdll/threadpool2.c | 99 +++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 102 insertions(+), 5 deletions(-)
dlls/ntdll/threadpool2.c | 97 +++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 100 insertions(+), 5 deletions(-)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index 256ec6d..bf9e795 100644
@ -45,7 +45,7 @@ index 256ec6d..bf9e795 100644
+@ stdcall TpWaitForWork(ptr long)
@ stdcall -ret64 VerSetConditionMask(int64 long long)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index c4f54af..acc477f 100644
index c4e2d21..a6fd141 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -123,7 +123,8 @@ struct threadpool_object
@ -84,15 +84,15 @@ index c4f54af..acc477f 100644
static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
{
return (struct threadpool_group *)group;
@@ -410,6 +423,16 @@ static void CALLBACK threadpool_worker_proc( void *param )
@@ -423,6 +436,16 @@ static void CALLBACK threadpool_worker_proc( void *param )
break;
}
+ case TP_OBJECT_TYPE_WORK:
+ {
+ TP_CALLBACK_INSTANCE *cb_instance = (TP_CALLBACK_INSTANCE *)&instance;
+ TRACE( "executing callback %p(%p, %p)\n",
+ object->u.work.callback, cb_instance, object->userdata );
+ TRACE( "executing callback %p(%p, %p, %p)\n",
+ object->u.work.callback, cb_instance, object->userdata, object );
+ object->u.work.callback( cb_instance, object->userdata, (TP_WORK *)object );
+ TRACE( "callback %p returned\n", object->u.work.callback );
+ break;
@ -101,7 +101,7 @@ index c4f54af..acc477f 100644
default:
FIXME( "callback type %u not implemented\n", object->type );
break;
@@ -554,6 +577,31 @@ static NTSTATUS tp_object_alloc_simple( struct threadpool_object **out, PTP_SIMP
@@ -578,6 +601,29 @@ static NTSTATUS tp_object_submit_simple( PTP_SIMPLE_CALLBACK callback, PVOID use
return STATUS_SUCCESS;
}
@ -122,9 +122,7 @@ index c4f54af..acc477f 100644
+
+ object->type = TP_OBJECT_TYPE_WORK;
+ object->u.work.callback = callback;
+ tp_object_initialize( object, pool, userdata, environment );
+
+ TRACE("allocated object %p of type %u\n", object, object->type);
+ tp_object_initialize( object, pool, userdata, environment, FALSE );
+
+ *out = object;
+ return STATUS_SUCCESS;
@ -133,7 +131,7 @@ index c4f54af..acc477f 100644
static BOOL tp_object_release( struct threadpool_object *object )
{
struct threadpool_group *group;
@@ -795,6 +843,16 @@ NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
@@ -816,6 +862,16 @@ NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
}
/***********************************************************************
@ -150,7 +148,7 @@ index c4f54af..acc477f 100644
* TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
*/
VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
@@ -889,6 +947,16 @@ VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
@@ -910,6 +966,16 @@ VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
}
/***********************************************************************
@ -167,7 +165,7 @@ index c4f54af..acc477f 100644
* TpReleaseCleanupGroup (NTDLL.@)
*/
VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
@@ -927,6 +995,20 @@ VOID WINAPI TpReleasePool( TP_POOL *pool )
@@ -948,6 +1014,20 @@ VOID WINAPI TpReleasePool( TP_POOL *pool )
}
/***********************************************************************
@ -188,9 +186,9 @@ index c4f54af..acc477f 100644
* TpSetPoolMaxThreads (NTDLL.@)
*/
VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
@@ -964,3 +1046,18 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, T
}
return status;
@@ -976,3 +1056,18 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, T
TRACE("%p %p %p\n", callback, userdata, environment);
return tp_object_submit_simple( callback, userdata, environment );
}
+
+/***********************************************************************

View File

@ -1,12 +1,12 @@
From 248c2929e4cef9d57b6ae641cbf428cfb73efe0f Mon Sep 17 00:00:00 2001
From e494481628d87eab64df7e8cb04047d2330c2847 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Sun, 1 Feb 2015 18:21:28 +0100
Subject: ntdll: Implement threadpool timer functions.
Subject: ntdll: Implement threadpool timer functions. (rev 2)
---
dlls/ntdll/ntdll.spec | 10 +-
dlls/ntdll/threadpool2.c | 387 ++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 391 insertions(+), 6 deletions(-)
dlls/ntdll/threadpool2.c | 399 ++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 403 insertions(+), 6 deletions(-)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index bf9e795..f4328a9 100644
@ -58,7 +58,7 @@ index bf9e795..f4328a9 100644
@ stdcall TpWaitForWork(ptr long)
@ stdcall -ret64 VerSetConditionMask(int64 long long)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index acc477f..4fa22bb 100644
index a6fd141..19096a0 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -124,7 +124,8 @@ struct threadpool_object
@ -71,7 +71,7 @@ index acc477f..4fa22bb 100644
} type;
/* arguments for callback */
@@ -140,6 +141,19 @@ struct threadpool_object
@@ -140,6 +141,21 @@ struct threadpool_object
{
PTP_WORK_CALLBACK callback;
} work;
@ -81,9 +81,11 @@ index acc477f..4fa22bb 100644
+ PTP_TIMER_CALLBACK callback;
+
+ /* information about the timer, locked via timerqueue.cs */
+ BOOL is_timer_set;
+ BOOL timer_initialized;
+ BOOL timer_pending;
+ struct list timer_entry;
+
+ BOOL timer_set;
+ ULONGLONG timeout;
+ LONG period;
+ LONG window_length;
@ -91,7 +93,7 @@ index acc477f..4fa22bb 100644
} u;
};
@@ -154,6 +168,35 @@ struct threadpool_group
@@ -154,6 +170,35 @@ struct threadpool_group
struct list members;
};
@ -100,9 +102,9 @@ index acc477f..4fa22bb 100644
+static struct
+{
+ CRITICAL_SECTION cs;
+ BOOL thread_running;
+
+ /* number of timer objects total */
+ BOOL thread_running;
+ LONG num_timers;
+
+ /* list of pending timers */
@ -127,7 +129,7 @@ index acc477f..4fa22bb 100644
static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
{
return (struct threadpool *)pool;
@@ -166,6 +209,13 @@ static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
@@ -166,6 +211,13 @@ static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
return object;
}
@ -141,7 +143,7 @@ index acc477f..4fa22bb 100644
static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
{
return (struct threadpool_group *)group;
@@ -177,6 +227,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
@@ -177,6 +229,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
}
static void CALLBACK threadpool_worker_proc( void *param );
@ -149,7 +151,7 @@ index acc477f..4fa22bb 100644
static NTSTATUS tp_threadpool_alloc( struct threadpool **out );
static BOOL tp_threadpool_release( struct threadpool *pool );
@@ -189,6 +240,222 @@ static void tp_object_shutdown( struct threadpool_object *object );
@@ -189,6 +242,243 @@ static void tp_object_shutdown( struct threadpool_object *object );
static BOOL tp_group_release( struct threadpool_group *group );
/***********************************************************************
@ -162,9 +164,20 @@ index acc477f..4fa22bb 100644
+ * specific implementation differences, like handling several events at
+ * once using a windowlength parameter. */
+
+static NTSTATUS tp_timerqueue_acquire( void )
+static NTSTATUS tp_timerqueue_acquire( struct threadpool_object *timer )
+{
+ NTSTATUS status = STATUS_SUCCESS;
+ assert( timer->type == TP_OBJECT_TYPE_TIMER );
+
+ timer->u.timer.timer_initialized = TRUE;
+ timer->u.timer.timer_pending = FALSE;
+ memset( &timer->u.timer.timer_entry, 0, sizeof(timer->u.timer.timer_entry) );
+
+ timer->u.timer.timer_set = FALSE;
+ timer->u.timer.timeout = 0;
+ timer->u.timer.period = 0;
+ timer->u.timer.window_length = 0;
+
+ RtlEnterCriticalSection( &timerqueue.cs );
+
+ if (!timerqueue.thread_running)
@ -184,71 +197,83 @@ index acc477f..4fa22bb 100644
+ return status;
+}
+
+static void tp_timerqueue_release( void )
+static void tp_timerqueue_release( struct threadpool_object *timer )
+{
+ assert( timer->type == TP_OBJECT_TYPE_TIMER );
+ RtlEnterCriticalSection( &timerqueue.cs );
+ if (!--timerqueue.num_timers)
+
+ if (timer->u.timer.timer_initialized)
+ {
+ assert( list_empty( &timerqueue.pending_timers ) );
+ RtlWakeAllConditionVariable( &timerqueue.update_event );
+ if (timer->u.timer.timer_pending)
+ {
+ list_remove( &timer->u.timer.timer_entry );
+ timer->u.timer.timer_pending = FALSE;
+ }
+
+ if (!--timerqueue.num_timers)
+ {
+ assert( list_empty( &timerqueue.pending_timers ) );
+ RtlWakeAllConditionVariable( &timerqueue.update_event );
+ }
+
+ timer->u.timer.timer_initialized = FALSE;
+ }
+
+ RtlLeaveCriticalSection( &timerqueue.cs );
+}
+
+static void tp_timerqueue_update_timer( struct threadpool_object *new_timer, LARGE_INTEGER *timeout,
+ LONG period, LONG window_length )
+{
+ BOOL queue_timer = FALSE, delete_timer = FALSE;
+ BOOL submit_timer = FALSE;
+ struct threadpool_object *timer;
+ ULONGLONG when;
+
+ assert( new_timer->type == TP_OBJECT_TYPE_TIMER );
+ RtlEnterCriticalSection( &timerqueue.cs );
+ assert( new_timer->u.timer.timer_initialized );
+
+ /* Remember if the timer is set or unset */
+ new_timer->u.timer.is_timer_set = timeout != NULL;
+ new_timer->u.timer.timer_set = timeout != NULL;
+
+ if (!timeout)
+ goto update_timer;
+
+ /* A timeout of zero is a special case, it means that the callback is queued immediately */
+ if ((when = timeout->QuadPart) == 0)
+ if (timeout)
+ {
+ queue_timer = TRUE;
+ when = timeout->QuadPart;
+
+ if (!period)
+ /* A timeout of zero means that the timer should be submitted immediately */
+ if (when == 0)
+ {
+ timeout = NULL;
+ goto update_timer;
+ submit_timer = TRUE;
+ if (!period)
+ {
+ timeout = NULL;
+ goto update_timer;
+ }
+ when = (ULONGLONG)period * -10000;
+ }
+
+ when = (ULONGLONG)period * -10000;
+ }
+
+ /* Convert relative timeouts into absolute timeouts */
+ if ((LONGLONG)when < 0)
+ {
+ LARGE_INTEGER now;
+ NtQuerySystemTime( &now );
+ when = now.QuadPart - when;
+ /* Convert relative timeout to absolute */
+ if ((LONGLONG)when < 0)
+ {
+ LARGE_INTEGER now;
+ NtQuerySystemTime( &now );
+ when = now.QuadPart - when;
+ }
+ }
+
+update_timer:
+
+ /* If timer is still pending, then remove the old one */
+ /* If timer is still pending, then remove the old one */
+ if (new_timer->u.timer.timer_pending)
+ {
+ list_remove( &new_timer->u.timer.timer_entry );
+ memset( &new_timer->u.timer.timer_entry, 0, sizeof(new_timer->u.timer.timer_entry) );
+ new_timer->u.timer.timer_pending = FALSE;
+
+ /* defer calls to tp_object_release until we are done */
+ delete_timer = TRUE;
+ }
+
+ /* Timer should be enabled again, add it to the queue */
+ if (timeout)
+ {
+ interlocked_inc( &new_timer->refcount );
+ new_timer->u.timer.timeout = when;
+ new_timer->u.timer.period = period;
+ new_timer->u.timer.window_length = window_length;
@ -270,10 +295,9 @@ index acc477f..4fa22bb 100644
+ }
+
+ RtlLeaveCriticalSection( &timerqueue.cs );
+ if (queue_timer)
+
+ if (submit_timer)
+ tp_object_submit( new_timer );
+ if (delete_timer)
+ tp_object_release( new_timer );
+}
+
+static void CALLBACK timerqueue_thread_proc( void *param )
@ -321,9 +345,8 @@ index acc477f..4fa22bb 100644
+ }
+ else
+ {
+ /* We no longer need the reference to this timer object */
+ /* The element is no longer queued */
+ timer->u.timer.timer_pending = FALSE;
+ tp_object_release( timer );
+ }
+ }
+
@ -372,15 +395,15 @@ index acc477f..4fa22bb 100644
* THREADPOOL INSTANCE IMPLEMENTATION
***********************************************************************/
@@ -433,6 +700,16 @@ static void CALLBACK threadpool_worker_proc( void *param )
@@ -446,6 +736,16 @@ static void CALLBACK threadpool_worker_proc( void *param )
break;
}
+ case TP_OBJECT_TYPE_TIMER:
+ {
+ TP_CALLBACK_INSTANCE *cb_instance = (TP_CALLBACK_INSTANCE *)&instance;
+ TRACE( "executing callback %p(%p, %p)\n",
+ object->u.timer.callback, cb_instance, object->userdata );
+ TRACE( "executing callback %p(%p, %p, %p)\n",
+ object->u.timer.callback, cb_instance, object->userdata, object );
+ object->u.timer.callback( cb_instance, object->userdata, (TP_TIMER *)object );
+ TRACE( "callback %p returned\n", object->u.timer.callback );
+ break;
@ -389,7 +412,7 @@ index acc477f..4fa22bb 100644
default:
FIXME( "callback type %u not implemented\n", object->type );
break;
@@ -602,6 +879,48 @@ static NTSTATUS tp_object_alloc_work( struct threadpool_object **out, PTP_WORK_C
@@ -624,6 +924,38 @@ static NTSTATUS tp_object_alloc_work( struct threadpool_object **out, PTP_WORK_C
return STATUS_SUCCESS;
}
@ -409,27 +432,17 @@ index acc477f..4fa22bb 100644
+ if (!object)
+ return STATUS_NO_MEMORY;
+
+ status = tp_timerqueue_acquire();
+ object->type = TP_OBJECT_TYPE_TIMER;
+ object->u.timer.callback = callback;
+
+ status = tp_timerqueue_acquire( object );
+ if (status)
+ {
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return status;
+ }
+
+ object->type = TP_OBJECT_TYPE_TIMER;
+ object->u.timer.callback = callback;
+
+ object->u.timer.timer_pending = FALSE;
+ memset( &object->u.timer.timer_entry, 0, sizeof(object->u.timer.timer_entry));
+
+ object->u.timer.is_timer_set = FALSE;
+ object->u.timer.timeout = 0;
+ object->u.timer.period = 0;
+ object->u.timer.window_length = 0;
+
+ tp_object_initialize( object, pool, userdata, environment );
+
+ TRACE("allocated object %p of type %u\n", object, object->type);
+ tp_object_initialize( object, pool, userdata, environment, FALSE );
+
+ *out = object;
+ return STATUS_SUCCESS;
@ -438,21 +451,20 @@ index acc477f..4fa22bb 100644
static BOOL tp_object_release( struct threadpool_object *object )
{
struct threadpool_group *group;
@@ -624,6 +943,13 @@ static BOOL tp_object_release( struct threadpool_object *object )
tp_group_release( group );
}
@@ -659,6 +991,12 @@ static BOOL tp_object_release( struct threadpool_object *object )
+ /* release reference on the timerqueue */
static void tp_object_shutdown( struct threadpool_object *object )
{
+ if (object->type == TP_OBJECT_TYPE_TIMER)
+ {
+ assert( !object->u.timer.timer_pending );
+ tp_timerqueue_release();
+ /* release reference on the timerqueue */
+ tp_timerqueue_release( object );
+ }
+
/* release reference to library */
if (object->race_dll)
LdrUnloadDll( object->race_dll );
@@ -843,6 +1169,16 @@ NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
object->shutdown = TRUE;
}
@@ -862,6 +1200,16 @@ NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
}
/***********************************************************************
@ -469,7 +481,7 @@ index acc477f..4fa22bb 100644
* TpAllocWork (NTDLL.@)
*/
NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
@@ -947,6 +1283,16 @@ VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
@@ -966,6 +1314,16 @@ VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
}
/***********************************************************************
@ -479,14 +491,14 @@ index acc477f..4fa22bb 100644
+{
+ struct threadpool_object *this = impl_from_TP_TIMER( timer );
+ TRACE("%p\n", timer);
+ return this ? this->u.timer.is_timer_set : FALSE;
+ return this ? this->u.timer.timer_set : FALSE;
+}
+
+/***********************************************************************
* TpPostWork (NTDLL.@)
*/
VOID WINAPI TpPostWork( TP_WORK *work )
@@ -995,6 +1341,20 @@ VOID WINAPI TpReleasePool( TP_POOL *pool )
@@ -1014,6 +1372,20 @@ VOID WINAPI TpReleasePool( TP_POOL *pool )
}
/***********************************************************************
@ -507,7 +519,7 @@ index acc477f..4fa22bb 100644
* TpReleaseWork (NTDLL.@)
*/
VOID WINAPI TpReleaseWork( TP_WORK *work )
@@ -1030,6 +1390,16 @@ BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
@@ -1049,6 +1421,16 @@ BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
}
/***********************************************************************
@ -524,7 +536,7 @@ index acc477f..4fa22bb 100644
* TpSimpleTryPost (NTDLL.@)
*/
NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment )
@@ -1048,6 +1418,21 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, T
@@ -1058,6 +1440,21 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, T
}
/***********************************************************************

View File

@ -1,12 +1,12 @@
From 19b57d882236efe0708f7d495ca7301a6c4955e4 Mon Sep 17 00:00:00 2001
From b2606e85a68efcaf3e3b3a0011c44900c5641359 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Sun, 1 Feb 2015 18:14:09 +0100
Subject: ntdll/tests: Add tests for Tp* threadpool functions.
---
dlls/ntdll/tests/Makefile.in | 1 +
dlls/ntdll/tests/threadpool.c | 779 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 780 insertions(+)
dlls/ntdll/tests/threadpool.c | 780 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 781 insertions(+)
create mode 100644 dlls/ntdll/tests/threadpool.c
diff --git a/dlls/ntdll/tests/Makefile.in b/dlls/ntdll/tests/Makefile.in
@ -21,10 +21,10 @@ index 81b4466..fc352dd 100644
time.c
diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c
new file mode 100644
index 0000000..434bd9a
index 0000000..61f72ec
--- /dev/null
+++ b/dlls/ntdll/tests/threadpool.c
@@ -0,0 +1,779 @@
@@ -0,0 +1,780 @@
+/* Unit test suite for Threadpool functions
+ *
+ * Copyright 2015 Sebastian Lackner
@ -743,6 +743,7 @@ index 0000000..434bd9a
+ ok(!status, "TpAllocTimer failed with status %x\n", status);
+ ok(timer != NULL, "expected timer != NULL\n");
+
+ timer2 = NULL;
+ status = pTpAllocTimer(&timer2, window_length_cb, &ticks2, &environment);
+ ok(!status, "TpAllocTimer failed with status %x\n", status);
+ ok(timer2 != NULL, "expected timer2 != NULL\n");

View File

@ -1,4 +1,4 @@
From 3477bdc9e48312f6188f9c9beb8d062291f8b81f Mon Sep 17 00:00:00 2001
From 9775c66000942a2ae9e52abe0d27dcca20a24470 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Sun, 1 Feb 2015 19:41:13 +0100
Subject: kernel32: Forward various threadpool functions to ntdll.

View File

@ -0,0 +1,717 @@
From 9c751685e15f98981a26275d2102b839fbee3472 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Fri, 6 Feb 2015 00:32:26 +0100
Subject: ntdll: Implement threadpool wait objects.
---
dlls/ntdll/ntdll.spec | 8 +-
dlls/ntdll/threadpool2.c | 490 ++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 487 insertions(+), 11 deletions(-)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index f4328a9..17c71bc 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -974,7 +974,7 @@
# @ stub TpAllocJobNotification
@ stdcall TpAllocPool(ptr ptr)
@ stdcall TpAllocTimer(ptr ptr ptr)
-# @ stub TpAllocWait
+@ stdcall TpAllocWait(ptr ptr ptr ptr)
@ stdcall TpAllocWork(ptr ptr ptr ptr)
# @ stub TpAlpcRegisterCompletionList
# @ stub TpAlpcUnregisterCompletionList
@@ -1005,7 +1005,7 @@
# @ stub TpReleaseJobNotification
@ stdcall TpReleasePool(ptr)
@ stdcall TpReleaseTimer(ptr)
-# @ stub TpReleaseWait
+@ stdcall TpReleaseWait(ptr)
@ stdcall TpReleaseWork(ptr)
# @ stub TpSetDefaultPoolMaxThreads
# @ stub TpSetDefaultPoolStackInformation
@@ -1017,7 +1017,7 @@
# @ stub TpSetPoolWorkerThreadIdleTimeout
@ stdcall TpSetTimer(ptr ptr long long)
# @ stub TpSetTimerEx
-# @ stub TpSetWait
+@ stdcall TpSetWait(ptr long ptr)
# @ stub TpSetWaitEx
@ stdcall TpSimpleTryPost(ptr ptr ptr)
# @ stub TpStartAsyncIoOperation
@@ -1028,5 +1028,5 @@
# @ stub TpWaitForJobNotification
@ stdcall TpWaitForTimer(ptr long)
-# @ stub TpWaitForWait
+@ stdcall TpWaitForWait(ptr long)
@ stdcall TpWaitForWork(ptr long)
@ stdcall -ret64 VerSetConditionMask(int64 long long)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index 19096a0..ae98d7a 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -54,6 +54,7 @@ static inline LONG interlocked_dec( PLONG dest )
}
#define THREADPOOL_WORKER_TIMEOUT 5000
+#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
/* allocated on the stack while a callback is running */
struct threadpool_instance
@@ -125,7 +126,8 @@ struct threadpool_object
TP_OBJECT_TYPE_UNDEFINED,
TP_OBJECT_TYPE_SIMPLE,
TP_OBJECT_TYPE_WORK,
- TP_OBJECT_TYPE_TIMER
+ TP_OBJECT_TYPE_TIMER,
+ TP_OBJECT_TYPE_WAIT
} type;
/* arguments for callback */
@@ -156,6 +158,20 @@ struct threadpool_object
LONG period;
LONG window_length;
} timer;
+ /* wait callback */
+ struct
+ {
+ PTP_WAIT_CALLBACK callback;
+ LONG signaled;
+
+ /* information about the wait, locked via waitqueue.cs */
+ struct waitqueue_bucket *bucket;
+ BOOL wait_pending;
+ struct list wait_entry;
+
+ ULONGLONG timeout;
+ HANDLE handle;
+ } wait;
} u;
};
@@ -199,6 +215,40 @@ static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
};
+struct waitqueue_bucket
+{
+ struct list bucket_entry;
+
+ /* list of reserved slots / pending waits */
+ LONG num_waits;
+ struct list reserved;
+ struct list waits;
+ HANDLE update_event;
+};
+
+/* global waitqueue object */
+static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
+static struct
+{
+ CRITICAL_SECTION cs;
+
+ /* list of buckets */
+ LONG num_buckets;
+ struct list buckets;
+}
+waitqueue =
+{
+ { &waitqueue_debug, -1, 0, 0, 0, 0 },
+ 0,
+ LIST_INIT( waitqueue.buckets )
+};
+static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
+{
+ 0, 0, &waitqueue.cs,
+ { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
+ 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
+};
+
static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
{
return (struct threadpool *)pool;
@@ -218,6 +268,13 @@ static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
return object;
}
+static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
+{
+ struct threadpool_object *object = (struct threadpool_object *)wait;
+ assert( !object || object->type == TP_OBJECT_TYPE_WAIT );
+ return object;
+}
+
static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
{
return (struct threadpool_group *)group;
@@ -230,12 +287,13 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
static void CALLBACK threadpool_worker_proc( void *param );
static void CALLBACK timerqueue_thread_proc( void *param );
+static void CALLBACK waitqueue_thread_proc( void *param );
static NTSTATUS tp_threadpool_alloc( struct threadpool **out );
static BOOL tp_threadpool_release( struct threadpool *pool );
static void tp_threadpool_shutdown( struct threadpool *pool );
-static void tp_object_submit( struct threadpool_object *object );
+static void tp_object_submit( struct threadpool_object *object, BOOL success );
static BOOL tp_object_release( struct threadpool_object *object );
static void tp_object_shutdown( struct threadpool_object *object );
@@ -384,7 +442,7 @@ update_timer:
RtlLeaveCriticalSection( &timerqueue.cs );
if (submit_timer)
- tp_object_submit( new_timer );
+ tp_object_submit( new_timer, FALSE );
}
static void CALLBACK timerqueue_thread_proc( void *param )
@@ -411,7 +469,7 @@ static void CALLBACK timerqueue_thread_proc( void *param )
/* Queue a new callback in one of the worker threads */
list_remove( &timer->u.timer.timer_entry );
- tp_object_submit( timer );
+ tp_object_submit( timer, FALSE );
/* Requeue the timer, except its marked for shutdown */
if (!timer->shutdown && timer->u.timer.period)
@@ -477,6 +535,306 @@ static void CALLBACK timerqueue_thread_proc( void *param )
RtlLeaveCriticalSection( &timerqueue.cs );
}
+/***********************************************************************
+ * WAITQUEUE IMPLEMENTATION
+ ***********************************************************************/
+
+static NTSTATUS tp_waitqueue_acquire( struct threadpool_object *wait )
+{
+ struct waitqueue_bucket *bucket;
+ NTSTATUS status;
+ HANDLE thread;
+ assert( wait->type = TP_OBJECT_TYPE_WAIT );
+
+ wait->u.wait.signaled = 0;
+
+ wait->u.wait.bucket = NULL;
+ wait->u.wait.wait_pending = FALSE;
+ memset( &wait->u.wait.wait_entry, 0, sizeof(wait->u.wait.wait_entry) );
+
+ wait->u.wait.timeout = 0;
+ wait->u.wait.handle = INVALID_HANDLE_VALUE;
+
+ RtlEnterCriticalSection( &waitqueue.cs );
+
+ LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
+ {
+ if (bucket->num_waits < MAXIMUM_WAITQUEUE_OBJECTS)
+ {
+ bucket->num_waits++;
+ wait->u.wait.bucket = bucket;
+ list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
+
+ status = STATUS_SUCCESS;
+ goto out;
+ }
+ }
+
+ bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
+ if (!bucket)
+ {
+ status = STATUS_NO_MEMORY;
+ goto out;
+ }
+
+ bucket->num_waits = 1;
+ list_init( &bucket->reserved );
+ list_init( &bucket->waits );
+
+ status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE );
+ if (status)
+ {
+ RtlFreeHeap( GetProcessHeap(), 0, bucket );
+ goto out;
+ }
+
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+ waitqueue_thread_proc, bucket, &thread, NULL );
+ if (status == STATUS_SUCCESS)
+ {
+ waitqueue.num_buckets++;
+ list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
+
+ wait->u.wait.bucket = bucket;
+ list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
+
+ NtClose( thread );
+ }
+ else
+ {
+ NtClose( bucket->update_event );
+ RtlFreeHeap( GetProcessHeap(), 0, bucket );
+ }
+
+out:
+ RtlLeaveCriticalSection( &waitqueue.cs );
+ return status;
+}
+
+/* Decrement the refcount of a timer queue. */
+static void tp_waitqueue_release( struct threadpool_object *wait )
+{
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+ RtlEnterCriticalSection( &waitqueue.cs );
+
+ if (wait->u.wait.bucket)
+ {
+ struct waitqueue_bucket *bucket = wait->u.wait.bucket;
+ assert( bucket->num_waits > 0 );
+
+ bucket->num_waits--;
+ list_remove( &wait->u.wait.wait_entry );
+ NtSetEvent( bucket->update_event, NULL );
+
+ wait->u.wait.bucket = NULL;
+ }
+
+ RtlLeaveCriticalSection( &waitqueue.cs );
+}
+
+static void tp_waitqueue_update_wait( struct threadpool_object *new_wait, HANDLE handle, LARGE_INTEGER *timeout )
+{
+ BOOL submit_wait = FALSE;
+
+ assert( new_wait->type == TP_OBJECT_TYPE_WAIT );
+ RtlEnterCriticalSection( &waitqueue.cs );
+ assert( new_wait->u.wait.bucket );
+
+ /* update wait handle */
+ new_wait->u.wait.handle = handle;
+
+ /* for performance reasons we only wake up when something has changed */
+ if (handle || new_wait->u.wait.wait_pending)
+ {
+ struct waitqueue_bucket *bucket = new_wait->u.wait.bucket;
+ list_remove( &new_wait->u.wait.wait_entry );
+
+ if (handle)
+ {
+ ULONGLONG when = TIMEOUT_INFINITE;
+
+ if (timeout)
+ {
+ when = timeout->QuadPart;
+
+ /* A timeout of zero means that the wait should be submitted immediately */
+ if (when == 0)
+ {
+ submit_wait = TRUE;
+ goto remove_wait;
+ }
+
+ /* Convert relative timeout to absolute */
+ if ((LONGLONG)when < 0)
+ {
+ LARGE_INTEGER now;
+ NtQuerySystemTime( &now );
+ when = now.QuadPart - when;
+ }
+ }
+
+ list_add_tail( &bucket->waits, &new_wait->u.wait.wait_entry );
+ new_wait->u.wait.wait_pending = TRUE;
+ new_wait->u.wait.timeout = when;
+ }
+ else
+ {
+remove_wait:
+ list_add_tail( &bucket->reserved, &new_wait->u.wait.wait_entry );
+ new_wait->u.wait.wait_pending = FALSE;
+ }
+
+ NtSetEvent( bucket->update_event, NULL );
+ }
+
+ RtlLeaveCriticalSection( &waitqueue.cs );
+
+ if (submit_wait)
+ tp_object_submit( new_wait, FALSE );
+}
+
+static void CALLBACK waitqueue_thread_proc( void *param )
+{
+ HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
+ struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
+ struct waitqueue_bucket *bucket = param;
+ LARGE_INTEGER now, timeout;
+ struct threadpool_object *wait, *next;
+ DWORD num_handles;
+ NTSTATUS status;
+
+ RtlEnterCriticalSection( &waitqueue.cs );
+
+ for (;;)
+ {
+ NtQuerySystemTime( &now );
+ timeout.QuadPart = TIMEOUT_INFINITE;
+ num_handles = 0;
+
+ LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waits, struct threadpool_object, u.wait.wait_entry )
+ {
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+
+ /* Timeout expired or object was signaled */
+ if (wait->u.wait.timeout <= now.QuadPart)
+ {
+ list_remove( &wait->u.wait.wait_entry );
+ list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
+ tp_object_submit( wait, FALSE );
+ }
+ else
+ {
+ if (wait->u.wait.timeout < timeout.QuadPart)
+ timeout.QuadPart = wait->u.wait.timeout;
+
+ /* We will have to wait for this object - keep a reference to make sure it doesn't get destroyed */
+ assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
+ interlocked_inc( &wait->refcount );
+ objects[num_handles] = wait;
+ handles[num_handles] = wait->u.wait.handle;
+ num_handles++;
+ }
+ }
+
+ if (!bucket->num_waits)
+ {
+ assert( num_handles == 0 );
+
+ /* All wait objects have been destroyed, if there are no new wait objects within some
+ * amount of time, then we can shutdown this thread. */
+ RtlLeaveCriticalSection( &waitqueue.cs );
+ timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
+ status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout );
+ RtlEnterCriticalSection( &waitqueue.cs );
+
+ if (status == STATUS_TIMEOUT && !bucket->num_waits)
+ break;
+ }
+ else
+ {
+ handles[num_handles] = bucket->update_event;
+
+ /* Wait for a wait queue update event or until an event is triggered */
+ RtlLeaveCriticalSection( &waitqueue.cs );
+ status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout );
+ RtlEnterCriticalSection( &waitqueue.cs );
+
+ if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
+ {
+ wait = objects[status - STATUS_WAIT_0];
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+
+ if (wait->u.wait.bucket)
+ {
+ assert( wait->u.wait.bucket == bucket );
+ list_remove( &wait->u.wait.wait_entry );
+ list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
+ tp_object_submit( wait, TRUE );
+ }
+ else
+ FIXME("Wait object triggered while object was destroyed, race-condition.\n");
+ }
+
+ while (num_handles)
+ {
+ wait = objects[--num_handles];
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+ tp_object_release( wait );
+ }
+ }
+
+ /* Try to merge with other buckets */
+ if (waitqueue.num_buckets > 1 && bucket->num_waits && bucket->num_waits < MAXIMUM_WAITQUEUE_OBJECTS / 2)
+ {
+ struct waitqueue_bucket *other_bucket;
+ LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
+ {
+ if (other_bucket != bucket && other_bucket->num_waits &&
+ other_bucket->num_waits + bucket->num_waits <= MAXIMUM_WAITQUEUE_OBJECTS)
+ {
+ other_bucket->num_waits += bucket->num_waits;
+ bucket->num_waits = 0;
+
+ LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
+ {
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+ wait->u.wait.bucket = other_bucket;
+ }
+ list_move_tail( &other_bucket->reserved, &bucket->reserved );
+
+ LIST_FOR_EACH_ENTRY( wait, &bucket->waits, struct threadpool_object, u.wait.wait_entry )
+ {
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+ wait->u.wait.bucket = other_bucket;
+ }
+ list_move_tail( &other_bucket->waits, &bucket->waits );
+
+ /* we will not terminate immediately, but instead after a timeout. Make sure that this
+ * bucket appears as the last one in the list, otherwise there is a high risk that
+ * elements will be added again. */
+ list_remove( &bucket->bucket_entry );
+ list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
+
+ NtSetEvent( other_bucket->update_event, NULL );
+ break;
+ }
+ }
+ }
+ }
+
+ waitqueue.num_buckets--;
+ list_remove( &bucket->bucket_entry );
+ if (!waitqueue.num_buckets)
+ assert( list_empty( &waitqueue.buckets ) );
+ RtlLeaveCriticalSection( &waitqueue.cs );
+
+ assert( bucket->num_waits == 0 );
+ assert( list_empty( &bucket->reserved ) );
+ assert( list_empty( &bucket->waits ) );
+
+ NtClose( bucket->update_event );
+ RtlFreeHeap( GetProcessHeap(), 0, bucket );
+}
/***********************************************************************
* THREADPOOL INSTANCE IMPLEMENTATION
@@ -689,6 +1047,7 @@ static void tp_threadpool_shutdown( struct threadpool *pool )
static void CALLBACK threadpool_worker_proc( void *param )
{
struct threadpool *pool = param;
+ TP_WAIT_RESULT wait_result;
LARGE_INTEGER timeout;
struct list *ptr;
@@ -708,6 +1067,18 @@ static void CALLBACK threadpool_worker_proc( void *param )
list_add_tail( &pool->pool, &object->pool_entry );
object->num_running_callbacks++;
+ /* for wait objects, determine if the object was signaled or if this
+ * is a timeout. */
+ if (object->type == TP_OBJECT_TYPE_WAIT)
+ {
+ if (object->u.wait.signaled > 0)
+ {
+ wait_result = WAIT_OBJECT_0;
+ object->u.wait.signaled--;
+ }
+ else wait_result = WAIT_TIMEOUT;
+ }
+
/* Leave critical section and do the actual callback. */
pool->num_busy_workers++;
RtlLeaveCriticalSection( &pool->cs );
@@ -746,6 +1117,16 @@ static void CALLBACK threadpool_worker_proc( void *param )
break;
}
+ case TP_OBJECT_TYPE_WAIT:
+ {
+ TP_CALLBACK_INSTANCE *cb_instance = (TP_CALLBACK_INSTANCE *)&instance;
+ TRACE( "executing callback %p(%p, %p, %p, %u)\n",
+ object->u.wait.callback, cb_instance, object->userdata, object, wait_result );
+ object->u.wait.callback( cb_instance, object->userdata, (TP_WAIT *)object, wait_result );
+ TRACE( "callback %p returned\n", object->u.wait.callback );
+ break;
+ }
+
default:
FIXME( "callback type %u not implemented\n", object->type );
break;
@@ -857,7 +1238,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
* to the cleanup group. As soon as the cleanup group members are released ->shutdown
* will be set, and tp_object_submit would fail with an assertion. */
if (submit_and_release)
- tp_object_submit( object );
+ tp_object_submit( object, FALSE );
/* Assign this object to a specific group. Please note that this has to be done
* as the last step before returning a pointer to the application, otherwise
@@ -956,6 +1337,38 @@ static NTSTATUS tp_object_alloc_timer( struct threadpool_object **out, PTP_TIMER
return STATUS_SUCCESS;
}
+static NTSTATUS tp_object_alloc_wait( struct threadpool_object **out, PTP_WAIT_CALLBACK callback,
+ PVOID userdata, TP_CALLBACK_ENVIRON *environment )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool;
+ NTSTATUS status;
+
+ /* determine threadpool */
+ pool = environment ? (struct threadpool *)environment->Pool : NULL;
+ if (!pool) pool = get_default_threadpool();
+ if (!pool) return STATUS_NO_MEMORY;
+
+ object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
+ if (!object)
+ return STATUS_NO_MEMORY;
+
+ object->type = TP_OBJECT_TYPE_WAIT;
+ object->u.wait.callback = callback;
+
+ status = tp_waitqueue_acquire( object );
+ if (status)
+ {
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return status;
+ }
+
+ tp_object_initialize( object, pool, userdata, environment, FALSE );
+
+ *out = object;
+ return STATUS_SUCCESS;
+}
+
static BOOL tp_object_release( struct threadpool_object *object )
{
struct threadpool_group *group;
@@ -996,6 +1409,11 @@ static void tp_object_shutdown( struct threadpool_object *object )
/* release reference on the timerqueue */
tp_timerqueue_release( object );
}
+ else if (object->type == TP_OBJECT_TYPE_WAIT)
+ {
+ /* release reference to the waitqueue */
+ tp_waitqueue_release( object );
+ }
object->shutdown = TRUE;
}
@@ -1007,12 +1425,17 @@ static void tp_object_cancel( struct threadpool_object *object, BOOL group_cance
/* Remove the pending callbacks from the pool */
RtlEnterCriticalSection( &pool->cs );
+
if (object->num_pending_callbacks)
{
pending_callbacks = object->num_pending_callbacks;
list_remove( &object->pool_entry );
object->num_pending_callbacks = 0;
}
+
+ if (object->type == TP_OBJECT_TYPE_WAIT)
+ object->u.wait.signaled = 0;
+
RtlLeaveCriticalSection( &pool->cs );
/* Execute group cancellation callback if defined, and if this was actually a group cancel. */
@@ -1039,7 +1462,7 @@ static void tp_object_wait( struct threadpool_object *object )
RtlLeaveCriticalSection( &pool->cs );
}
-static void tp_object_submit( struct threadpool_object *object )
+static void tp_object_submit( struct threadpool_object *object, BOOL success )
{
struct threadpool *pool = object->pool;
@@ -1076,6 +1499,10 @@ static void tp_object_submit( struct threadpool_object *object )
interlocked_inc( &object->refcount );
+ /* increment success counter by one */
+ if (object->type == TP_OBJECT_TYPE_WAIT && success)
+ object->u.wait.signaled++;
+
RtlLeaveCriticalSection( &pool->cs );
}
@@ -1210,6 +1637,16 @@ NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID
}
/***********************************************************************
+ * TpAllocWait (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
+ TP_CALLBACK_ENVIRON *environment )
+{
+ TRACE("%p %p %p %p\n", out, callback, userdata, environment);
+ return tp_object_alloc_wait( (struct threadpool_object **)out, callback, userdata, environment );
+}
+
+/***********************************************************************
* TpAllocWork (NTDLL.@)
*/
NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
@@ -1330,7 +1767,7 @@ VOID WINAPI TpPostWork( TP_WORK *work )
{
struct threadpool_object *this = impl_from_TP_WORK( work );
TRACE("%p\n", work);
- if (this) tp_object_submit( this );
+ if (this) tp_object_submit( this, FALSE );
}
/***********************************************************************
@@ -1386,6 +1823,20 @@ VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
}
/***********************************************************************
+ * TpReleaseWait (NTDLL.@)
+ */
+VOID WINAPI TpReleaseWait( TP_WAIT *wait )
+{
+ struct threadpool_object *this = impl_from_TP_WAIT( wait );
+ TRACE("%p\n", wait);
+ if (this)
+ {
+ tp_object_shutdown( this );
+ tp_object_release( this );
+ }
+}
+
+/***********************************************************************
* TpReleaseWork (NTDLL.@)
*/
VOID WINAPI TpReleaseWork( TP_WORK *work )
@@ -1431,6 +1882,16 @@ VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LO
}
/***********************************************************************
+ * TpSetWait (KERNEL32.@)
+ */
+VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
+{
+ struct threadpool_object *this = impl_from_TP_WAIT( wait );
+ TRACE("%p %p %p\n", wait, handle, timeout);
+ if (this) tp_waitqueue_update_wait( this, handle, timeout );
+}
+
+/***********************************************************************
* TpSimpleTryPost (NTDLL.@)
*/
NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment )
@@ -1455,6 +1916,21 @@ VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
}
/***********************************************************************
+ * TpWaitForWait (KERNEL32.@)
+ */
+VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
+{
+ struct threadpool_object *this = impl_from_TP_WAIT( wait );
+ TRACE("%p %d\n", wait, cancel_pending);
+ if (this)
+ {
+ if (cancel_pending)
+ tp_object_cancel( this, FALSE, NULL );
+ tp_object_wait( this );
+ }
+}
+
+/***********************************************************************
* TpWaitForWork (NTDLL.@)
*/
VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
--
2.2.2

View File

@ -1,143 +0,0 @@
From 1cd9f0a44db192fb1add9989d040c96fd4f537a3 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Mon, 2 Feb 2015 13:26:06 +0100
Subject: ntdll: Make sure that threadpools have always at least one worker
thread.
Since various functions are declared as VOID on Windows, we always have
to make sure that tasks can be processed. We do that by spawning the first
worker immediately when the threadpool is created.
---
dlls/ntdll/threadpool2.c | 45 ++++++++++++++++++++++++++++-----------------
1 file changed, 28 insertions(+), 17 deletions(-)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index 76aee5a..7829212 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -233,7 +233,7 @@ static NTSTATUS tp_threadpool_alloc( struct threadpool **out );
static BOOL tp_threadpool_release( struct threadpool *pool );
static void tp_threadpool_shutdown( struct threadpool *pool );
-static NTSTATUS tp_object_submit( struct threadpool_object *object );
+static void tp_object_submit( struct threadpool_object *object );
static BOOL tp_object_release( struct threadpool_object *object );
static void tp_object_shutdown( struct threadpool_object *object );
@@ -598,12 +598,14 @@ static struct threadpool *get_default_threadpool( void )
static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
{
struct threadpool *pool;
+ NTSTATUS status;
+ HANDLE thread;
pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
if (!pool)
return STATUS_NO_MEMORY;
- pool->refcount = 1;
+ pool->refcount = 2; /* this thread + worker proc */
pool->shutdown = FALSE;
RtlInitializeCriticalSection( &pool->cs );
@@ -615,9 +617,20 @@ static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
pool->max_workers = 500;
pool->min_workers = 1;
- pool->num_workers = 0;
+ pool->num_workers = 1;
pool->num_busy_workers = 0;
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+ threadpool_worker_proc, pool, &thread, NULL );
+ if (status != STATUS_SUCCESS)
+ {
+ pool->cs.DebugInfo->Spare[0] = 0;
+ RtlDeleteCriticalSection( &pool->cs );
+ RtlFreeHeap( GetProcessHeap(), 0, pool );
+ return status;
+ }
+ NtClose( thread );
+
TRACE("allocated threadpool %p\n", pool);
*out = pool;
@@ -745,10 +758,10 @@ static void CALLBACK threadpool_worker_proc( void *param )
if (pool->shutdown)
break;
- /* Wait for new tasks or until timeout expires. */
+ /* Wait for new tasks or until timeout expires. Never terminate the last worker. */
timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs,
- &timeout ) == STATUS_TIMEOUT && !list_head( &pool->pool ))
+ &timeout ) == STATUS_TIMEOUT && !list_head( &pool->pool ) && pool->num_workers > 1)
{
break;
}
@@ -1005,10 +1018,9 @@ static void tp_object_wait( struct threadpool_object *object )
RtlLeaveCriticalSection( &pool->cs );
}
-static NTSTATUS tp_object_submit( struct threadpool_object *object )
+static void tp_object_submit( struct threadpool_object *object )
{
struct threadpool *pool = object->pool;
- NTSTATUS status = STATUS_SUCCESS;
assert( !object->shutdown );
assert( !pool->shutdown );
@@ -1018,7 +1030,9 @@ static NTSTATUS tp_object_submit( struct threadpool_object *object )
/* Start new worker threads if required (and allowed) */
if (pool->num_busy_workers >= pool->num_workers && pool->num_workers < pool->max_workers)
{
+ NTSTATUS status;
HANDLE thread;
+
status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
threadpool_worker_proc, pool, &thread, NULL );
if (status == STATUS_SUCCESS)
@@ -1027,25 +1041,21 @@ static NTSTATUS tp_object_submit( struct threadpool_object *object )
pool->num_workers++;
NtClose( thread );
}
- else if (pool->num_workers)
+ else
{
+ assert( pool->num_workers > 0 );
RtlWakeConditionVariable( &pool->update_event );
- status = STATUS_SUCCESS;
}
}
else RtlWakeConditionVariable( &pool->update_event );
/* Queue work item into pool and increment refcount */
- if (!status)
- {
- if (!object->num_pending_callbacks++)
- list_add_tail( &pool->pool, &object->pool_entry );
+ if (!object->num_pending_callbacks++)
+ list_add_tail( &pool->pool, &object->pool_entry );
- interlocked_inc(&object->refcount);
- }
+ interlocked_inc( &object->refcount );
RtlLeaveCriticalSection( &pool->cs );
- return status;
}
/***********************************************************************
@@ -1410,7 +1420,8 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, T
status = tp_object_alloc_simple( &object, callback, userdata, environment );
if (!status)
{
- status = tp_object_submit( object );
+ tp_object_submit( object );
+
tp_object_shutdown( object );
tp_object_release( object );
}
--
2.2.2

View File

@ -1,115 +0,0 @@
From d4de8e6120cac3e8c8862424a369fa90c4d73153 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Mon, 2 Feb 2015 23:28:45 +0100
Subject: ntdll: Avoid race-conditions between tp_object_{submit,shutdown} for
simple callbacks.
---
dlls/ntdll/threadpool2.c | 45 +++++++++++++++++++++------------------------
1 file changed, 21 insertions(+), 24 deletions(-)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index 7829212..9a9bf6a 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -776,7 +776,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
***********************************************************************/
static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
- PVOID userdata, TP_CALLBACK_ENVIRON *environment )
+ PVOID userdata, TP_CALLBACK_ENVIRON *environment, BOOL submit_and_release )
{
object->refcount = 1;
object->shutdown = FALSE;
@@ -828,6 +828,14 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
/* Increase reference-count on the pool */
interlocked_inc( &pool->refcount );
+ TRACE("allocated object %p of type %u\n", object, object->type);
+
+ /* For simple callbacks we have to run tp_object_submit before adding this object
+ * to the cleanup group. As soon as the cleanup group members are released ->shutdown
+ * will be set, and tp_object_submit would fail with an assertion. */
+ if (submit_and_release)
+ tp_object_submit( object );
+
/* Assign this object to a specific group. Please note that this has to be done
* as the last step before returning a pointer to the application, otherwise
* there is a risk of having race-conditions. */
@@ -840,10 +848,16 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
list_add_tail( &group->members, &object->group_entry );
RtlLeaveCriticalSection( &group->cs );
}
+
+ if (submit_and_release)
+ {
+ tp_object_shutdown( object );
+ tp_object_release( object );
+ }
}
-static NTSTATUS tp_object_alloc_simple( struct threadpool_object **out, PTP_SIMPLE_CALLBACK callback,
- PVOID userdata, TP_CALLBACK_ENVIRON *environment )
+static NTSTATUS tp_object_submit_simple( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
+ TP_CALLBACK_ENVIRON *environment )
{
struct threadpool_object *object;
struct threadpool *pool;
@@ -859,11 +873,8 @@ static NTSTATUS tp_object_alloc_simple( struct threadpool_object **out, PTP_SIMP
object->type = TP_OBJECT_TYPE_SIMPLE;
object->u.simple.callback = callback;
- tp_object_initialize( object, pool, userdata, environment );
-
- TRACE("allocated object %p of type %u\n", object, object->type);
+ tp_object_initialize( object, pool, userdata, environment, TRUE );
- *out = object;
return STATUS_SUCCESS;
}
@@ -884,9 +895,7 @@ static NTSTATUS tp_object_alloc_work( struct threadpool_object **out, PTP_WORK_C
object->type = TP_OBJECT_TYPE_WORK;
object->u.work.callback = callback;
- tp_object_initialize( object, pool, userdata, environment );
-
- TRACE("allocated object %p of type %u\n", object, object->type);
+ tp_object_initialize( object, pool, userdata, environment, FALSE );
*out = object;
return STATUS_SUCCESS;
@@ -926,9 +935,7 @@ static NTSTATUS tp_object_alloc_timer( struct threadpool_object **out, PTP_TIMER
object->u.timer.period = 0;
object->u.timer.window_length = 0;
- tp_object_initialize( object, pool, userdata, environment );
-
- TRACE("allocated object %p of type %u\n", object, object->type);
+ tp_object_initialize( object, pool, userdata, environment, FALSE );
*out = object;
return STATUS_SUCCESS;
@@ -1414,18 +1421,8 @@ VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LO
*/
NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment )
{
- struct threadpool_object *object;
- NTSTATUS status;
TRACE("%p %p %p\n", callback, userdata, environment);
- status = tp_object_alloc_simple( &object, callback, userdata, environment );
- if (!status)
- {
- tp_object_submit( object );
-
- tp_object_shutdown( object );
- tp_object_release( object );
- }
- return status;
+ return tp_object_submit_simple( callback, userdata, environment );
}
/***********************************************************************
--
2.2.2

View File

@ -0,0 +1,317 @@
From eabc190f60fddb8d6e454da9686a403ae8391939 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Fri, 6 Feb 2015 20:09:41 +0100
Subject: ntdll/tests: Add tests for threadpool wait objects.
---
dlls/ntdll/tests/threadpool.c | 287 ++++++++++++++++++++++++++++++++++++++++++
1 file changed, 287 insertions(+)
diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c
index 61f72ec..424fde2 100644
--- a/dlls/ntdll/tests/threadpool.c
+++ b/dlls/ntdll/tests/threadpool.c
@@ -761,6 +761,291 @@ static void test_tp_window_length(void)
pTpReleasePool(pool);
}
+static void CALLBACK wait_cb(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result)
+{
+ trace("Running wait callback\n");
+
+ if (result == WAIT_OBJECT_0)
+ InterlockedIncrement((LONG *)userdata);
+ else if (result == WAIT_TIMEOUT)
+ InterlockedExchangeAdd((LONG *)userdata, 0x10000);
+ else
+ ok(0, "unexpected result %u\n", result);
+}
+
+static void test_tp_wait(void)
+{
+ TP_CALLBACK_ENVIRON environment;
+ HANDLE semaphore;
+ TP_WAIT *wait, *wait2;
+ TP_POOL *pool;
+ NTSTATUS status;
+ LONG userdata;
+ LARGE_INTEGER when;
+ DWORD ret;
+
+ /* Allocate new threadpool */
+ pool = NULL;
+ status = pTpAllocPool(&pool, NULL);
+ ok(!status, "TpAllocPool failed with status %x\n", status);
+ ok(pool != NULL, "expected pool != NULL\n");
+
+ /* Allocate new wait items */
+ wait = NULL;
+ memset(&environment, 0, sizeof(environment));
+ environment.Version = 1;
+ environment.Pool = pool;
+ status = pTpAllocWait(&wait, wait_cb, &userdata, &environment);
+ ok(!status, "TpAllocWait failed with status %x\n", status);
+ ok(wait != NULL, "expected wait != NULL\n");
+
+ wait2 = NULL;
+ status = pTpAllocWait(&wait2, wait_cb, &userdata, &environment);
+ ok(!status, "TpAllocWait failed with status %x\n", status);
+ ok(wait != NULL, "expected wait != NULL\n");
+
+ semaphore = CreateSemaphoreW(NULL, 0, 1, NULL);
+ ok(semaphore != NULL, "failed to create semaphore\n");
+
+ /* Infinite timeout, signal the semaphore immediately */
+ userdata = 0;
+ pTpSetWait(wait, semaphore, NULL);
+ ReleaseSemaphore(semaphore, 1, NULL);
+ Sleep(50);
+ ok(userdata == 1, "expected userdata = 1, got %u\n", userdata);
+
+ /* Relative timeout, no event */
+ userdata = 0;
+ when.QuadPart = (ULONGLONG)50 * -10000;
+ pTpSetWait(wait, semaphore, &when);
+ Sleep(100);
+ pTpWaitForWait(wait, FALSE);
+ ok(userdata == 0x10000, "expected userdata = 0x10000, got %u\n", userdata);
+ ret = WaitForSingleObject(semaphore, 50);
+ ok(ret == WAIT_TIMEOUT, "expected ret = WAIT_TIMEOUT, got %u\n", ret);
+
+ /* Relative timeout, with event */
+ userdata = 0;
+ when.QuadPart = (ULONGLONG)500 * -10000;
+ pTpSetWait(wait, semaphore, &when);
+ pTpWaitForWait(wait, TRUE);
+ Sleep(250);
+ ReleaseSemaphore(semaphore, 1, NULL);
+ Sleep(50);
+ pTpWaitForWait(wait, FALSE);
+ ok(userdata == 1, "expected userdata = 1, got %u\n", userdata);
+ ret = WaitForSingleObject(semaphore, 50);
+ ok(ret == WAIT_TIMEOUT, "expected ret = WAIT_TIMEOUT, got %u\n", ret);
+
+ /* Absolute timeout, no event */
+ userdata = 0;
+ NtQuerySystemTime( &when );
+ when.QuadPart += (ULONGLONG)50 * 10000;
+ pTpSetWait(wait, semaphore, &when);
+ Sleep(100);
+ pTpWaitForWait(wait, FALSE);
+ ok(userdata == 0x10000, "expected userdata = 0x10000, got %u\n", userdata);
+ ret = WaitForSingleObject(semaphore, 50);
+ ok(ret == WAIT_TIMEOUT, "expected ret = WAIT_TIMEOUT, got %u\n", ret);
+
+ /* Absolute timeout, with event */
+ userdata = 0;
+ NtQuerySystemTime( &when );
+ when.QuadPart += (ULONGLONG)500 * 10000;
+ pTpSetWait(wait, semaphore, &when);
+ pTpWaitForWait(wait, TRUE);
+ Sleep(250);
+ ReleaseSemaphore(semaphore, 1, NULL);
+ Sleep(50);
+ pTpWaitForWait(wait, FALSE);
+ ok(userdata == 1, "expected userdata = 1, got %u\n", userdata);
+ ret = WaitForSingleObject(semaphore, 50);
+ ok(ret == WAIT_TIMEOUT, "expected ret = WAIT_TIMEOUT, got %u\n", ret);
+
+ /* Trigger event immediately */
+ userdata = 0;
+ when.QuadPart = 0;
+ pTpSetWait(wait, semaphore, &when);
+ pTpWaitForWait(wait, FALSE);
+ ok(userdata == 0x10000, "expected userdata = 0x10000, got %u\n", userdata);
+ ret = WaitForSingleObject(semaphore, 50);
+ ok(ret == WAIT_TIMEOUT, "expected ret = WAIT_TIMEOUT, got %u\n", ret);
+
+ /* Cancel a pending wait */
+ userdata = 0;
+ when.QuadPart = (ULONGLONG)500 * -10000;
+ pTpSetWait(wait, semaphore, &when);
+ pTpWaitForWait(wait, TRUE);
+ Sleep(250);
+ pTpSetWait(wait, NULL, (void *)0xdeadbeef);
+ Sleep(50);
+ ReleaseSemaphore(semaphore, 1, NULL);
+ Sleep(50);
+ ok(userdata == 0, "expected userdata = 0, got %u\n", userdata);
+ ret = WaitForSingleObject(semaphore, 1000);
+ ok(ret == WAIT_OBJECT_0, "expected ret = WAIT_OBJECT_0, got %u\n", ret);
+
+ /* Test with INVALID_HANDLE_VALUE */
+ userdata = 0;
+ when.QuadPart = 0;
+ pTpSetWait(wait, INVALID_HANDLE_VALUE, &when);
+ Sleep(50);
+ ok(userdata == 0x10000, "expected userdata = 0x10000, got %u\n", userdata);
+
+ /* Cancel a pending wait with INVALID_HANDLE_VALUE */
+ userdata = 0;
+ when.QuadPart = (ULONGLONG)500 * -10000;
+ pTpSetWait(wait, semaphore, &when);
+ pTpWaitForWait(wait, TRUE);
+ Sleep(250);
+ when.QuadPart = (ULONGLONG)100 * -10000;
+ pTpSetWait(wait, INVALID_HANDLE_VALUE, &when);
+ Sleep(250);
+ ok(userdata == 0x10000, "expected userdata = 0x10000, got %u\n", userdata);
+
+ /* Add two objects with the same semaphore */
+ userdata = 0;
+ pTpSetWait(wait, semaphore, NULL);
+ pTpSetWait(wait2, semaphore, NULL);
+ ok(userdata == 0, "expected userdata = 0, got %u\n", userdata);
+ ReleaseSemaphore(semaphore, 1, NULL);
+ Sleep(10);
+ pTpWaitForWait(wait, FALSE);
+ pTpWaitForWait(wait2, FALSE);
+ ok(userdata == 1, "expected userdata = 1, got %u\n", userdata);
+ ret = WaitForSingleObject(semaphore, 50);
+ ok(ret == WAIT_TIMEOUT, "expected ret = WAIT_TIMEOUT, got %u\n", ret);
+
+ CloseHandle(semaphore);
+ semaphore = CreateSemaphoreW(NULL, 0, 2, NULL);
+ ok(semaphore != NULL, "failed to create semaphore\n");
+
+ /* Repeat test above, but with a semaphore of count 2 */
+ userdata = 0;
+ pTpSetWait(wait, semaphore, NULL);
+ pTpSetWait(wait2, semaphore, NULL);
+ ok(userdata == 0, "expected userdata = 0, got %u\n", userdata);
+ ReleaseSemaphore(semaphore, 2, NULL);
+ Sleep(10);
+ pTpWaitForWait(wait, FALSE);
+ pTpWaitForWait(wait2, FALSE);
+ ok(userdata == 2, "expected userdata = 2, got %u\n", userdata);
+ ret = WaitForSingleObject(semaphore, 50);
+ ok(ret == WAIT_TIMEOUT, "expected ret = WAIT_TIMEOUT, got %u\n", ret);
+
+ CloseHandle(semaphore);
+
+ /* Cleanup */
+ pTpReleaseWait(wait2);
+ pTpReleaseWait(wait);
+ pTpReleasePool(pool);
+}
+
+static LONG multi_wait_callbacks;
+static DWORD multi_wait_result;
+
+static void CALLBACK multi_wait_cb(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result)
+{
+ DWORD index = (DWORD)(DWORD_PTR)userdata;
+ InterlockedIncrement(&multi_wait_callbacks);
+
+ if (result == WAIT_OBJECT_0)
+ multi_wait_result = index;
+ else if (result == WAIT_TIMEOUT)
+ multi_wait_result = 0x10000 | index;
+ else
+ ok(0, "unexpected result %u\n", result);
+}
+
+static void test_tp_multi_wait(void)
+{
+ TP_CALLBACK_ENVIRON environment;
+ HANDLE semaphores[512];
+ TP_WAIT *waits[512];
+ TP_POOL *pool;
+ NTSTATUS status;
+ LARGE_INTEGER when;
+ int i;
+
+ /* Allocate new threadpool */
+ pool = NULL;
+ status = pTpAllocPool(&pool, NULL);
+ ok(!status, "TpAllocPool failed with status %x\n", status);
+ ok(pool != NULL, "expected pool != NULL\n");
+
+ memset(&environment, 0, sizeof(environment));
+ environment.Version = 1;
+ environment.Pool = pool;
+
+ /* Create semaphores, wait objects and enable them */
+ for (i = 0; i < sizeof(semaphores)/sizeof(semaphores[0]); i++)
+ {
+ semaphores[i] = CreateSemaphoreW(NULL, 0, 1, NULL);
+ ok(semaphores[i] != NULL, "failed to create semaphores[%d]\n", i);
+
+ waits[i] = NULL;
+ status = pTpAllocWait(&waits[i], multi_wait_cb, (void *)i, &environment);
+ ok(!status, "TpAllocWait failed with status %x\n", status);
+ ok(waits[i] != NULL, "expected waits[%d] != NULL\n", i);
+
+ pTpSetWait(waits[i], semaphores[i], NULL);
+ }
+
+ /* Now test releasing the semaphores */
+ for (i = 0; i < sizeof(semaphores)/sizeof(semaphores[0]); i++)
+ {
+ multi_wait_callbacks = 0;
+ multi_wait_result = 0;
+
+ ReleaseSemaphore(semaphores[i], 1, NULL);
+ while (multi_wait_callbacks == 0) Sleep(10);
+ ok(multi_wait_callbacks == 1, "expected multi_wait_callbacks = 1, got %u\n", multi_wait_callbacks);
+ ok(multi_wait_result == i, "expected multi_wait_result = %u, got %u\n", multi_wait_result, i);
+
+ pTpSetWait(waits[i], semaphores[i], NULL);
+ }
+
+ /* Now again in the reversed order */
+ for (i = sizeof(semaphores)/sizeof(semaphores[0]) - 1; i >= 0; i--)
+ {
+ multi_wait_callbacks = 0;
+ multi_wait_result = 0;
+
+ ReleaseSemaphore(semaphores[i], 1, NULL);
+ while (multi_wait_callbacks == 0) Sleep(10);
+ ok(multi_wait_callbacks == 1, "expected multi_wait_callbacks = 1, got %u\n", multi_wait_callbacks);
+ ok(multi_wait_result == i, "expected multi_wait_result = %u, got %u\n", multi_wait_result, i);
+
+ pTpSetWait(waits[i], semaphores[i], NULL);
+ }
+
+ /* Now test with a timeout */
+ multi_wait_callbacks = 0;
+ multi_wait_result = 0;
+ for (i = 0; i < sizeof(semaphores)/sizeof(semaphores[0]); i++)
+ {
+ when.QuadPart = 0;
+ pTpSetWait(waits[i], semaphores[i], &when);
+ }
+ Sleep(50);
+ ok(multi_wait_callbacks == sizeof(semaphores)/sizeof(semaphores[0]),
+ "got wrong multi_wait_callbacks %u\n", multi_wait_callbacks);
+ ok(multi_wait_result >> 16, "expected multi_wait_result >> 16 != 0\n");
+
+ /* Add them all again, we want that the wait is pending while destroying it */
+ for (i = 0; i < sizeof(semaphores)/sizeof(semaphores[0]); i++)
+ pTpSetWait(waits[i], semaphores[i], NULL);
+
+ /* Destroy the objects and semaphores */
+ for (i = 0; i < sizeof(semaphores)/sizeof(semaphores[0]); i++)
+ {
+ pTpReleaseWait(waits[i]);
+ NtClose(semaphores[i]);
+ }
+
+ pTpReleasePool(pool);
+}
+
START_TEST(threadpool)
{
if(!init_threadpool())
@@ -774,6 +1059,8 @@ START_TEST(threadpool)
test_tp_group_cancel();
test_tp_timer();
test_tp_window_length();
+ test_tp_wait();
+ test_tp_multi_wait();
/* FIXME: Make sure worker threads have terminated before. */
Sleep(100);
--
2.2.2

View File

@ -0,0 +1,140 @@
From 67ad3c12465ea23c508534d7dc660a3d03a69caf Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Fri, 6 Feb 2015 20:24:27 +0100
Subject: kernel32: Forward threadpool wait functions to ntdll.
---
dlls/kernel32/kernel32.spec | 8 ++++----
dlls/kernel32/thread.c | 43 +++++++++++++++++++++++++++++++++++++++++++
include/winternl.h | 4 ++++
3 files changed, 51 insertions(+), 4 deletions(-)
diff --git a/dlls/kernel32/kernel32.spec b/dlls/kernel32/kernel32.spec
index 4d170d5..b4af160 100644
--- a/dlls/kernel32/kernel32.spec
+++ b/dlls/kernel32/kernel32.spec
@@ -233,7 +233,7 @@
@ stdcall CloseThreadpoolCleanupGroupMembers(ptr long ptr) ntdll.TpReleaseCleanupGroupMembers
# @ stub CloseThreadpoolIo
@ stdcall CloseThreadpoolTimer(ptr) ntdll.TpReleaseTimer
-# @ stub CloseThreadpoolWait
+@ stdcall CloseThreadpoolWait(ptr) ntdll.TpReleaseWait
@ stdcall CloseThreadpoolWork(ptr) ntdll.TpReleaseWork
@ stdcall CmdBatNotification(long)
@ stdcall CommConfigDialogA(str long ptr)
@@ -335,7 +335,7 @@
@ stdcall CreateThreadpoolCleanupGroup()
# @ stub CreateThreadpoolIo
@ stdcall CreateThreadpoolTimer(ptr ptr ptr)
-# @ stub CreateThreadpoolWait
+@ stdcall CreateThreadpoolWait(ptr ptr ptr)
@ stdcall CreateThreadpoolWork(ptr ptr ptr)
@ stdcall CreateTimerQueue ()
@ stdcall CreateTimerQueueTimer(ptr long ptr ptr long long long)
@@ -1456,7 +1456,7 @@
@ stdcall SetThreadpoolThreadMaximum(ptr long) ntdll.TpSetPoolMaxThreads
@ stdcall SetThreadpoolThreadMinimum(ptr long) ntdll.TpSetPoolMinThreads
@ stdcall SetThreadpoolTimer(ptr ptr long long)
-# @ stub SetThreadpoolWait
+@ stdcall SetThreadpoolWait(ptr long ptr)
@ stdcall SetTimeZoneInformation(ptr)
@ stub SetTimerQueueTimer
# @ stub -arch=x86_64 SetUmsThreadInformation
@@ -1571,7 +1571,7 @@
@ stdcall WaitForSingleObjectEx(long long long)
# @ stub WaitForThreadpoolIoCallbacks
@ stdcall WaitForThreadpoolTimerCallbacks(ptr long) ntdll.TpWaitForTimer
-# @ stub WaitForThreadpoolWaitCallbacks
+@ stdcall WaitForThreadpoolWaitCallbacks(ptr long) ntdll.TpWaitForWait
@ stdcall WaitForThreadpoolWorkCallbacks(ptr long) ntdll.TpWaitForWork
@ stdcall WaitNamedPipeA (str long)
@ stdcall WaitNamedPipeW (wstr long)
diff --git a/dlls/kernel32/thread.c b/dlls/kernel32/thread.c
index 085b011..77037ef 100644
--- a/dlls/kernel32/thread.c
+++ b/dlls/kernel32/thread.c
@@ -894,6 +894,27 @@ PTP_TIMER WINAPI CreateThreadpoolTimer( PTP_TIMER_CALLBACK callback, PVOID userd
}
/***********************************************************************
+ * CreateThreadpoolWait (KERNEL32.@)
+ */
+PTP_WAIT WINAPI CreateThreadpoolWait( PTP_WAIT_CALLBACK callback, PVOID userdata,
+ TP_CALLBACK_ENVIRON *environment )
+{
+ TP_WAIT *wait;
+ NTSTATUS status;
+
+ TRACE( "%p, %p, %p\n", callback, userdata, environment );
+
+ status = TpAllocWait( &wait, callback, userdata, environment );
+ if (status)
+ {
+ SetLastError( RtlNtStatusToDosError(status) );
+ return NULL;
+ }
+
+ return wait;
+}
+
+/***********************************************************************
* CreateThreadpoolWork (KERNEL32.@)
*/
PTP_WORK WINAPI CreateThreadpoolWork( PTP_WORK_CALLBACK callback, PVOID userdata,
@@ -932,3 +953,25 @@ VOID WINAPI SetThreadpoolTimer( TP_TIMER *timer, FILETIME *due_time,
TpSetTimer( timer, due_time ? &timeout : NULL, period, window_length );
}
+
+/***********************************************************************
+ * SetThreadpoolWait (KERNEL32.@)
+ */
+VOID WINAPI SetThreadpoolWait( TP_WAIT *wait, HANDLE handle, FILETIME *due_time )
+{
+ LARGE_INTEGER timeout;
+
+ TRACE( "%p, %p, %p\n", wait, handle, due_time );
+
+ if (!handle)
+ {
+ due_time = NULL;
+ }
+ else if (due_time)
+ {
+ timeout.u.LowPart = due_time->dwLowDateTime;
+ timeout.u.HighPart = due_time->dwHighDateTime;
+ }
+
+ TpSetWait( wait, handle, due_time ? &timeout : NULL );
+}
diff --git a/include/winternl.h b/include/winternl.h
index a534bd1..c93931b 100644
--- a/include/winternl.h
+++ b/include/winternl.h
@@ -2604,6 +2604,7 @@ NTSYSAPI NTSTATUS WINAPI RtlLargeIntegerToChar(const ULONGLONG *,ULONG,ULONG,PC
NTSYSAPI NTSTATUS WINAPI TpAllocCleanupGroup(TP_CLEANUP_GROUP **);
NTSYSAPI NTSTATUS WINAPI TpAllocPool(TP_POOL **,PVOID);
NTSYSAPI NTSTATUS WINAPI TpAllocTimer(TP_TIMER **,PTP_TIMER_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
+NTSYSAPI NTSTATUS WINAPI TpAllocWait(TP_WAIT **,PTP_WAIT_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
NTSYSAPI NTSTATUS WINAPI TpAllocWork(TP_WORK **,PTP_WORK_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
NTSYSAPI void WINAPI TpCallbackLeaveCriticalSectionOnCompletion(TP_CALLBACK_INSTANCE *,RTL_CRITICAL_SECTION *);
NTSYSAPI NTSTATUS WINAPI TpCallbackMayRunLong(TP_CALLBACK_INSTANCE *);
@@ -2618,12 +2619,15 @@ NTSYSAPI void WINAPI TpReleaseCleanupGroup(TP_CLEANUP_GROUP *);
NTSYSAPI void WINAPI TpReleaseCleanupGroupMembers(TP_CLEANUP_GROUP *,BOOL,PVOID);
NTSYSAPI void WINAPI TpReleasePool(TP_POOL *);
NTSYSAPI void WINAPI TpReleaseTimer(TP_TIMER *);
+NTSYSAPI void WINAPI TpReleaseWait(TP_WAIT *);
NTSYSAPI void WINAPI TpReleaseWork(TP_WORK *);
NTSYSAPI void WINAPI TpSetPoolMaxThreads(TP_POOL *,DWORD);
NTSYSAPI BOOL WINAPI TpSetPoolMinThreads(TP_POOL *,DWORD);
NTSYSAPI void WINAPI TpSetTimer(TP_TIMER *, LARGE_INTEGER *,LONG,LONG);
+NTSYSAPI void WINAPI TpSetWait(TP_WAIT *,HANDLE,LARGE_INTEGER *);
NTSYSAPI NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
NTSYSAPI void WINAPI TpWaitForTimer(TP_TIMER *,BOOL);
+NTSYSAPI void WINAPI TpWaitForWait(TP_WAIT *,BOOL);
NTSYSAPI void WINAPI TpWaitForWork(TP_WORK *,BOOL);
/* Wine internal functions */
--
2.2.2

View File

@ -1,3 +1,4 @@
Fixes: [35192] Add implementation for CreateThreadpool
Fixes: [32531] Implement threadpool work items
Fixes: [37306] Implement threadpool timers
Fixes: Implement threadpool wait objects

View File

@ -2233,20 +2233,22 @@ if test "$enable_ntdll_Vista_Threadpool" -eq 1; then
patch_apply ntdll-Vista_Threadpool/0001-ntdll-Add-threadpool-stub-functions-to-specfile.patch
patch_apply ntdll-Vista_Threadpool/0002-ntdll-Implement-threadpool-cleanup-group-and-callbac.patch
patch_apply ntdll-Vista_Threadpool/0003-ntdll-Implement-additional-threadpool-work-item-func.patch
patch_apply ntdll-Vista_Threadpool/0004-ntdll-Implement-threadpool-timer-functions.patch
patch_apply ntdll-Vista_Threadpool/0004-ntdll-Implement-threadpool-timer-functions.-rev-2.patch
patch_apply ntdll-Vista_Threadpool/0005-ntdll-tests-Add-tests-for-Tp-threadpool-functions.patch
patch_apply ntdll-Vista_Threadpool/0006-kernel32-Forward-various-threadpool-functions-to-ntd.patch
patch_apply ntdll-Vista_Threadpool/0007-ntdll-Make-sure-that-threadpools-have-always-at-leas.patch
patch_apply ntdll-Vista_Threadpool/0008-ntdll-Avoid-race-conditions-between-tp_object_-submi.patch
patch_apply ntdll-Vista_Threadpool/0007-ntdll-Implement-threadpool-wait-objects.patch
patch_apply ntdll-Vista_Threadpool/0008-ntdll-tests-Add-tests-for-threadpool-wait-objects.patch
patch_apply ntdll-Vista_Threadpool/0009-kernel32-Forward-threadpool-wait-functions-to-ntdll.patch
(
echo '+ { "Sebastian Lackner", "ntdll: Add threadpool stub functions to specfile.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement threadpool, cleanup group and callback instance functions.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement threadpool, cleanup group and callback instance functions.", 2 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement additional threadpool work item functions.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement threadpool timer functions.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement threadpool timer functions.", 2 },';
echo '+ { "Sebastian Lackner", "ntdll/tests: Add tests for Tp* threadpool functions.", 1 },';
echo '+ { "Sebastian Lackner", "kernel32: Forward various threadpool functions to ntdll.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Make sure that threadpools have always at least one worker thread.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Avoid race-conditions between tp_object_{submit,shutdown} for simple callbacks.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement threadpool wait objects.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll/tests: Add tests for threadpool wait objects.", 1 },';
echo '+ { "Sebastian Lackner", "kernel32: Forward threadpool wait functions to ntdll.", 1 },';
) >> "$patchlist"
fi