ntdll-Vista_Threadpool: Various cleanup and splitting into smaller patchsets.

This commit is contained in:
Sebastian Lackner 2015-03-04 14:19:38 +01:00
parent 274fe74055
commit 6b31c6f9e2
22 changed files with 2437 additions and 1695 deletions

View File

@ -0,0 +1,493 @@
From 674272bb028c74d513416d2f776c1ecd6b229c7f Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Tue, 24 Feb 2015 05:42:07 +0100
Subject: ntdll: Implement TpSimpleTryPost and basic threadpool infrastructure.
---
dlls/ntdll/Makefile.in | 1 +
dlls/ntdll/ntdll.spec | 3 +
dlls/ntdll/tests/threadpool.c | 3 +-
dlls/ntdll/threadpool2.c | 424 ++++++++++++++++++++++++++++++++++++++++++
4 files changed, 430 insertions(+), 1 deletion(-)
create mode 100644 dlls/ntdll/threadpool2.c
diff --git a/dlls/ntdll/Makefile.in b/dlls/ntdll/Makefile.in
index ed4bb94..2cecac6 100644
--- a/dlls/ntdll/Makefile.in
+++ b/dlls/ntdll/Makefile.in
@@ -47,6 +47,7 @@ C_SRCS = \
tape.c \
thread.c \
threadpool.c \
+ threadpool2.c \
time.c \
version.c \
virtual.c \
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index 9355d04..aa16021 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -967,6 +967,9 @@
@ stdcall RtlxOemStringToUnicodeSize(ptr) RtlOemStringToUnicodeSize
@ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize
@ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize
+@ stdcall TpAllocPool(ptr ptr)
+@ stdcall TpReleasePool(ptr)
+@ stdcall TpSimpleTryPost(ptr ptr ptr)
@ stdcall -ret64 VerSetConditionMask(int64 long long)
@ stdcall ZwAcceptConnectPort(ptr long ptr long long ptr) NtAcceptConnectPort
@ stdcall ZwAccessCheck(ptr long long ptr ptr ptr ptr ptr) NtAccessCheck
diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c
index 2e31b34..6f164e9 100644
--- a/dlls/ntdll/tests/threadpool.c
+++ b/dlls/ntdll/tests/threadpool.c
@@ -48,7 +48,7 @@ static BOOL init_threadpool(void)
if (!pTpAllocPool)
{
- skip("Threadpool functions not supported, skipping tests\n");
+ win_skip("Threadpool functions not supported, skipping tests\n");
return FALSE;
}
@@ -105,6 +105,7 @@ static void test_tp_simple(void)
environment.Version = 9999;
environment.Pool = pool;
status = pTpSimpleTryPost(simple_cb, semaphore, &environment);
+ todo_wine
ok(status == STATUS_INVALID_PARAMETER || broken(!status) /* Vista/2008 */,
"TpSimpleTryPost unexpectedly returned status %x\n", status);
if (!status)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
new file mode 100644
index 0000000..e912729
--- /dev/null
+++ b/dlls/ntdll/threadpool2.c
@@ -0,0 +1,424 @@
+/*
+ * Object-oriented thread pool API
+ *
+ * Copyright 2014-2015 Sebastian Lackner
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+ */
+
+#include "config.h"
+#include "wine/port.h"
+
+#include <assert.h>
+#include <stdarg.h>
+#include <limits.h>
+
+#define NONAMELESSUNION
+#include "ntstatus.h"
+#define WIN32_NO_STATUS
+#include "winternl.h"
+
+#include "wine/debug.h"
+#include "wine/list.h"
+
+#include "ntdll_misc.h"
+
+WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
+
+static inline LONG interlocked_inc( PLONG dest )
+{
+ return interlocked_xchg_add( dest, 1 ) + 1;
+}
+
+static inline LONG interlocked_dec( PLONG dest )
+{
+ return interlocked_xchg_add( dest, -1 ) - 1;
+}
+
+#define THREADPOOL_WORKER_TIMEOUT 5000
+
+/* internal threadpool representation */
+struct threadpool
+{
+ LONG refcount;
+ BOOL shutdown;
+ CRITICAL_SECTION cs;
+ /* pool of work items, locked via .cs */
+ struct list pool;
+ RTL_CONDITION_VARIABLE update_event;
+ /* information about worker threads, locked via .cs */
+ int max_workers;
+ int min_workers;
+ int num_workers;
+ int num_busy_workers;
+};
+
+enum threadpool_objtype
+{
+ TP_OBJECT_TYPE_SIMPLE
+};
+
+/* internal threadpool object representation */
+struct threadpool_object
+{
+ LONG refcount;
+ BOOL shutdown;
+ /* read-only information */
+ enum threadpool_objtype type;
+ struct threadpool *pool;
+ PVOID userdata;
+ /* information about the pool, locked via .pool->cs */
+ struct list pool_entry;
+ LONG num_pending_callbacks;
+ LONG num_running_callbacks;
+ /* arguments for callback */
+ union
+ {
+ struct
+ {
+ PTP_SIMPLE_CALLBACK callback;
+ } simple;
+ } u;
+};
+
+static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
+{
+ return (struct threadpool *)pool;
+}
+
+static void CALLBACK threadpool_worker_proc( void *param );
+static NTSTATUS tp_threadpool_alloc( struct threadpool **out );
+static void tp_threadpool_shutdown( struct threadpool *pool );
+static BOOL tp_threadpool_release( struct threadpool *pool );
+static void tp_object_submit( struct threadpool_object *object );
+static void tp_object_shutdown( struct threadpool_object *object );
+static BOOL tp_object_release( struct threadpool_object *object );
+
+static struct threadpool *default_threadpool = NULL;
+
+/* allocates or returns the default threadpool */
+static struct threadpool *get_default_threadpool( void )
+{
+ if (!default_threadpool)
+ {
+ struct threadpool *pool;
+
+ if (tp_threadpool_alloc( &pool ) != STATUS_SUCCESS)
+ return NULL;
+
+ if (interlocked_cmpxchg_ptr( (void *)&default_threadpool, pool, NULL ) != NULL)
+ {
+ tp_threadpool_shutdown( pool );
+ tp_threadpool_release( pool );
+ }
+ }
+ return default_threadpool;
+}
+
+/* allocate a new threadpool (with at least one worker thread) */
+static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
+{
+ struct threadpool *pool;
+ NTSTATUS status;
+ HANDLE thread;
+
+ pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
+ if (!pool)
+ return STATUS_NO_MEMORY;
+
+ pool->refcount = 2; /* this thread + worker proc */
+ pool->shutdown = FALSE;
+
+ RtlInitializeCriticalSection( &pool->cs );
+ pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
+
+ list_init( &pool->pool );
+ RtlInitializeConditionVariable( &pool->update_event );
+
+ pool->max_workers = 500;
+ pool->min_workers = 1;
+ pool->num_workers = 1;
+ pool->num_busy_workers = 0;
+
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+ threadpool_worker_proc, pool, &thread, NULL );
+ if (status != STATUS_SUCCESS)
+ {
+ pool->cs.DebugInfo->Spare[0] = 0;
+ RtlDeleteCriticalSection( &pool->cs );
+ RtlFreeHeap( GetProcessHeap(), 0, pool );
+ return status;
+ }
+ NtClose( thread );
+
+ TRACE("allocated threadpool %p\n", pool);
+
+ *out = pool;
+ return STATUS_SUCCESS;
+}
+
+/* shutdown all threads of the threadpool */
+static void tp_threadpool_shutdown( struct threadpool *pool )
+{
+ assert( pool != default_threadpool );
+
+ pool->shutdown = TRUE;
+ RtlWakeAllConditionVariable( &pool->update_event );
+}
+
+/* release a reference to a threadpool */
+static BOOL tp_threadpool_release( struct threadpool *pool )
+{
+ if (interlocked_dec( &pool->refcount ))
+ return FALSE;
+
+ TRACE("destroying threadpool %p\n", pool);
+
+ assert( pool->shutdown );
+ assert( list_empty( &pool->pool ) );
+
+ pool->cs.DebugInfo->Spare[0] = 0;
+ RtlDeleteCriticalSection( &pool->cs );
+
+ RtlFreeHeap( GetProcessHeap(), 0, pool );
+ return TRUE;
+}
+
+/* threadpool worker function */
+static void CALLBACK threadpool_worker_proc( void *param )
+{
+ struct threadpool *pool = param;
+ LARGE_INTEGER timeout;
+ struct list *ptr;
+
+ RtlEnterCriticalSection( &pool->cs );
+ for (;;)
+ {
+ while ((ptr = list_head( &pool->pool )))
+ {
+ struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
+ assert( object->num_pending_callbacks > 0 );
+
+ /* If further pending callbacks are queued, move the work item to
+ * the end of the pool list. Otherwise remove it from the pool. */
+ list_remove( &object->pool_entry );
+ if (--object->num_pending_callbacks)
+ list_add_tail( &pool->pool, &object->pool_entry );
+
+ /* Leave critical section and do the actual callback. */
+ object->num_running_callbacks++;
+ pool->num_busy_workers++;
+ RtlLeaveCriticalSection( &pool->cs );
+
+ switch (object->type)
+ {
+ case TP_OBJECT_TYPE_SIMPLE:
+ {
+ TRACE( "executing simple callback %p(NULL, %p)\n",
+ object->u.simple.callback, object->userdata );
+ object->u.simple.callback( NULL, object->userdata );
+ TRACE( "callback %p returned\n", object->u.simple.callback );
+ break;
+ }
+
+ default:
+ assert(0);
+ break;
+ }
+
+ RtlEnterCriticalSection( &pool->cs );
+ pool->num_busy_workers--;
+ object->num_running_callbacks--;
+ tp_object_release( object );
+ }
+
+ /* Shutdown worker thread if requested. */
+ if (pool->shutdown)
+ break;
+
+ /* Wait for new tasks or until timeout expires. Never terminate the last worker. */
+ timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
+ if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
+ !list_head( &pool->pool ) && pool->num_workers > 1)
+ {
+ break;
+ }
+ }
+ pool->num_workers--;
+ RtlLeaveCriticalSection( &pool->cs );
+ tp_threadpool_release( pool );
+}
+
+/* initializes a new threadpool object */
+static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
+ PVOID userdata, TP_CALLBACK_ENVIRON *environment )
+{
+ object->refcount = 1;
+ object->shutdown = FALSE;
+
+ object->pool = pool;
+ object->userdata = userdata;
+
+ memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
+ object->num_pending_callbacks = 0;
+ object->num_running_callbacks = 0;
+
+ if (environment)
+ FIXME("environment not implemented yet\n");
+
+ /* Increase reference-count on the pool */
+ interlocked_inc( &pool->refcount );
+
+ TRACE("allocated object %p of type %u\n", object, object->type);
+}
+
+/* allocates and submits a 'simple' threadpool task. */
+static NTSTATUS tp_object_submit_simple( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
+ TP_CALLBACK_ENVIRON *environment )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool = NULL;
+
+ if (environment)
+ pool = (struct threadpool *)environment->Pool;
+
+ if (!pool)
+ {
+ pool = get_default_threadpool();
+ if (!pool)
+ return STATUS_NO_MEMORY;
+ }
+
+ object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
+ if (!object)
+ return STATUS_NO_MEMORY;
+
+ object->type = TP_OBJECT_TYPE_SIMPLE;
+ object->u.simple.callback = callback;
+ tp_object_initialize( object, pool, userdata, environment );
+
+ tp_object_submit( object );
+
+ tp_object_shutdown( object );
+ tp_object_release( object );
+ return STATUS_SUCCESS;
+}
+
+/* submits an object to a threadpool */
+static void tp_object_submit( struct threadpool_object *object )
+{
+ struct threadpool *pool = object->pool;
+
+ assert( !object->shutdown );
+ assert( !pool->shutdown );
+
+ RtlEnterCriticalSection( &pool->cs );
+
+ /* Start new worker threads if required (and allowed) */
+ if (pool->num_busy_workers >= pool->num_workers && pool->num_workers < pool->max_workers)
+ {
+ NTSTATUS status;
+ HANDLE thread;
+
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+ threadpool_worker_proc, pool, &thread, NULL );
+ if (status == STATUS_SUCCESS)
+ {
+ interlocked_inc( &pool->refcount );
+ pool->num_workers++;
+ NtClose( thread );
+ goto out;
+ }
+ }
+
+ assert( pool->num_workers > 0 );
+ RtlWakeConditionVariable( &pool->update_event );
+
+out:
+ /* Queue work item into pool and increment refcount */
+ interlocked_inc( &object->refcount );
+ if (!object->num_pending_callbacks++)
+ list_add_tail( &pool->pool, &object->pool_entry );
+
+ RtlLeaveCriticalSection( &pool->cs );
+}
+
+/* mark an object as 'shutdown', submitting is no longer possible */
+static void tp_object_shutdown( struct threadpool_object *object )
+{
+ object->shutdown = TRUE;
+}
+
+/* release a reference to a threadpool object */
+static BOOL tp_object_release( struct threadpool_object *object )
+{
+ if (interlocked_dec( &object->refcount ))
+ return FALSE;
+
+ TRACE("destroying object %p of type %u\n", object, object->type);
+
+ assert( object->shutdown );
+ assert( !object->num_pending_callbacks );
+ assert( !object->num_running_callbacks );
+
+ /* release reference to threadpool */
+ tp_threadpool_release( object->pool );
+
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return TRUE;
+}
+
+
+/***********************************************************************
+ * TpAllocPool (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
+{
+ TRACE("%p %p\n", out, reserved);
+
+ if (reserved)
+ FIXME("reserved argument is nonzero (%p)", reserved);
+
+ if (!out)
+ return STATUS_ACCESS_VIOLATION;
+
+ return tp_threadpool_alloc( (struct threadpool **)out );
+}
+
+/***********************************************************************
+ * TpReleasePool (NTDLL.@)
+ */
+VOID WINAPI TpReleasePool( TP_POOL *pool )
+{
+ struct threadpool *this = impl_from_TP_POOL( pool );
+ TRACE("%p\n", pool);
+
+ if (this)
+ {
+ tp_threadpool_shutdown( this );
+ tp_threadpool_release( this );
+ }
+}
+
+/***********************************************************************
+ * TpSimpleTryPost (NTDLL.@)
+ */
+NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
+ TP_CALLBACK_ENVIRON *environment )
+{
+ TRACE("%p %p %p\n", callback, userdata, environment);
+
+ return tp_object_submit_simple( callback, userdata, environment );
+}
--
2.3.0

View File

@ -0,0 +1,70 @@
From b2f107dc3bbde6a242f2ce0fc4b350728f8135f5 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Wed, 4 Mar 2015 00:16:20 +0100
Subject: ntdll: Implement TpSetPool[Min|Max]Threads.
---
dlls/ntdll/ntdll.spec | 2 ++
dlls/ntdll/threadpool2.c | 33 +++++++++++++++++++++++++++++++++
2 files changed, 35 insertions(+)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index aa16021..9a7ddeb 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -969,6 +969,8 @@
@ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize
@ stdcall TpAllocPool(ptr ptr)
@ stdcall TpReleasePool(ptr)
+@ stdcall TpSetPoolMaxThreads(ptr long)
+@ stdcall TpSetPoolMinThreads(ptr long)
@ stdcall TpSimpleTryPost(ptr ptr ptr)
@ stdcall -ret64 VerSetConditionMask(int64 long long)
@ stdcall ZwAcceptConnectPort(ptr long ptr long long ptr) NtAcceptConnectPort
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index e912729..08bcde0 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -413,6 +413,39 @@ VOID WINAPI TpReleasePool( TP_POOL *pool )
}
/***********************************************************************
+ * TpSetPoolMaxThreads (NTDLL.@)
+ */
+VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
+{
+ struct threadpool *this = impl_from_TP_POOL( pool );
+ TRACE("%p %d\n", pool, maximum);
+
+ if (this)
+ {
+ RtlEnterCriticalSection( &this->cs );
+ this->max_workers = max(maximum, 1);
+ RtlLeaveCriticalSection( &this->cs );
+ }
+}
+
+/***********************************************************************
+ * TpSetPoolMinThreads (NTDLL.@)
+ */
+BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
+{
+ struct threadpool *this = impl_from_TP_POOL( pool );
+ FIXME("%p %d: semi-stub\n", pool, minimum);
+
+ if (this)
+ {
+ RtlEnterCriticalSection( &this->cs );
+ this->min_workers = max(minimum, 1);
+ RtlLeaveCriticalSection( &this->cs );
+ }
+ return TRUE;
+}
+
+/***********************************************************************
* TpSimpleTryPost (NTDLL.@)
*/
NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
--
2.3.0

View File

@ -0,0 +1,387 @@
From c6e44243dba7ac4a57ccef25e83645fa09855b98 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Wed, 4 Mar 2015 00:52:18 +0100
Subject: ntdll: Implement threadpool cleanup group functions.
---
dlls/ntdll/ntdll.spec | 3 +
dlls/ntdll/threadpool2.c | 261 ++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 259 insertions(+), 5 deletions(-)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index 9a7ddeb..6dad2bb 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -967,7 +967,10 @@
@ stdcall RtlxOemStringToUnicodeSize(ptr) RtlOemStringToUnicodeSize
@ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize
@ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize
+@ stdcall TpAllocCleanupGroup(ptr)
@ stdcall TpAllocPool(ptr ptr)
+@ stdcall TpReleaseCleanupGroup(ptr)
+@ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
@ stdcall TpReleasePool(ptr)
@ stdcall TpSetPoolMaxThreads(ptr long)
@ stdcall TpSetPoolMinThreads(ptr long)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index 08bcde0..30f1e6d 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -78,9 +78,14 @@ struct threadpool_object
/* read-only information */
enum threadpool_objtype type;
struct threadpool *pool;
+ struct threadpool_group *group;
PVOID userdata;
+ /* information about the group, locked via .group->cs */
+ struct list group_entry;
+ BOOL is_group_member;
/* information about the pool, locked via .pool->cs */
struct list pool_entry;
+ RTL_CONDITION_VARIABLE finished_event;
LONG num_pending_callbacks;
LONG num_running_callbacks;
/* arguments for callback */
@@ -93,11 +98,26 @@ struct threadpool_object
} u;
};
+/* internal threadpool group representation */
+struct threadpool_group
+{
+ LONG refcount;
+ BOOL shutdown;
+ CRITICAL_SECTION cs;
+ /* list of group members, locked via .cs */
+ struct list members;
+};
+
static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
{
return (struct threadpool *)pool;
}
+static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
+{
+ return (struct threadpool_group *)group;
+}
+
static void CALLBACK threadpool_worker_proc( void *param );
static NTSTATUS tp_threadpool_alloc( struct threadpool **out );
static void tp_threadpool_shutdown( struct threadpool *pool );
@@ -105,6 +125,7 @@ static BOOL tp_threadpool_release( struct threadpool *pool );
static void tp_object_submit( struct threadpool_object *object );
static void tp_object_shutdown( struct threadpool_object *object );
static BOOL tp_object_release( struct threadpool_object *object );
+static BOOL tp_group_release( struct threadpool_group *group );
static struct threadpool *default_threadpool = NULL;
@@ -241,6 +262,8 @@ static void CALLBACK threadpool_worker_proc( void *param )
RtlEnterCriticalSection( &pool->cs );
pool->num_busy_workers--;
object->num_running_callbacks--;
+ if (!object->num_pending_callbacks && !object->num_running_callbacks)
+ RtlWakeAllConditionVariable( &object->finished_event );
tp_object_release( object );
}
@@ -265,23 +288,60 @@ static void CALLBACK threadpool_worker_proc( void *param )
static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
PVOID userdata, TP_CALLBACK_ENVIRON *environment )
{
+ BOOL simple_cb = (object->type == TP_OBJECT_TYPE_SIMPLE);
+
object->refcount = 1;
object->shutdown = FALSE;
object->pool = pool;
+ object->group = NULL;
object->userdata = userdata;
+ memset( &object->group_entry, 0, sizeof(object->group_entry) );
+ object->is_group_member = FALSE;
+
memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
+ RtlInitializeConditionVariable( &object->finished_event );
object->num_pending_callbacks = 0;
object->num_running_callbacks = 0;
if (environment)
- FIXME("environment not implemented yet\n");
+ {
+ if (environment->Version != 1)
+ FIXME("unsupported environment version %u\n", environment->Version);
+
+ object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
+
+ WARN("environment not fully implemented yet\n");
+ }
/* Increase reference-count on the pool */
interlocked_inc( &pool->refcount );
TRACE("allocated object %p of type %u\n", object, object->type);
+
+ /* For simple callbacks we have to run tp_object_submit before adding this object
+ * to the cleanup group. As soon as the cleanup group members are released ->shutdown
+ * will be set, and tp_object_submit would fail with an assertion. */
+ if (simple_cb)
+ tp_object_submit( object );
+
+ if (object->group)
+ {
+ struct threadpool_group *group = object->group;
+ interlocked_inc( &group->refcount );
+
+ RtlEnterCriticalSection( &group->cs );
+ list_add_tail( &group->members, &object->group_entry );
+ object->is_group_member = TRUE;
+ RtlLeaveCriticalSection( &group->cs );
+ }
+
+ if (simple_cb)
+ {
+ tp_object_shutdown( object );
+ tp_object_release( object );
+ }
}
/* allocates and submits a 'simple' threadpool task. */
@@ -309,10 +369,6 @@ static NTSTATUS tp_object_submit_simple( PTP_SIMPLE_CALLBACK callback, PVOID use
object->u.simple.callback = callback;
tp_object_initialize( object, pool, userdata, environment );
- tp_object_submit( object );
-
- tp_object_shutdown( object );
- tp_object_release( object );
return STATUS_SUCCESS;
}
@@ -355,6 +411,41 @@ out:
RtlLeaveCriticalSection( &pool->cs );
}
+static void tp_object_cancel( struct threadpool_object *object )
+{
+ struct threadpool *pool = object->pool;
+ LONG pending_callbacks = 0;
+
+ RtlEnterCriticalSection( &pool->cs );
+
+ /* Remove the pending callbacks from the pool */
+ if (object->num_pending_callbacks)
+ {
+ pending_callbacks = object->num_pending_callbacks;
+ object->num_pending_callbacks = 0;
+ list_remove( &object->pool_entry );
+ }
+
+ RtlLeaveCriticalSection( &pool->cs );
+
+ /* Release references */
+ while (pending_callbacks--)
+ tp_object_release( object );
+}
+
+static void tp_object_wait( struct threadpool_object *object )
+{
+ struct threadpool *pool = object->pool;
+
+ RtlEnterCriticalSection( &pool->cs );
+
+ /* Wait until there are no longer pending or running callbacks */
+ while (object->num_pending_callbacks || object->num_running_callbacks)
+ RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
+
+ RtlLeaveCriticalSection( &pool->cs );
+}
+
/* mark an object as 'shutdown', submitting is no longer possible */
static void tp_object_shutdown( struct threadpool_object *object )
{
@@ -373,6 +464,22 @@ static BOOL tp_object_release( struct threadpool_object *object )
assert( !object->num_pending_callbacks );
assert( !object->num_running_callbacks );
+ /* release reference to the group */
+ if (object->group)
+ {
+ struct threadpool_group *group = object->group;
+
+ RtlEnterCriticalSection( &group->cs );
+ if (object->is_group_member)
+ {
+ list_remove( &object->group_entry );
+ object->is_group_member = FALSE;
+ }
+ RtlLeaveCriticalSection( &group->cs );
+
+ tp_group_release( group );
+ }
+
/* release reference to threadpool */
tp_threadpool_release( object->pool );
@@ -380,6 +487,121 @@ static BOOL tp_object_release( struct threadpool_object *object )
return TRUE;
}
+/* allocates a new cleanup group */
+static NTSTATUS tp_group_alloc( struct threadpool_group **out )
+{
+ struct threadpool_group *group;
+
+ group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
+ if (!group)
+ return STATUS_NO_MEMORY;
+
+ group->refcount = 1;
+ group->shutdown = FALSE;
+
+ RtlInitializeCriticalSection( &group->cs );
+ group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
+
+ list_init( &group->members );
+
+ TRACE("allocated group %p\n", group);
+
+ *out = group;
+ return STATUS_SUCCESS;
+}
+
+/* marks a cleanup group for shutdown */
+static void tp_group_shutdown( struct threadpool_group *group )
+{
+ group->shutdown = TRUE;
+}
+
+/* releases a reference to a cleanup group */
+static BOOL tp_group_release( struct threadpool_group *group )
+{
+ if (interlocked_dec( &group->refcount ))
+ return FALSE;
+
+ TRACE("destroying group %p\n", group);
+
+ assert( group->shutdown );
+ assert( list_empty( &group->members ) );
+
+ group->cs.DebugInfo->Spare[0] = 0;
+ RtlDeleteCriticalSection( &group->cs );
+
+ RtlFreeHeap( GetProcessHeap(), 0, group );
+ return TRUE;
+}
+
+/* releases all group members of a cleanup group */
+static void tp_group_release_members( struct threadpool_group *group, BOOL cancel_pending, PVOID userdata )
+{
+ struct threadpool_object *object, *next;
+ struct list members;
+
+ RtlEnterCriticalSection( &group->cs );
+
+ /* Unset group, increase references, and mark objects for shutdown */
+ LIST_FOR_EACH_ENTRY_SAFE( object, next, &group->members, struct threadpool_object, group_entry )
+ {
+ assert( object->group == group );
+ assert( object->is_group_member );
+
+ /* Simple callbacks are very special. The user doesn't hold any reference, so
+ * they would be released too early. Add one additional temporary reference. */
+ if (object->type == TP_OBJECT_TYPE_SIMPLE)
+ {
+ if (interlocked_inc( &object->refcount ) == 1)
+ {
+ /* Object is basically already destroyed, but group reference
+ * was not deleted yet. We can safely ignore this object. */
+ interlocked_dec( &object->refcount );
+ list_remove( &object->group_entry );
+ object->is_group_member = FALSE;
+ continue;
+ }
+ }
+
+ object->is_group_member = FALSE;
+ tp_object_shutdown( object );
+ }
+
+ /* Move members to a local list */
+ list_init( &members );
+ list_move_tail( &members, &group->members );
+
+ RtlLeaveCriticalSection( &group->cs );
+
+ /* Cancel pending callbacks if requested */
+ if (cancel_pending)
+ {
+ LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
+ {
+ tp_object_cancel( object );
+ }
+ }
+
+ /* Wait for remaining callbacks to finish */
+ LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
+ {
+ tp_object_wait( object );
+ tp_object_release( object );
+ }
+}
+
+/***********************************************************************
+ * TpAllocCleanupGroup (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
+{
+ TRACE("%p\n", out);
+
+ if (!out)
+ return STATUS_ACCESS_VIOLATION;
+
+ return tp_group_alloc( (struct threadpool_group **)out );
+}
/***********************************************************************
* TpAllocPool (NTDLL.@)
@@ -398,6 +620,35 @@ NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
}
/***********************************************************************
+ * TpReleaseCleanupGroup (NTDLL.@)
+ */
+VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
+{
+ struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
+ TRACE("%p\n", group);
+
+ if (this)
+ {
+ tp_group_shutdown( this );
+ tp_group_release( this );
+ }
+}
+
+/***********************************************************************
+ * TpReleaseCleanupGroupMembers (NTDLL.@)
+ */
+VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
+{
+ struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
+ TRACE("%p %d %p\n", group, cancel_pending, userdata);
+
+ if (this)
+ {
+ tp_group_release_members( this, cancel_pending, userdata );
+ }
+}
+
+/***********************************************************************
* TpReleasePool (NTDLL.@)
*/
VOID WINAPI TpReleasePool( TP_POOL *pool )
--
2.3.0

View File

@ -0,0 +1,102 @@
From 8517209b838b33e7ff69d70285c4e53d5b5a4258 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Wed, 4 Mar 2015 01:19:41 +0100
Subject: ntdll/tests: Add tests for TpAllocCleanupGroup and related functions.
---
dlls/ntdll/tests/threadpool.c | 48 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 48 insertions(+)
diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c
index 6f164e9..effa7fc 100644
--- a/dlls/ntdll/tests/threadpool.c
+++ b/dlls/ntdll/tests/threadpool.c
@@ -21,7 +21,10 @@
#include "ntdll_test.h"
static HMODULE hntdll = 0;
+static NTSTATUS (WINAPI *pTpAllocCleanupGroup)(TP_CLEANUP_GROUP **);
static NTSTATUS (WINAPI *pTpAllocPool)(TP_POOL **,PVOID);
+static VOID (WINAPI *pTpReleaseCleanupGroup)(TP_CLEANUP_GROUP *);
+static VOID (WINAPI *pTpReleaseCleanupGroupMembers)(TP_CLEANUP_GROUP *,BOOL,PVOID);
static VOID (WINAPI *pTpReleasePool)(TP_POOL *);
static NTSTATUS (WINAPI *pTpSimpleTryPost)(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
@@ -42,7 +45,10 @@ static BOOL init_threadpool(void)
return FALSE;
}
+ NTDLL_GET_PROC(TpAllocCleanupGroup);
NTDLL_GET_PROC(TpAllocPool);
+ NTDLL_GET_PROC(TpReleaseCleanupGroup);
+ NTDLL_GET_PROC(TpReleaseCleanupGroupMembers);
NTDLL_GET_PROC(TpReleasePool);
NTDLL_GET_PROC(TpSimpleTryPost);
@@ -65,13 +71,23 @@ static void CALLBACK simple_cb(TP_CALLBACK_INSTANCE *instance, void *userdata)
ReleaseSemaphore(semaphore, 1, NULL);
}
+static void CALLBACK simple2_cb(TP_CALLBACK_INSTANCE *instance, void *userdata)
+{
+ trace("Running simple2 callback\n");
+ Sleep(100);
+ InterlockedIncrement((LONG *)userdata);
+}
+
static void test_tp_simple(void)
{
TP_CALLBACK_ENVIRON environment;
+ TP_CLEANUP_GROUP *group;
HANDLE semaphore;
NTSTATUS status;
TP_POOL *pool;
+ LONG userdata;
DWORD result;
+ int i;
semaphore = CreateSemaphoreA(NULL, 0, 1, NULL);
ok(semaphore != NULL, "CreateSemaphoreA failed %u\n", GetLastError());
@@ -114,7 +130,39 @@ static void test_tp_simple(void)
ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
}
+ /* allocate a cleanup group for synchronization */
+ group = NULL;
+ status = pTpAllocCleanupGroup(&group);
+ ok(!status, "TpAllocCleanupGroup failed with status %x\n", status);
+ ok(group != NULL, "expected pool != NULL\n");
+
+ /* use cleanup group to wait for a simple callback */
+ userdata = 0;
+ memset(&environment, 0, sizeof(environment));
+ environment.Version = 1;
+ environment.Pool = pool;
+ environment.CleanupGroup = group;
+ status = pTpSimpleTryPost(simple2_cb, &userdata, &environment);
+ ok(!status, "TpSimpleTryPost failed with status %x\n", status);
+ pTpReleaseCleanupGroupMembers(group, FALSE, NULL);
+ ok(userdata == 1, "expected userdata = 1, got %u\n", userdata);
+
+ /* test cancellation of pending simple callbacks */
+ userdata = 0;
+ memset(&environment, 0, sizeof(environment));
+ environment.Version = 1;
+ environment.Pool = pool;
+ environment.CleanupGroup = group;
+ for (i = 0; i < 100; i++)
+ {
+ status = pTpSimpleTryPost(simple2_cb, &userdata, &environment);
+ ok(!status, "TpSimpleTryPost failed with status %x\n", status);
+ }
+ pTpReleaseCleanupGroupMembers(group, TRUE, NULL);
+ ok(userdata < 100, "expected userdata < 100, got %u\n", userdata);
+
/* cleanup */
+ pTpReleaseCleanupGroup(group);
pTpReleasePool(pool);
CloseHandle(semaphore);
}
--
2.3.0

View File

@ -1,68 +1,52 @@
From f0c8d1f64679e4777439ffc1b9d74d47287d72eb Mon Sep 17 00:00:00 2001
From 5e0c3a3b77b95434e19f273da7dcadc7cd224cea Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Sun, 1 Feb 2015 18:06:08 +0100
Subject: ntdll: Implement additional threadpool work item functions.
Date: Wed, 4 Mar 2015 01:30:57 +0100
Subject: ntdll: Implement threadpool work item functions.
---
dlls/ntdll/ntdll.spec | 8 ++--
dlls/ntdll/threadpool2.c | 97 +++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 100 insertions(+), 5 deletions(-)
dlls/ntdll/ntdll.spec | 4 ++
dlls/ntdll/threadpool2.c | 108 ++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 111 insertions(+), 1 deletion(-)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index 256ec6d..bf9e795 100644
index 6dad2bb..1c768c9 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -975,7 +975,7 @@
@@ -969,12 +969,16 @@
@ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize
@ stdcall TpAllocCleanupGroup(ptr)
@ stdcall TpAllocPool(ptr ptr)
# @ stub TpAllocTimer
# @ stub TpAllocWait
-# @ stub TpAllocWork
+@ stdcall TpAllocWork(ptr ptr ptr ptr)
# @ stub TpAlpcRegisterCompletionList
# @ stub TpAlpcUnregisterCompletionList
# @ stub TpCallbackDetectedUnrecoverableError
@@ -996,7 +996,7 @@
# @ stub TpDisablePoolCallbackChecks
@ stdcall TpDisassociateCallback(ptr)
# @ stub TpIsTimerSet
-# @ stub TpPostWork
+@ stdcall TpPostWork(ptr)
# @ stub TpQueryPoolStackInformation
# @ stub TpReleaseAlpcCompletion
@ stdcall TpReleaseCleanupGroup(ptr)
@@ -1006,7 +1006,7 @@
@ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
@ stdcall TpReleasePool(ptr)
# @ stub TpReleaseTimer
# @ stub TpReleaseWait
-# @ stub TpReleaseWork
+@ stdcall TpReleaseWork(ptr)
# @ stub TpSetDefaultPoolMaxThreads
# @ stub TpSetDefaultPoolStackInformation
@ stdcall TpSetPoolMaxThreads(ptr long)
@@ -1030,3 +1030,3 @@
# @ stub TpWaitForWait
-# @ stub TpWaitForWork
@ stdcall TpSetPoolMinThreads(ptr long)
@ stdcall TpSimpleTryPost(ptr ptr ptr)
+@ stdcall TpWaitForWork(ptr long)
@ stdcall -ret64 VerSetConditionMask(int64 long long)
@ stdcall ZwAcceptConnectPort(ptr long ptr long long ptr) NtAcceptConnectPort
@ stdcall ZwAccessCheck(ptr long long ptr ptr ptr ptr ptr) NtAccessCheck
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index c4e2d21..a6fd141 100644
index 30f1e6d..31b5e2e 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -123,7 +123,8 @@ struct threadpool_object
enum
{
TP_OBJECT_TYPE_UNDEFINED,
- TP_OBJECT_TYPE_SIMPLE
+ TP_OBJECT_TYPE_SIMPLE,
+ TP_OBJECT_TYPE_WORK
} type;
@@ -67,7 +67,8 @@ struct threadpool
/* arguments for callback */
@@ -134,6 +135,11 @@ struct threadpool_object
enum threadpool_objtype
{
- TP_OBJECT_TYPE_SIMPLE
+ TP_OBJECT_TYPE_SIMPLE,
+ TP_OBJECT_TYPE_WORK
};
/* internal threadpool object representation */
@@ -95,6 +96,10 @@ struct threadpool_object
{
PTP_SIMPLE_CALLBACK callback;
} simple;
+ /* work callback */
+ struct
+ {
+ PTP_WORK_CALLBACK callback;
@ -70,7 +54,7 @@ index c4e2d21..a6fd141 100644
} u;
};
@@ -153,6 +159,13 @@ static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
@@ -113,6 +118,13 @@ static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
return (struct threadpool *)pool;
}
@ -84,24 +68,23 @@ index c4e2d21..a6fd141 100644
static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
{
return (struct threadpool_group *)group;
@@ -423,6 +436,16 @@ static void CALLBACK threadpool_worker_proc( void *param )
@@ -254,6 +266,15 @@ static void CALLBACK threadpool_worker_proc( void *param )
break;
}
+ case TP_OBJECT_TYPE_WORK:
+ {
+ TP_CALLBACK_INSTANCE *cb_instance = (TP_CALLBACK_INSTANCE *)&instance;
+ TRACE( "executing callback %p(%p, %p, %p)\n",
+ object->u.work.callback, cb_instance, object->userdata, object );
+ object->u.work.callback( cb_instance, object->userdata, (TP_WORK *)object );
+ TRACE( "executing work callback %p(NULL, %p, %p)\n",
+ object->u.work.callback, object->userdata, object );
+ object->u.work.callback( NULL, object->userdata, (TP_WORK *)object );
+ TRACE( "callback %p returned\n", object->u.work.callback );
+ break;
+ }
+
default:
FIXME( "callback type %u not implemented\n", object->type );
assert(0);
break;
@@ -578,6 +601,29 @@ static NTSTATUS tp_object_submit_simple( PTP_SIMPLE_CALLBACK callback, PVOID use
@@ -372,6 +393,34 @@ static NTSTATUS tp_object_submit_simple( PTP_SIMPLE_CALLBACK callback, PVOID use
return STATUS_SUCCESS;
}
@ -109,12 +92,17 @@ index c4e2d21..a6fd141 100644
+ PVOID userdata, TP_CALLBACK_ENVIRON *environment )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool;
+ struct threadpool *pool = NULL;
+
+ /* determine threadpool */
+ pool = environment ? (struct threadpool *)environment->Pool : NULL;
+ if (!pool) pool = get_default_threadpool();
+ if (!pool) return STATUS_NO_MEMORY;
+ if (environment)
+ pool = (struct threadpool *)environment->Pool;
+
+ if (!pool)
+ {
+ pool = get_default_threadpool();
+ if (!pool)
+ return STATUS_NO_MEMORY;
+ }
+
+ object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
+ if (!object)
@ -122,16 +110,16 @@ index c4e2d21..a6fd141 100644
+
+ object->type = TP_OBJECT_TYPE_WORK;
+ object->u.work.callback = callback;
+ tp_object_initialize( object, pool, userdata, environment, FALSE );
+ tp_object_initialize( object, pool, userdata, environment );
+
+ *out = object;
+ return STATUS_SUCCESS;
+}
+
static BOOL tp_object_release( struct threadpool_object *object )
/* submits an object to a threadpool */
static void tp_object_submit( struct threadpool_object *object )
{
struct threadpool_group *group;
@@ -816,6 +862,16 @@ NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
@@ -620,6 +669,32 @@ NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
}
/***********************************************************************
@ -141,31 +129,30 @@ index c4e2d21..a6fd141 100644
+ TP_CALLBACK_ENVIRON *environment )
+{
+ TRACE("%p %p %p %p\n", out, callback, userdata, environment);
+ return tp_object_alloc_work( (struct threadpool_object **)out, callback, userdata, environment );
+
+ return tp_object_alloc_work( (struct threadpool_object **)out, callback,
+ userdata, environment );
+}
+
+/***********************************************************************
* TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
*/
VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
@@ -910,6 +966,16 @@ VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
}
/***********************************************************************
+ * TpPostWork (NTDLL.@)
+ */
+VOID WINAPI TpPostWork( TP_WORK *work )
+{
+ struct threadpool_object *this = impl_from_TP_WORK( work );
+ TRACE("%p\n", work);
+ if (this) tp_object_submit( this );
+
+ if (this)
+ {
+ tp_object_submit( this );
+ }
+}
+
+/***********************************************************************
* TpReleaseCleanupGroup (NTDLL.@)
*/
VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
@@ -948,6 +1014,20 @@ VOID WINAPI TpReleasePool( TP_POOL *pool )
@@ -664,6 +739,21 @@ VOID WINAPI TpReleasePool( TP_POOL *pool )
}
/***********************************************************************
@ -175,6 +162,7 @@ index c4e2d21..a6fd141 100644
+{
+ struct threadpool_object *this = impl_from_TP_WORK( work );
+ TRACE("%p\n", work);
+
+ if (this)
+ {
+ tp_object_shutdown( this );
@ -186,8 +174,8 @@ index c4e2d21..a6fd141 100644
* TpSetPoolMaxThreads (NTDLL.@)
*/
VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
@@ -976,3 +1056,18 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, T
TRACE("%p %p %p\n", callback, userdata, environment);
@@ -706,3 +796,19 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
return tp_object_submit_simple( callback, userdata, environment );
}
+
@ -198,13 +186,14 @@ index c4e2d21..a6fd141 100644
+{
+ struct threadpool_object *this = impl_from_TP_WORK( work );
+ TRACE("%p %d\n", work, cancel_pending);
+
+ if (this)
+ {
+ if (cancel_pending)
+ tp_object_cancel( this, FALSE, NULL );
+ tp_object_cancel( this );
+ tp_object_wait( this );
+ }
+}
--
2.2.2
2.3.0

View File

@ -0,0 +1,108 @@
From 688ef1722a39b3b51797cd179181012b0863cd1e Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Wed, 4 Mar 2015 01:38:23 +0100
Subject: ntdll/tests: Add basic tests for threadpool work items.
---
dlls/ntdll/tests/threadpool.c | 59 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 59 insertions(+)
diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c
index effa7fc..420979c 100644
--- a/dlls/ntdll/tests/threadpool.c
+++ b/dlls/ntdll/tests/threadpool.c
@@ -23,10 +23,14 @@
static HMODULE hntdll = 0;
static NTSTATUS (WINAPI *pTpAllocCleanupGroup)(TP_CLEANUP_GROUP **);
static NTSTATUS (WINAPI *pTpAllocPool)(TP_POOL **,PVOID);
+static NTSTATUS (WINAPI *pTpAllocWork)(TP_WORK **,PTP_WORK_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
+static VOID (WINAPI *pTpPostWork)(TP_WORK *);
static VOID (WINAPI *pTpReleaseCleanupGroup)(TP_CLEANUP_GROUP *);
static VOID (WINAPI *pTpReleaseCleanupGroupMembers)(TP_CLEANUP_GROUP *,BOOL,PVOID);
static VOID (WINAPI *pTpReleasePool)(TP_POOL *);
+static VOID (WINAPI *pTpReleaseWork)(TP_WORK *);
static NTSTATUS (WINAPI *pTpSimpleTryPost)(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
+static VOID (WINAPI *pTpWaitForWork)(TP_WORK *,BOOL);
#define NTDLL_GET_PROC(func) \
do \
@@ -47,10 +51,14 @@ static BOOL init_threadpool(void)
NTDLL_GET_PROC(TpAllocCleanupGroup);
NTDLL_GET_PROC(TpAllocPool);
+ NTDLL_GET_PROC(TpAllocWork);
+ NTDLL_GET_PROC(TpPostWork);
NTDLL_GET_PROC(TpReleaseCleanupGroup);
NTDLL_GET_PROC(TpReleaseCleanupGroupMembers);
NTDLL_GET_PROC(TpReleasePool);
+ NTDLL_GET_PROC(TpReleaseWork);
NTDLL_GET_PROC(TpSimpleTryPost);
+ NTDLL_GET_PROC(TpWaitForWork);
if (!pTpAllocPool)
{
@@ -167,10 +175,61 @@ static void test_tp_simple(void)
CloseHandle(semaphore);
}
+static void CALLBACK work_cb(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WORK *work)
+{
+ trace("Running work callback\n");
+ Sleep(10);
+ InterlockedIncrement((LONG *)userdata);
+}
+
+static void test_tp_work(void)
+{
+ TP_CALLBACK_ENVIRON environment;
+ TP_WORK *work;
+ TP_POOL *pool;
+ NTSTATUS status;
+ LONG userdata;
+ int i;
+
+ /* allocate new threadpool */
+ pool = NULL;
+ status = pTpAllocPool(&pool, NULL);
+ ok(!status, "TpAllocPool failed with status %x\n", status);
+ ok(pool != NULL, "expected pool != NULL\n");
+
+ /* allocate new work item */
+ work = NULL;
+ memset(&environment, 0, sizeof(environment));
+ environment.Version = 1;
+ environment.Pool = pool;
+ status = pTpAllocWork(&work, work_cb, &userdata, &environment);
+ ok(!status, "TpAllocWork failed with status %x\n", status);
+ ok(work != NULL, "expected work != NULL\n");
+
+ /* post 10 identical work items at once */
+ userdata = 0;
+ for (i = 0; i < 10; i++)
+ pTpPostWork(work);
+ pTpWaitForWork(work, FALSE);
+ ok(userdata == 10, "expected userdata = 10, got %u\n", userdata);
+
+ /* add more tasks and cancel them immediately */
+ userdata = 0;
+ for (i = 0; i < 10; i++)
+ pTpPostWork(work);
+ pTpWaitForWork(work, TRUE);
+ ok(userdata < 10, "expected userdata < 10, got %u\n", userdata);
+
+ /* cleanup */
+ pTpReleaseWork(work);
+ pTpReleasePool(pool);
+}
+
START_TEST(threadpool)
{
if (!init_threadpool())
return;
test_tp_simple();
+ test_tp_work();
}
--
2.3.0

View File

@ -0,0 +1,132 @@
From d796254c2e3c5188d893fb8d1de4978bb864357e Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Wed, 4 Mar 2015 01:45:11 +0100
Subject: ntdll/tests: Add threadpool scheduler tests for work items.
---
dlls/ntdll/tests/threadpool.c | 83 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 83 insertions(+)
diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c
index 420979c..1cfd67b 100644
--- a/dlls/ntdll/tests/threadpool.c
+++ b/dlls/ntdll/tests/threadpool.c
@@ -29,6 +29,7 @@ static VOID (WINAPI *pTpReleaseCleanupGroup)(TP_CLEANUP_GROUP *);
static VOID (WINAPI *pTpReleaseCleanupGroupMembers)(TP_CLEANUP_GROUP *,BOOL,PVOID);
static VOID (WINAPI *pTpReleasePool)(TP_POOL *);
static VOID (WINAPI *pTpReleaseWork)(TP_WORK *);
+static VOID (WINAPI *pTpSetPoolMaxThreads)(TP_POOL *,DWORD);
static NTSTATUS (WINAPI *pTpSimpleTryPost)(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
static VOID (WINAPI *pTpWaitForWork)(TP_WORK *,BOOL);
@@ -57,6 +58,7 @@ static BOOL init_threadpool(void)
NTDLL_GET_PROC(TpReleaseCleanupGroupMembers);
NTDLL_GET_PROC(TpReleasePool);
NTDLL_GET_PROC(TpReleaseWork);
+ NTDLL_GET_PROC(TpSetPoolMaxThreads);
NTDLL_GET_PROC(TpSimpleTryPost);
NTDLL_GET_PROC(TpWaitForWork);
@@ -182,6 +184,13 @@ static void CALLBACK work_cb(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_
InterlockedIncrement((LONG *)userdata);
}
+static void CALLBACK work2_cb(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WORK *work)
+{
+ trace("Running work2 callback\n");
+ Sleep(10);
+ InterlockedExchangeAdd((LONG *)userdata, 0x10000);
+}
+
static void test_tp_work(void)
{
TP_CALLBACK_ENVIRON environment;
@@ -225,6 +234,79 @@ static void test_tp_work(void)
pTpReleasePool(pool);
}
+static void test_tp_work_scheduler(void)
+{
+ TP_CALLBACK_ENVIRON environment;
+ TP_CLEANUP_GROUP *group;
+ TP_WORK *work, *work2;
+ TP_POOL *pool;
+ NTSTATUS status;
+ LONG userdata;
+ int i;
+
+ /* allocate new threadpool */
+ pool = NULL;
+ status = pTpAllocPool(&pool, NULL);
+ ok(!status, "TpAllocPool failed with status %x\n", status);
+ ok(pool != NULL, "expected pool != NULL\n");
+
+ /* we limit the pool to a single thread */
+ pTpSetPoolMaxThreads(pool, 1);
+
+ /* create a cleanup group */
+ group = NULL;
+ status = pTpAllocCleanupGroup(&group);
+ ok(!status, "TpAllocCleanupGroup failed with status %x\n", status);
+ ok(group != NULL, "expected pool != NULL\n");
+
+ /* the first work item has no cleanup group associated */
+ work = NULL;
+ memset(&environment, 0, sizeof(environment));
+ environment.Version = 1;
+ environment.Pool = pool;
+ status = pTpAllocWork(&work, work_cb, &userdata, &environment);
+ ok(!status, "TpAllocWork failed with status %x\n", status);
+ ok(work != NULL, "expected work != NULL\n");
+
+ /* allocate a second work item with a cleanup group */
+ work2 = NULL;
+ memset(&environment, 0, sizeof(environment));
+ environment.Version = 1;
+ environment.Pool = pool;
+ environment.CleanupGroup = group;
+ status = pTpAllocWork(&work2, work2_cb, &userdata, &environment);
+ ok(!status, "TpAllocWork failed with status %x\n", status);
+ ok(work2 != NULL, "expected work2 != NULL\n");
+
+ /* the 'work' callbacks are not blocking execution of 'work2' callbacks */
+ userdata = 0;
+ for (i = 0; i < 10; i++)
+ pTpPostWork(work);
+ for (i = 0; i < 10; i++)
+ pTpPostWork(work2);
+ Sleep(30);
+ pTpWaitForWork(work, TRUE);
+ pTpWaitForWork(work2, TRUE);
+ ok(userdata & 0xffff, "expected userdata & 0xffff != 0, got %u\n", userdata & 0xffff);
+ ok(userdata >> 16, "expected userdata >> 16 != 0, got %u\n", userdata >> 16);
+
+ /* test ReleaseCleanupGroupMembers on a work item */
+ userdata = 0;
+ for (i = 0; i < 100; i++)
+ pTpPostWork(work);
+ for (i = 0; i < 10; i++)
+ pTpPostWork(work2);
+ pTpReleaseCleanupGroupMembers(group, FALSE, NULL);
+ pTpWaitForWork(work, TRUE);
+ ok((userdata & 0xffff) < 100, "expected userdata & 0xffff < 100, got %u\n", userdata & 0xffff);
+ ok((userdata >> 16) == 10, "expected userdata >> 16 == 10, got %u\n", userdata >> 16);
+
+ /* cleanup */
+ pTpReleaseWork(work);
+ pTpReleaseCleanupGroup(group);
+ pTpReleasePool(pool);
+}
+
START_TEST(threadpool)
{
if (!init_threadpool())
@@ -232,4 +314,5 @@ START_TEST(threadpool)
test_tp_simple();
test_tp_work();
+ test_tp_work_scheduler();
}
--
2.3.0

View File

@ -0,0 +1,82 @@
From af02399ac80682e6ca5c093150460dca20c7323b Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Wed, 4 Mar 2015 06:57:53 +0100
Subject: ntdll: Add support for threadpool group cancel callback.
---
dlls/ntdll/threadpool2.c | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index 31b5e2e..5233271 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -81,6 +81,7 @@ struct threadpool_object
struct threadpool *pool;
struct threadpool_group *group;
PVOID userdata;
+ PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
/* information about the group, locked via .group->cs */
struct list group_entry;
BOOL is_group_member;
@@ -317,6 +318,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
object->pool = pool;
object->group = NULL;
object->userdata = userdata;
+ object->group_cancel_callback = NULL;
memset( &object->group_entry, 0, sizeof(object->group_entry) );
object->is_group_member = FALSE;
@@ -332,6 +334,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
FIXME("unsupported environment version %u\n", environment->Version);
object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
+ object->group_cancel_callback = environment->CleanupGroupCancelCallback;
WARN("environment not fully implemented yet\n");
}
@@ -460,7 +463,7 @@ out:
RtlLeaveCriticalSection( &pool->cs );
}
-static void tp_object_cancel( struct threadpool_object *object )
+static void tp_object_cancel( struct threadpool_object *object, BOOL group_cancel, PVOID userdata )
{
struct threadpool *pool = object->pool;
LONG pending_callbacks = 0;
@@ -477,6 +480,14 @@ static void tp_object_cancel( struct threadpool_object *object )
RtlLeaveCriticalSection( &pool->cs );
+ /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
+ if (pending_callbacks && group_cancel && object->group_cancel_callback)
+ {
+ TRACE( "executing group cancel callback %p(%p, %p)\n", object->group_cancel_callback, object, userdata );
+ object->group_cancel_callback( object, userdata );
+ TRACE( "callback %p returned\n", object->group_cancel_callback );
+ }
+
/* Release references */
while (pending_callbacks--)
tp_object_release( object );
@@ -627,7 +638,7 @@ static void tp_group_release_members( struct threadpool_group *group, BOOL cance
{
LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
{
- tp_object_cancel( object );
+ tp_object_cancel( object, TRUE, userdata );
}
}
@@ -808,7 +819,7 @@ VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
if (this)
{
if (cancel_pending)
- tp_object_cancel( this );
+ tp_object_cancel( this, FALSE, NULL );
tp_object_wait( this );
}
}
--
2.3.0

View File

@ -0,0 +1,56 @@
From 7fb1d9cfd7dbae474fa0965a143be2408240eeb2 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Wed, 4 Mar 2015 07:03:15 +0100
Subject: ntdll: Add support for threadpool finalization callback.
---
dlls/ntdll/threadpool2.c | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index 5233271..d9e08ac 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -82,6 +82,7 @@ struct threadpool_object
struct threadpool_group *group;
PVOID userdata;
PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
+ PTP_SIMPLE_CALLBACK finalization_callback;
/* information about the group, locked via .group->cs */
struct list group_entry;
BOOL is_group_member;
@@ -281,6 +282,15 @@ static void CALLBACK threadpool_worker_proc( void *param )
break;
}
+ /* Execute finalization callback */
+ if (object->finalization_callback)
+ {
+ TRACE( "executing finalization callback %p(NULL, %p)\n",
+ object->finalization_callback, object->userdata );
+ object->finalization_callback( NULL, object->userdata );
+ TRACE( "callback %p returned\n", object->finalization_callback );
+ }
+
RtlEnterCriticalSection( &pool->cs );
pool->num_busy_workers--;
object->num_running_callbacks--;
@@ -319,6 +329,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
object->group = NULL;
object->userdata = userdata;
object->group_cancel_callback = NULL;
+ object->finalization_callback = NULL;
memset( &object->group_entry, 0, sizeof(object->group_entry) );
object->is_group_member = FALSE;
@@ -335,6 +346,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
object->group_cancel_callback = environment->CleanupGroupCancelCallback;
+ object->finalization_callback = environment->FinalizationCallback;
WARN("environment not fully implemented yet\n");
}
--
2.3.0

View File

@ -0,0 +1,59 @@
From 3080d2d1a5526d61c02efee05d8804611f860976 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Wed, 4 Mar 2015 07:07:07 +0100
Subject: ntdll: Implement threadpool RaceDll environment variable.
---
dlls/ntdll/threadpool2.c | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index d9e08ac..a5203f0 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -83,6 +83,7 @@ struct threadpool_object
PVOID userdata;
PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
PTP_SIMPLE_CALLBACK finalization_callback;
+ HMODULE race_dll;
/* information about the group, locked via .group->cs */
struct list group_entry;
BOOL is_group_member;
@@ -330,6 +331,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
object->userdata = userdata;
object->group_cancel_callback = NULL;
object->finalization_callback = NULL;
+ object->race_dll = NULL;
memset( &object->group_entry, 0, sizeof(object->group_entry) );
object->is_group_member = FALSE;
@@ -347,10 +349,15 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
object->group_cancel_callback = environment->CleanupGroupCancelCallback;
object->finalization_callback = environment->FinalizationCallback;
+ object->race_dll = environment->RaceDll;
WARN("environment not fully implemented yet\n");
}
+ /* Increase dll refcount */
+ if (object->race_dll)
+ LdrAddRefDll( 0, object->race_dll );
+
/* Increase reference-count on the pool */
interlocked_inc( &pool->refcount );
@@ -555,6 +562,10 @@ static BOOL tp_object_release( struct threadpool_object *object )
/* release reference to threadpool */
tp_threadpool_release( object->pool );
+ /* release reference to library */
+ if (object->race_dll)
+ LdrUnloadDll( object->race_dll );
+
RtlFreeHeap( GetProcessHeap(), 0, object );
return TRUE;
}
--
2.3.0

View File

@ -0,0 +1,223 @@
From 750a6fed12e8b667325b93919a05daf6cbc648b4 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Wed, 4 Mar 2015 07:25:35 +0100
Subject: ntdll: Implement TpCallbackMayRunLong and instance structure.
---
dlls/ntdll/ntdll.spec | 1 +
dlls/ntdll/threadpool2.c | 104 ++++++++++++++++++++++++++++++++++++++++++-----
2 files changed, 95 insertions(+), 10 deletions(-)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index 1c768c9..38e6fe8 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -970,6 +970,7 @@
@ stdcall TpAllocCleanupGroup(ptr)
@ stdcall TpAllocPool(ptr ptr)
@ stdcall TpAllocWork(ptr ptr ptr ptr)
+@ stdcall TpCallbackMayRunLong(ptr)
@ stdcall TpPostWork(ptr)
@ stdcall TpReleaseCleanupGroup(ptr)
@ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index a5203f0..90f2ab7 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -83,6 +83,7 @@ struct threadpool_object
PVOID userdata;
PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
PTP_SIMPLE_CALLBACK finalization_callback;
+ BOOL may_run_long;
HMODULE race_dll;
/* information about the group, locked via .group->cs */
struct list group_entry;
@@ -106,6 +107,14 @@ struct threadpool_object
} u;
};
+/* internal threadpool instance representation */
+struct threadpool_instance
+{
+ struct threadpool_object *object;
+ DWORD threadid;
+ BOOL may_run_long;
+};
+
/* internal threadpool group representation */
struct threadpool_group
{
@@ -133,6 +142,11 @@ static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GR
return (struct threadpool_group *)group;
}
+static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
+{
+ return (struct threadpool_instance *)instance;
+}
+
static void CALLBACK threadpool_worker_proc( void *param );
static NTSTATUS tp_threadpool_alloc( struct threadpool **out );
static void tp_threadpool_shutdown( struct threadpool *pool );
@@ -140,6 +154,7 @@ static BOOL tp_threadpool_release( struct threadpool *pool );
static void tp_object_submit( struct threadpool_object *object );
static void tp_object_shutdown( struct threadpool_object *object );
static BOOL tp_object_release( struct threadpool_object *object );
+static void tp_instance_initialize( struct threadpool_instance *instance, struct threadpool_object *object );
static BOOL tp_group_release( struct threadpool_group *group );
static struct threadpool *default_threadpool = NULL;
@@ -235,6 +250,8 @@ static BOOL tp_threadpool_release( struct threadpool *pool )
/* threadpool worker function */
static void CALLBACK threadpool_worker_proc( void *param )
{
+ struct threadpool_instance instance;
+ TP_CALLBACK_INSTANCE *cb_instance = (TP_CALLBACK_INSTANCE *)&instance;
struct threadpool *pool = param;
LARGE_INTEGER timeout;
struct list *ptr;
@@ -257,23 +274,24 @@ static void CALLBACK threadpool_worker_proc( void *param )
object->num_running_callbacks++;
pool->num_busy_workers++;
RtlLeaveCriticalSection( &pool->cs );
+ tp_instance_initialize( &instance, object );
switch (object->type)
{
case TP_OBJECT_TYPE_SIMPLE:
{
- TRACE( "executing simple callback %p(NULL, %p)\n",
- object->u.simple.callback, object->userdata );
- object->u.simple.callback( NULL, object->userdata );
+ TRACE( "executing simple callback %p(%p, %p)\n",
+ object->u.simple.callback, cb_instance, object->userdata );
+ object->u.simple.callback( cb_instance, object->userdata );
TRACE( "callback %p returned\n", object->u.simple.callback );
break;
}
case TP_OBJECT_TYPE_WORK:
{
- TRACE( "executing work callback %p(NULL, %p, %p)\n",
- object->u.work.callback, object->userdata, object );
- object->u.work.callback( NULL, object->userdata, (TP_WORK *)object );
+ TRACE( "executing work callback %p(%p, %p, %p)\n",
+ object->u.work.callback, cb_instance, object->userdata, object );
+ object->u.work.callback( cb_instance, object->userdata, (TP_WORK *)object );
TRACE( "callback %p returned\n", object->u.work.callback );
break;
}
@@ -286,9 +304,9 @@ static void CALLBACK threadpool_worker_proc( void *param )
/* Execute finalization callback */
if (object->finalization_callback)
{
- TRACE( "executing finalization callback %p(NULL, %p)\n",
- object->finalization_callback, object->userdata );
- object->finalization_callback( NULL, object->userdata );
+ TRACE( "executing finalization callback %p(%p, %p)\n",
+ object->finalization_callback, cb_instance, object->userdata );
+ object->finalization_callback( cb_instance, object->userdata );
TRACE( "callback %p returned\n", object->finalization_callback );
}
@@ -331,6 +349,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
object->userdata = userdata;
object->group_cancel_callback = NULL;
object->finalization_callback = NULL;
+ object->may_run_long = 0;
object->race_dll = NULL;
memset( &object->group_entry, 0, sizeof(object->group_entry) );
@@ -349,9 +368,14 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
object->group_cancel_callback = environment->CleanupGroupCancelCallback;
object->finalization_callback = environment->FinalizationCallback;
+ object->may_run_long = environment->u.s.LongFunction != 0;
object->race_dll = environment->RaceDll;
- WARN("environment not fully implemented yet\n");
+ if (environment->ActivationContext)
+ FIXME("activation context not supported yet\n");
+
+ if (environment->u.s.Persistent)
+ FIXME("persistent thread support not supported yet\n");
}
/* Increase dll refcount */
@@ -570,6 +594,52 @@ static BOOL tp_object_release( struct threadpool_object *object )
return TRUE;
}
+/* initializes a threadpool instance structure */
+static void tp_instance_initialize( struct threadpool_instance *instance, struct threadpool_object *object )
+{
+ instance->object = object;
+ instance->threadid = GetCurrentThreadId();
+ instance->may_run_long = object->may_run_long;
+}
+
+/* hint for the threadpool that the execution might take long, spawn additional workers */
+static BOOL tp_instance_may_run_long( struct threadpool_instance *instance )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool;
+ NTSTATUS status = STATUS_SUCCESS;
+
+ if (instance->threadid != GetCurrentThreadId())
+ {
+ ERR("called from wrong thread, ignoring\n");
+ return FALSE;
+ }
+
+ if (instance->may_run_long)
+ return TRUE;
+
+ object = instance->object;
+ pool = object->pool;
+ RtlEnterCriticalSection( &pool->cs );
+
+ if (pool->num_busy_workers >= pool->num_workers && pool->num_workers < pool->max_workers)
+ {
+ HANDLE thread;
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+ threadpool_worker_proc, pool, &thread, NULL );
+ if (status == STATUS_SUCCESS)
+ {
+ interlocked_inc( &pool->refcount );
+ pool->num_workers++;
+ NtClose( thread );
+ }
+ }
+
+ RtlLeaveCriticalSection( &pool->cs );
+ instance->may_run_long = TRUE;
+ return !status;
+}
+
/* allocates a new cleanup group */
static NTSTATUS tp_group_alloc( struct threadpool_group **out )
{
@@ -715,6 +785,20 @@ NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID us
}
/***********************************************************************
+ * TpCallbackMayRunLong (NTDLL.@)
+ */
+NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+ TRACE("%p\n", instance);
+
+ if (!this)
+ return STATUS_ACCESS_VIOLATION;
+
+ return tp_instance_may_run_long( this );
+}
+
+/***********************************************************************
* TpPostWork (NTDLL.@)
*/
VOID WINAPI TpPostWork( TP_WORK *work )
--
2.3.0

View File

@ -0,0 +1,112 @@
From c3f8e7288538d526ece6b07ff69790e6ff7b1d9d Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Wed, 4 Mar 2015 07:31:27 +0100
Subject: ntdll: Implement TpDisassociateCallback.
---
dlls/ntdll/ntdll.spec | 1 +
dlls/ntdll/threadpool2.c | 52 +++++++++++++++++++++++++++++++++++++++++++++---
2 files changed, 50 insertions(+), 3 deletions(-)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index 38e6fe8..aaf1c64 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -971,6 +971,7 @@
@ stdcall TpAllocPool(ptr ptr)
@ stdcall TpAllocWork(ptr ptr ptr ptr)
@ stdcall TpCallbackMayRunLong(ptr)
+@ stdcall TpDisassociateCallback(ptr)
@ stdcall TpPostWork(ptr)
@ stdcall TpReleaseCleanupGroup(ptr)
@ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index 90f2ab7..c40c06a 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -112,6 +112,7 @@ struct threadpool_instance
{
struct threadpool_object *object;
DWORD threadid;
+ BOOL disassociated;
BOOL may_run_long;
};
@@ -312,9 +313,12 @@ static void CALLBACK threadpool_worker_proc( void *param )
RtlEnterCriticalSection( &pool->cs );
pool->num_busy_workers--;
- object->num_running_callbacks--;
- if (!object->num_pending_callbacks && !object->num_running_callbacks)
- RtlWakeAllConditionVariable( &object->finished_event );
+ if (!instance.disassociated)
+ {
+ object->num_running_callbacks--;
+ if (!object->num_pending_callbacks && !object->num_running_callbacks)
+ RtlWakeAllConditionVariable( &object->finished_event );
+ }
tp_object_release( object );
}
@@ -599,9 +603,37 @@ static void tp_instance_initialize( struct threadpool_instance *instance, struct
{
instance->object = object;
instance->threadid = GetCurrentThreadId();
+ instance->disassociated = FALSE;
instance->may_run_long = object->may_run_long;
}
+/* disassociates the current thread from the threadpool object */
+static void tp_instance_disassociate_thread( struct threadpool_instance *instance )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool;
+
+ if (instance->threadid != GetCurrentThreadId())
+ {
+ ERR("called from wrong thread, ignoring\n");
+ return;
+ }
+
+ if (instance->disassociated)
+ return;
+
+ object = instance->object;
+ pool = object->pool;
+ RtlEnterCriticalSection( &pool->cs );
+
+ object->num_running_callbacks--;
+ if (!object->num_pending_callbacks && !object->num_running_callbacks)
+ RtlWakeAllConditionVariable( &object->finished_event );
+
+ RtlLeaveCriticalSection( &pool->cs );
+ instance->disassociated = TRUE;
+}
+
/* hint for the threadpool that the execution might take long, spawn additional workers */
static BOOL tp_instance_may_run_long( struct threadpool_instance *instance )
{
@@ -799,6 +831,20 @@ NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
}
/***********************************************************************
+ * TpDisassociateCallback (NTDLL.@)
+ */
+VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+ TRACE("%p\n", instance);
+
+ if (this)
+ {
+ tp_instance_disassociate_thread( this );
+ }
+}
+
+/***********************************************************************
* TpPostWork (NTDLL.@)
*/
VOID WINAPI TpPostWork( TP_WORK *work )
--
2.3.0

View File

@ -0,0 +1,230 @@
From 6229d9139d7b30f00e78c6f1f8410c1c99712295 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Wed, 4 Mar 2015 07:40:02 +0100
Subject: ntdll: Implement various TpCallback*OnCompletion functions.
---
dlls/ntdll/ntdll.spec | 5 ++
dlls/ntdll/threadpool2.c | 154 +++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 159 insertions(+)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index aaf1c64..0c5ad39 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -970,7 +970,12 @@
@ stdcall TpAllocCleanupGroup(ptr)
@ stdcall TpAllocPool(ptr ptr)
@ stdcall TpAllocWork(ptr ptr ptr ptr)
+@ stdcall TpCallbackLeaveCriticalSectionOnCompletion(ptr ptr)
@ stdcall TpCallbackMayRunLong(ptr)
+@ stdcall TpCallbackReleaseMutexOnCompletion(ptr long)
+@ stdcall TpCallbackReleaseSemaphoreOnCompletion(ptr long long)
+@ stdcall TpCallbackSetEventOnCompletion(ptr long)
+@ stdcall TpCallbackUnloadDllOnCompletion(ptr long)
@ stdcall TpDisassociateCallback(ptr)
@ stdcall TpPostWork(ptr)
@ stdcall TpReleaseCleanupGroup(ptr)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index c40c06a..ebf8b94 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -114,6 +114,15 @@ struct threadpool_instance
DWORD threadid;
BOOL disassociated;
BOOL may_run_long;
+ struct
+ {
+ CRITICAL_SECTION *critical_section;
+ HANDLE mutex;
+ HANDLE semaphore;
+ LONG semaphore_count;
+ HANDLE event;
+ HMODULE library;
+ } cleanup;
};
/* internal threadpool group representation */
@@ -156,6 +165,7 @@ static void tp_object_submit( struct threadpool_object *object );
static void tp_object_shutdown( struct threadpool_object *object );
static BOOL tp_object_release( struct threadpool_object *object );
static void tp_instance_initialize( struct threadpool_instance *instance, struct threadpool_object *object );
+static NTSTATUS tp_instance_cleanup( struct threadpool_instance *instance );
static BOOL tp_group_release( struct threadpool_group *group );
static struct threadpool *default_threadpool = NULL;
@@ -311,6 +321,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
TRACE( "callback %p returned\n", object->finalization_callback );
}
+ tp_instance_cleanup( &instance );
RtlEnterCriticalSection( &pool->cs );
pool->num_busy_workers--;
if (!instance.disassociated)
@@ -605,6 +616,48 @@ static void tp_instance_initialize( struct threadpool_instance *instance, struct
instance->threadid = GetCurrentThreadId();
instance->disassociated = FALSE;
instance->may_run_long = object->may_run_long;
+ instance->cleanup.critical_section = NULL;
+ instance->cleanup.mutex = NULL;
+ instance->cleanup.semaphore = NULL;
+ instance->cleanup.semaphore_count = 0;
+ instance->cleanup.event = NULL;
+ instance->cleanup.library = NULL;
+}
+
+static NTSTATUS tp_instance_cleanup( struct threadpool_instance *instance )
+{
+ NTSTATUS status;
+
+ if (instance->cleanup.critical_section)
+ {
+ RtlLeaveCriticalSection( instance->cleanup.critical_section );
+ }
+ if (instance->cleanup.mutex)
+ {
+ status = NtReleaseMutant( instance->cleanup.mutex, NULL );
+ if (status != STATUS_SUCCESS)
+ return status;
+ }
+ if (instance->cleanup.semaphore)
+ {
+ status = NtReleaseSemaphore( instance->cleanup.semaphore, instance->cleanup.semaphore_count, NULL );
+ if (status != STATUS_SUCCESS)
+ return status;
+ }
+ if (instance->cleanup.event)
+ {
+ status = NtSetEvent( instance->cleanup.event, NULL );
+ if (status != STATUS_SUCCESS)
+ return status;
+ }
+ if (instance->cleanup.library)
+ {
+ status = LdrUnloadDll( instance->cleanup.library );
+ if (status != STATUS_SUCCESS)
+ return status;
+ }
+
+ return STATUS_SUCCESS;
}
/* disassociates the current thread from the threadpool object */
@@ -817,6 +870,26 @@ NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID us
}
/***********************************************************************
+ * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
+ */
+VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+ TRACE("%p %p\n", instance, crit);
+
+ if (!this)
+ return;
+
+ if (this->cleanup.critical_section)
+ {
+ FIXME("attempt to set multiple cleanup critical sections\n");
+ return;
+ }
+
+ this->cleanup.critical_section = crit;
+}
+
+/***********************************************************************
* TpCallbackMayRunLong (NTDLL.@)
*/
NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
@@ -831,6 +904,87 @@ NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
}
/***********************************************************************
+ * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
+ */
+VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+ TRACE("%p %p\n", instance, mutex);
+
+ if (!this)
+ return;
+
+ if (this->cleanup.mutex)
+ {
+ FIXME("attempt to set multiple cleanup mutexes\n");
+ return;
+ }
+
+ this->cleanup.mutex = mutex;
+}
+
+/***********************************************************************
+ * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
+ */
+VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+ TRACE("%p %p %u\n", instance, semaphore, count);
+
+ if (!this)
+ return;
+
+ if (this->cleanup.semaphore)
+ {
+ FIXME("attempt to set multiple cleanup semaphores\n");
+ return;
+ }
+
+ this->cleanup.semaphore = semaphore;
+ this->cleanup.semaphore_count = count;
+}
+
+/***********************************************************************
+ * TpCallbackSetEventOnCompletion (NTDLL.@)
+ */
+VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+ TRACE("%p %p\n", instance, event);
+
+ if (!this)
+ return;
+
+ if (this->cleanup.event)
+ {
+ FIXME("attempt to set multiple cleanup events\n");
+ return;
+ }
+
+ this->cleanup.event = event;
+}
+
+/***********************************************************************
+ * TpCallbackUnloadDllOnCompletion (NTDLL.@)
+ */
+VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+ TRACE("%p %p\n", instance, module);
+
+ if (!this)
+ return;
+
+ if (this->cleanup.library)
+ {
+ FIXME("attempt to set multiple cleanup libraries\n");
+ return;
+ }
+
+ this->cleanup.library = module;
+}
+
+/***********************************************************************
* TpDisassociateCallback (NTDLL.@)
*/
VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
--
2.3.0

View File

@ -1,63 +1,65 @@
From 31df48db7d885ca5679ccc010d954d5df7fba288 Mon Sep 17 00:00:00 2001
From 0fbfe7ef91e7c0614cf45b75211ab70b500029db Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Sat, 31 Jan 2015 02:26:17 +0100
Subject: ntdll: Add threadpool stub functions to specfile.
Date: Wed, 4 Mar 2015 08:01:00 +0100
Subject: ntdll: Add remaining threadpool functions to specfile.
---
dlls/ntdll/ntdll.spec | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 62 insertions(+)
dlls/ntdll/ntdll.spec | 43 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 43 insertions(+)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index 51de6e7..771f669 100644
index 0c5ad39..970bdeb 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -969,2 +969,64 @@
@@ -967,24 +967,67 @@
@ stdcall RtlxOemStringToUnicodeSize(ptr) RtlOemStringToUnicodeSize
@ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize
@ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize
+# @ stub TpAllocAlpcCompletion
+# @ stub TpAllocAlpcCompletionEx
+# @ stub TpAllocCleanupGroup
@ stdcall TpAllocCleanupGroup(ptr)
+# @ stub TpAllocIoCompletion
+# @ stub TpAllocJobNotification
+# @ stub TpAllocPool
@ stdcall TpAllocPool(ptr ptr)
+# @ stub TpAllocTimer
+# @ stub TpAllocWait
+# @ stub TpAllocWork
@ stdcall TpAllocWork(ptr ptr ptr ptr)
+# @ stub TpAlpcRegisterCompletionList
+# @ stub TpAlpcUnregisterCompletionList
+# @ stub TpCallbackDetectedUnrecoverableError
+# @ stub TpCallbackIndependent
+# @ stub TpCallbackLeaveCriticalSectionOnCompletion
+# @ stub TpCallbackMayRunLong
+# @ stub TpCallbackReleaseMutexOnCompletion
+# @ stub TpCallbackReleaseSemaphoreOnCompletion
@ stdcall TpCallbackLeaveCriticalSectionOnCompletion(ptr ptr)
@ stdcall TpCallbackMayRunLong(ptr)
@ stdcall TpCallbackReleaseMutexOnCompletion(ptr long)
@ stdcall TpCallbackReleaseSemaphoreOnCompletion(ptr long long)
+# @ stub TpCallbackSendAlpcMessageOnCompletion
+# @ stub TpCallbackSendPendingAlpcMessage
+# @ stub TpCallbackSetEventOnCompletion
+# @ stub TpCallbackUnloadDllOnCompletion
@ stdcall TpCallbackSetEventOnCompletion(ptr long)
@ stdcall TpCallbackUnloadDllOnCompletion(ptr long)
+# @ stub TpCancelAsyncIoOperation
+# @ stub TpCaptureCaller
+# @ stub TpCheckTerminateWorker
+# @ stub TpDbgDumpHeapUsage
+# @ stub TpDbgSetLogRoutine
+# @ stub TpDisablePoolCallbackChecks
+# @ stub TpDisassociateCallback
@ stdcall TpDisassociateCallback(ptr)
+# @ stub TpIsTimerSet
+# @ stub TpPostWork
@ stdcall TpPostWork(ptr)
+# @ stub TpQueryPoolStackInformation
+# @ stub TpReleaseAlpcCompletion
+# @ stub TpReleaseCleanupGroup
+# @ stub TpReleaseCleanupGroupMembers
@ stdcall TpReleaseCleanupGroup(ptr)
@ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
+# @ stub TpReleaseIoCompletion
+# @ stub TpReleaseJobNotification
+# @ stub TpReleasePool
@ stdcall TpReleasePool(ptr)
+# @ stub TpReleaseTimer
+# @ stub TpReleaseWait
+# @ stub TpReleaseWork
@ stdcall TpReleaseWork(ptr)
+# @ stub TpSetDefaultPoolMaxThreads
+# @ stub TpSetDefaultPoolStackInformation
+# @ stub TpSetPoolMaxThreads
@ stdcall TpSetPoolMaxThreads(ptr long)
+# @ stub TpSetPoolMaxThreadsSoftLimit
+# @ stub TpSetPoolMinThreads
@ stdcall TpSetPoolMinThreads(ptr long)
+# @ stub TpSetPoolStackInformation
+# @ stub TpSetPoolThreadBasePriority
+# @ stub TpSetPoolWorkerThreadIdleTimeout
@ -65,7 +67,7 @@ index 51de6e7..771f669 100644
+# @ stub TpSetTimerEx
+# @ stub TpSetWait
+# @ stub TpSetWaitEx
+# @ stub TpSimpleTryPost
@ stdcall TpSimpleTryPost(ptr ptr ptr)
+# @ stub TpStartAsyncIoOperation
+# @ stub TpTimerOutstandingCallbackCount
+# @ stub TpTrimPools
@ -74,8 +76,9 @@ index 51de6e7..771f669 100644
+# @ stub TpWaitForJobNotification
+# @ stub TpWaitForTimer
+# @ stub TpWaitForWait
+# @ stub TpWaitForWork
@ stdcall TpWaitForWork(ptr long)
@ stdcall -ret64 VerSetConditionMask(int64 long long)
@ stdcall ZwAcceptConnectPort(ptr long ptr long long ptr) NtAcceptConnectPort
--
2.2.2
2.3.0

View File

@ -1,15 +1,15 @@
From e494481628d87eab64df7e8cb04047d2330c2847 Mon Sep 17 00:00:00 2001
From badc2951cd6764ce4115097874dadcdfa084354e Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Sun, 1 Feb 2015 18:21:28 +0100
Date: Wed, 4 Mar 2015 08:19:47 +0100
Subject: ntdll: Implement threadpool timer functions. (rev 2)
---
dlls/ntdll/ntdll.spec | 10 +-
dlls/ntdll/threadpool2.c | 399 ++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 403 insertions(+), 6 deletions(-)
dlls/ntdll/threadpool2.c | 390 ++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 394 insertions(+), 6 deletions(-)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index bf9e795..f4328a9 100644
index 970bdeb..24b4c30 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -973,7 +973,7 @@
@ -58,58 +58,52 @@ index bf9e795..f4328a9 100644
@ stdcall TpWaitForWork(ptr long)
@ stdcall -ret64 VerSetConditionMask(int64 long long)
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index a6fd141..19096a0 100644
index ebf8b94..a1089a4 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -124,7 +124,8 @@ struct threadpool_object
{
TP_OBJECT_TYPE_UNDEFINED,
TP_OBJECT_TYPE_SIMPLE,
- TP_OBJECT_TYPE_WORK
+ TP_OBJECT_TYPE_WORK,
+ TP_OBJECT_TYPE_TIMER
} type;
@@ -68,7 +68,8 @@ struct threadpool
enum threadpool_objtype
{
TP_OBJECT_TYPE_SIMPLE,
- TP_OBJECT_TYPE_WORK
+ TP_OBJECT_TYPE_WORK,
+ TP_OBJECT_TYPE_TIMER
};
/* arguments for callback */
@@ -140,6 +141,21 @@ struct threadpool_object
/* internal threadpool object representation */
@@ -104,6 +105,18 @@ struct threadpool_object
{
PTP_WORK_CALLBACK callback;
} work;
+ /* timer callback */
+ struct
+ {
+ PTP_TIMER_CALLBACK callback;
+
+ /* information about the timer, locked via timerqueue.cs */
+ BOOL timer_initialized;
+ BOOL timer_pending;
+ struct list timer_entry;
+
+ BOOL timer_set;
+ ULONGLONG timeout;
+ LONG period;
+ LONG window_length;
+ BOOL timer_initialized;
+ BOOL timer_pending;
+ struct list timer_entry;
+ BOOL timer_set;
+ ULONGLONG timeout;
+ LONG period;
+ LONG window_length;
+ } timer;
} u;
};
@@ -154,6 +170,35 @@ struct threadpool_group
struct list members;
@@ -135,6 +148,33 @@ struct threadpool_group
struct list members;
};
+/* global timerqueue object */
+static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
+
+static struct
+{
+ CRITICAL_SECTION cs;
+
+ /* number of timer objects total */
+ BOOL thread_running;
+ LONG num_timers;
+
+ /* list of pending timers */
+ struct list pending_timers;
+ RTL_CONDITION_VARIABLE update_event;
+ CRITICAL_SECTION cs;
+ BOOL thread_running;
+ LONG num_timers;
+ struct list pending_timers;
+ RTL_CONDITION_VARIABLE update_event;
+}
+timerqueue =
+{
@ -119,6 +113,7 @@ index a6fd141..19096a0 100644
+ LIST_INIT( timerqueue.pending_timers ),
+ RTL_CONDITION_VARIABLE_INIT
+};
+
+static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
+{
+ 0, 0, &timerqueue.cs,
@ -129,7 +124,7 @@ index a6fd141..19096a0 100644
static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
{
return (struct threadpool *)pool;
@@ -166,6 +211,13 @@ static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
@@ -147,6 +187,13 @@ static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
return object;
}
@ -143,40 +138,30 @@ index a6fd141..19096a0 100644
static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
{
return (struct threadpool_group *)group;
@@ -177,6 +229,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
@@ -158,6 +205,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
}
static void CALLBACK threadpool_worker_proc( void *param );
+static void CALLBACK timerqueue_thread_proc( void *param );
static NTSTATUS tp_threadpool_alloc( struct threadpool **out );
static void tp_threadpool_shutdown( struct threadpool *pool );
static BOOL tp_threadpool_release( struct threadpool *pool );
@@ -189,6 +242,243 @@ static void tp_object_shutdown( struct threadpool_object *object );
@@ -168,6 +216,230 @@ static void tp_instance_initialize( struct threadpool_instance *instance, struct
static NTSTATUS tp_instance_cleanup( struct threadpool_instance *instance );
static BOOL tp_group_release( struct threadpool_group *group );
/***********************************************************************
+ * TIMERQUEUE IMPLEMENTATION
+ ***********************************************************************
+ *
+ * Based on [1] there is only one (persistent) thread which handles
+ * timer events. There is a similar implementation in ntdll/
+ * threadpool.c, but its not directly possible to merge them because of
+ * specific implementation differences, like handling several events at
+ * once using a windowlength parameter. */
+
+static NTSTATUS tp_timerqueue_acquire( struct threadpool_object *timer )
+{
+ NTSTATUS status = STATUS_SUCCESS;
+ assert( timer->type == TP_OBJECT_TYPE_TIMER );
+
+ timer->u.timer.timer_initialized = TRUE;
+ timer->u.timer.timer_pending = FALSE;
+ timer->u.timer.timer_initialized = TRUE;
+ timer->u.timer.timer_pending = FALSE;
+ memset( &timer->u.timer.timer_entry, 0, sizeof(timer->u.timer.timer_entry) );
+
+ timer->u.timer.timer_set = FALSE;
+ timer->u.timer.timeout = 0;
+ timer->u.timer.period = 0;
+ timer->u.timer.window_length = 0;
+ timer->u.timer.timer_set = FALSE;
+ timer->u.timer.timeout = 0;
+ timer->u.timer.period = 0;
+ timer->u.timer.window_length = 0;
+
+ RtlEnterCriticalSection( &timerqueue.cs );
+
@ -201,9 +186,9 @@ index a6fd141..19096a0 100644
+{
+ assert( timer->type == TP_OBJECT_TYPE_TIMER );
+ RtlEnterCriticalSection( &timerqueue.cs );
+
+ if (timer->u.timer.timer_initialized)
+ {
+
+ if (timer->u.timer.timer_pending)
+ {
+ list_remove( &timer->u.timer.timer_entry );
@ -218,7 +203,6 @@ index a6fd141..19096a0 100644
+
+ timer->u.timer.timer_initialized = FALSE;
+ }
+
+ RtlLeaveCriticalSection( &timerqueue.cs );
+}
+
@ -297,7 +281,7 @@ index a6fd141..19096a0 100644
+ RtlLeaveCriticalSection( &timerqueue.cs );
+
+ if (submit_timer)
+ tp_object_submit( new_timer );
+ tp_object_submit( new_timer );
+}
+
+static void CALLBACK timerqueue_thread_proc( void *param )
@ -390,19 +374,16 @@ index a6fd141..19096a0 100644
+ RtlLeaveCriticalSection( &timerqueue.cs );
+}
+
+
+/***********************************************************************
* THREADPOOL INSTANCE IMPLEMENTATION
***********************************************************************/
static struct threadpool *default_threadpool = NULL;
@@ -446,6 +736,16 @@ static void CALLBACK threadpool_worker_proc( void *param )
/* allocates or returns the default threadpool */
@@ -307,6 +579,15 @@ static void CALLBACK threadpool_worker_proc( void *param )
break;
}
+ case TP_OBJECT_TYPE_TIMER:
+ {
+ TP_CALLBACK_INSTANCE *cb_instance = (TP_CALLBACK_INSTANCE *)&instance;
+ TRACE( "executing callback %p(%p, %p, %p)\n",
+ TRACE( "executing timer callback %p(%p, %p, %p)\n",
+ object->u.timer.callback, cb_instance, object->userdata, object );
+ object->u.timer.callback( cb_instance, object->userdata, (TP_TIMER *)object );
+ TRACE( "callback %p returned\n", object->u.timer.callback );
@ -410,9 +391,9 @@ index a6fd141..19096a0 100644
+ }
+
default:
FIXME( "callback type %u not implemented\n", object->type );
assert(0);
break;
@@ -624,6 +924,38 @@ static NTSTATUS tp_object_alloc_work( struct threadpool_object **out, PTP_WORK_C
@@ -482,6 +763,43 @@ static NTSTATUS tp_object_alloc_work( struct threadpool_object **out, PTP_WORK_C
return STATUS_SUCCESS;
}
@ -420,13 +401,18 @@ index a6fd141..19096a0 100644
+ PVOID userdata, TP_CALLBACK_ENVIRON *environment )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool;
+ struct threadpool *pool = NULL;
+ NTSTATUS status;
+
+ /* determine threadpool */
+ pool = environment ? (struct threadpool *)environment->Pool : NULL;
+ if (!pool) pool = get_default_threadpool();
+ if (!pool) return STATUS_NO_MEMORY;
+ if (environment)
+ pool = (struct threadpool *)environment->Pool;
+
+ if (!pool)
+ {
+ pool = get_default_threadpool();
+ if (!pool)
+ return STATUS_NO_MEMORY;
+ }
+
+ object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
+ if (!object)
@ -442,29 +428,26 @@ index a6fd141..19096a0 100644
+ return status;
+ }
+
+ tp_object_initialize( object, pool, userdata, environment, FALSE );
+ tp_object_initialize( object, pool, userdata, environment );
+
+ *out = object;
+ return STATUS_SUCCESS;
+}
+
static BOOL tp_object_release( struct threadpool_object *object )
/* submits an object to a threadpool */
static void tp_object_submit( struct threadpool_object *object )
{
struct threadpool_group *group;
@@ -659,6 +991,12 @@ static BOOL tp_object_release( struct threadpool_object *object )
@@ -567,6 +885,9 @@ static void tp_object_wait( struct threadpool_object *object )
/* mark an object as 'shutdown', submitting is no longer possible */
static void tp_object_shutdown( struct threadpool_object *object )
{
+ if (object->type == TP_OBJECT_TYPE_TIMER)
+ {
+ /* release reference on the timerqueue */
+ tp_timerqueue_release( object );
+ }
+
object->shutdown = TRUE;
}
@@ -862,6 +1200,16 @@ NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
@@ -858,6 +1179,18 @@ NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
}
/***********************************************************************
@ -474,14 +457,16 @@ index a6fd141..19096a0 100644
+ TP_CALLBACK_ENVIRON *environment )
+{
+ TRACE("%p %p %p %p\n", out, callback, userdata, environment);
+ return tp_object_alloc_timer( (struct threadpool_object **)out, callback, userdata, environment );
+
+ return tp_object_alloc_timer( (struct threadpool_object **)out, callback,
+ userdata, environment );
+}
+
+/***********************************************************************
* TpAllocWork (NTDLL.@)
*/
NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
@@ -966,6 +1314,16 @@ VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
@@ -999,6 +1332,17 @@ VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
}
/***********************************************************************
@ -491,6 +476,7 @@ index a6fd141..19096a0 100644
+{
+ struct threadpool_object *this = impl_from_TP_TIMER( timer );
+ TRACE("%p\n", timer);
+
+ return this ? this->u.timer.timer_set : FALSE;
+}
+
@ -498,7 +484,7 @@ index a6fd141..19096a0 100644
* TpPostWork (NTDLL.@)
*/
VOID WINAPI TpPostWork( TP_WORK *work )
@@ -1014,6 +1372,20 @@ VOID WINAPI TpReleasePool( TP_POOL *pool )
@@ -1057,6 +1401,21 @@ VOID WINAPI TpReleasePool( TP_POOL *pool )
}
/***********************************************************************
@ -508,6 +494,7 @@ index a6fd141..19096a0 100644
+{
+ struct threadpool_object *this = impl_from_TP_TIMER( timer );
+ TRACE("%p\n", timer);
+
+ if (this)
+ {
+ tp_object_shutdown( this );
@ -519,7 +506,7 @@ index a6fd141..19096a0 100644
* TpReleaseWork (NTDLL.@)
*/
VOID WINAPI TpReleaseWork( TP_WORK *work )
@@ -1049,6 +1421,16 @@ BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
@@ -1105,6 +1464,20 @@ BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
}
/***********************************************************************
@ -529,14 +516,18 @@ index a6fd141..19096a0 100644
+{
+ struct threadpool_object *this = impl_from_TP_TIMER( timer );
+ TRACE("%p %p %u %u\n", timer, timeout, period, window_length);
+ if (this) tp_timerqueue_update_timer( this, timeout, period, window_length );
+
+ if (this)
+ {
+ tp_timerqueue_update_timer( this, timeout, period, window_length );
+ }
+}
+
+/***********************************************************************
* TpSimpleTryPost (NTDLL.@)
*/
NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment )
@@ -1058,6 +1440,21 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, T
NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
@@ -1116,6 +1489,21 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
}
/***********************************************************************
@ -559,5 +550,5 @@ index a6fd141..19096a0 100644
*/
VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
--
2.2.2
2.3.0

View File

@ -1,26 +1,25 @@
From 12070adf96edad008609db450efcc2e01b052f87 Mon Sep 17 00:00:00 2001
From 63d215c8d93f67821f4defe1dcb61f7702639c1a Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Sun, 1 Feb 2015 18:14:09 +0100
Date: Wed, 4 Mar 2015 13:17:04 +0100
Subject: ntdll/tests: Add tests for Tp* threadpool functions.
---
dlls/ntdll/tests/threadpool.c | 664 +++++++++++++++++++++++++++++++++++++++++-
1 file changed, 663 insertions(+), 1 deletion(-)
dlls/ntdll/tests/threadpool.c | 470 ++++++++++++++++++++++++++++++++++++++++++
1 file changed, 470 insertions(+)
diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c
index 2e31b34..d27627e 100644
index 1cfd67b..b44b0f5 100644
--- a/dlls/ntdll/tests/threadpool.c
+++ b/dlls/ntdll/tests/threadpool.c
@@ -21,9 +21,39 @@
#include "ntdll_test.h"
@@ -22,15 +22,37 @@
static HMODULE hntdll = 0;
+static NTSTATUS (WINAPI *pTpAllocCleanupGroup)(TP_CLEANUP_GROUP **);
static NTSTATUS (WINAPI *pTpAllocCleanupGroup)(TP_CLEANUP_GROUP **);
+static NTSTATUS (WINAPI *pTpAllocIoCompletion)(TP_IO **,HANDLE,PTP_WIN32_IO_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
static NTSTATUS (WINAPI *pTpAllocPool)(TP_POOL **,PVOID);
+static NTSTATUS (WINAPI *pTpAllocTimer)(TP_TIMER **,PTP_TIMER_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
+static NTSTATUS (WINAPI *pTpAllocWait)(TP_WAIT **,PTP_WAIT_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
+static NTSTATUS (WINAPI *pTpAllocWork)(TP_WORK **,PTP_WORK_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
static NTSTATUS (WINAPI *pTpAllocWork)(TP_WORK **,PTP_WORK_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
+static VOID (WINAPI *pTpCallbackLeaveCriticalSectionOnCompletion)(TP_CALLBACK_INSTANCE *,CRITICAL_SECTION *);
+static NTSTATUS (WINAPI *pTpCallbackMayRunLong)(TP_CALLBACK_INSTANCE *);
+static VOID (WINAPI *pTpCallbackReleaseMutexOnCompletion)(TP_CALLBACK_INSTANCE *,HANDLE);
@ -30,15 +29,15 @@ index 2e31b34..d27627e 100644
+static VOID (WINAPI *pTpCancelAsyncIoOperation)(TP_IO *);
+static VOID (WINAPI *pTpDisassociateCallback)(TP_CALLBACK_INSTANCE *);
+static BOOL (WINAPI *pTpIsTimerSet)(TP_TIMER *);
+static VOID (WINAPI *pTpPostWork)(TP_WORK *);
+static VOID (WINAPI *pTpReleaseCleanupGroup)(TP_CLEANUP_GROUP *);
+static VOID (WINAPI *pTpReleaseCleanupGroupMembers)(TP_CLEANUP_GROUP *,BOOL,PVOID);
static VOID (WINAPI *pTpPostWork)(TP_WORK *);
static VOID (WINAPI *pTpReleaseCleanupGroup)(TP_CLEANUP_GROUP *);
static VOID (WINAPI *pTpReleaseCleanupGroupMembers)(TP_CLEANUP_GROUP *,BOOL,PVOID);
+static VOID (WINAPI *pTpReleaseIoCompletion)(TP_IO *);
static VOID (WINAPI *pTpReleasePool)(TP_POOL *);
+static VOID (WINAPI *pTpReleaseTimer)(TP_TIMER *);
+static VOID (WINAPI *pTpReleaseWait)(TP_WAIT *);
+static VOID (WINAPI *pTpReleaseWork)(TP_WORK *);
+static VOID (WINAPI *pTpSetPoolMaxThreads)(TP_POOL *,DWORD);
static VOID (WINAPI *pTpReleaseWork)(TP_WORK *);
static VOID (WINAPI *pTpSetPoolMaxThreads)(TP_POOL *,DWORD);
+static BOOL (WINAPI *pTpSetPoolMinThreads)(TP_POOL *,DWORD);
+static VOID (WINAPI *pTpSetTimer)(TP_TIMER *,LARGE_INTEGER *,LONG,LONG);
+static VOID (WINAPI *pTpSetWait)(TP_WAIT *,HANDLE,LARGE_INTEGER *);
@ -47,20 +46,18 @@ index 2e31b34..d27627e 100644
+static VOID (WINAPI *pTpWaitForIoCompletion)(TP_IO *,BOOL);
+static VOID (WINAPI *pTpWaitForTimer)(TP_TIMER *,BOOL);
+static VOID (WINAPI *pTpWaitForWait)(TP_WAIT *,BOOL);
+static VOID (WINAPI *pTpWaitForWork)(TP_WORK *,BOOL);
static VOID (WINAPI *pTpWaitForWork)(TP_WORK *,BOOL);
#define NTDLL_GET_PROC(func) \
do \
@@ -42,13 +72,43 @@ static BOOL init_threadpool(void)
return FALSE;
@@ -51,15 +73,37 @@ static BOOL init_threadpool(void)
}
+ NTDLL_GET_PROC(TpAllocCleanupGroup);
NTDLL_GET_PROC(TpAllocCleanupGroup);
+ NTDLL_GET_PROC(TpAllocIoCompletion);
NTDLL_GET_PROC(TpAllocPool);
+ NTDLL_GET_PROC(TpAllocTimer);
+ NTDLL_GET_PROC(TpAllocWait);
+ NTDLL_GET_PROC(TpAllocWork);
NTDLL_GET_PROC(TpAllocWork);
+ NTDLL_GET_PROC(TpCallbackLeaveCriticalSectionOnCompletion);
+ NTDLL_GET_PROC(TpCallbackMayRunLong);
+ NTDLL_GET_PROC(TpCallbackReleaseMutexOnCompletion);
@ -70,15 +67,15 @@ index 2e31b34..d27627e 100644
+ NTDLL_GET_PROC(TpCancelAsyncIoOperation);
+ NTDLL_GET_PROC(TpDisassociateCallback);
+ NTDLL_GET_PROC(TpIsTimerSet);
+ NTDLL_GET_PROC(TpPostWork);
+ NTDLL_GET_PROC(TpReleaseCleanupGroup);
+ NTDLL_GET_PROC(TpReleaseCleanupGroupMembers);
NTDLL_GET_PROC(TpPostWork);
NTDLL_GET_PROC(TpReleaseCleanupGroup);
NTDLL_GET_PROC(TpReleaseCleanupGroupMembers);
+ NTDLL_GET_PROC(TpReleaseIoCompletion);
NTDLL_GET_PROC(TpReleasePool);
+ NTDLL_GET_PROC(TpReleaseTimer);
+ NTDLL_GET_PROC(TpReleaseWait);
+ NTDLL_GET_PROC(TpReleaseWork);
+ NTDLL_GET_PROC(TpSetPoolMaxThreads);
NTDLL_GET_PROC(TpReleaseWork);
NTDLL_GET_PROC(TpSetPoolMaxThreads);
+ NTDLL_GET_PROC(TpSetPoolMinThreads);
+ NTDLL_GET_PROC(TpSetTimer);
+ NTDLL_GET_PROC(TpSetWait);
@ -87,219 +84,13 @@ index 2e31b34..d27627e 100644
+ NTDLL_GET_PROC(TpWaitForIoCompletion);
+ NTDLL_GET_PROC(TpWaitForTimer);
+ NTDLL_GET_PROC(TpWaitForWait);
+ NTDLL_GET_PROC(TpWaitForWork);
NTDLL_GET_PROC(TpWaitForWork);
if (!pTpAllocPool)
{
- skip("Threadpool functions not supported, skipping tests\n");
+ win_skip("Threadpool functions not supported, skipping tests\n");
return FALSE;
}
@@ -65,13 +125,23 @@ static void CALLBACK simple_cb(TP_CALLBACK_INSTANCE *instance, void *userdata)
ReleaseSemaphore(semaphore, 1, NULL);
@@ -307,6 +351,425 @@ static void test_tp_work_scheduler(void)
pTpReleasePool(pool);
}
+static void CALLBACK simple2_cb(TP_CALLBACK_INSTANCE *instance, void *userdata)
+{
+ trace("Running simple callback\n");
+ Sleep(100);
+ InterlockedIncrement((LONG *)userdata);
+}
+
static void test_tp_simple(void)
{
TP_CALLBACK_ENVIRON environment;
+ TP_CLEANUP_GROUP *group;
HANDLE semaphore;
NTSTATUS status;
TP_POOL *pool;
+ LONG userdata;
DWORD result;
+ int i;
semaphore = CreateSemaphoreA(NULL, 0, 1, NULL);
ok(semaphore != NULL, "CreateSemaphoreA failed %u\n", GetLastError());
@@ -105,6 +175,7 @@ static void test_tp_simple(void)
environment.Version = 9999;
environment.Pool = pool;
status = pTpSimpleTryPost(simple_cb, semaphore, &environment);
+ todo_wine
ok(status == STATUS_INVALID_PARAMETER || broken(!status) /* Vista/2008 */,
"TpSimpleTryPost unexpectedly returned status %x\n", status);
if (!status)
@@ -113,9 +184,591 @@ static void test_tp_simple(void)
ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
}
+ /* allocate a cleanup group for synchronization */
+ group = NULL;
+ status = pTpAllocCleanupGroup(&group);
+ ok(!status, "TpAllocCleanupGroup failed with status %x\n", status);
+ ok(group != NULL, "expected pool != NULL\n");
+
+ userdata = 0;
+ memset(&environment, 0, sizeof(environment));
+ environment.Version = 1;
+ environment.Pool = pool;
+ environment.CleanupGroup = group;
+ status = pTpSimpleTryPost(simple2_cb, &userdata, &environment);
+ ok(!status, "TpSimpleTryPost failed with status %x\n", status);
+
+ pTpReleaseCleanupGroupMembers(group, FALSE, NULL);
+ ok(userdata == 1, "expected userdata = 1, got %u\n", userdata);
+
+ /* Test cancellation of pending simple callbacks. */
+ userdata = 0;
+ memset(&environment, 0, sizeof(environment));
+ environment.Version = 1;
+ environment.Pool = pool;
+ environment.CleanupGroup = group;
+ for (i = 0; i < 100; i++)
+ {
+ status = pTpSimpleTryPost(simple2_cb, &userdata, &environment);
+ ok(!status, "TpSimpleTryPost failed with status %x\n", status);
+ }
+
+ pTpReleaseCleanupGroupMembers(group, TRUE, NULL);
+ ok(userdata < 100, "expected userdata < 100, got %u\n", userdata);
+
/* cleanup */
+ pTpReleaseCleanupGroup(group);
+ pTpReleasePool(pool);
+ CloseHandle(semaphore);
+}
+
+static void CALLBACK work_cb(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WORK *work)
+{
+ trace("Running work callback\n");
+ Sleep(10);
+ InterlockedIncrement( (LONG *)userdata );
+}
+
+static void CALLBACK work2_cb(TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WORK *work)
+{
+ trace("Running work2 callback\n");
+ Sleep(10);
+ InterlockedExchangeAdd( (LONG *)userdata, 0x10000 );
+}
+
+static void test_tp_work(void)
+{
+ TP_CALLBACK_ENVIRON environment;
+ TP_WORK *work;
+ TP_POOL *pool;
+ NTSTATUS status;
+ LONG userdata;
+ int i;
+
+ /* Allocate new threadpool */
+ pool = NULL;
+ status = pTpAllocPool(&pool, NULL);
+ ok(!status, "TpAllocPool failed with status %x\n", status);
+ ok(pool != NULL, "expected pool != NULL\n");
+
+ /* Allocate new work item */
+ work = NULL;
+ memset(&environment, 0, sizeof(environment));
+ environment.Version = 1;
+ environment.Pool = pool;
+ status = pTpAllocWork(&work, work_cb, &userdata, &environment);
+ ok(!status, "TpAllocWork failed with status %x\n", status);
+ ok(work != NULL, "expected work != NULL\n");
+
+ /* Post 10 identical work items at once */
+ userdata = 0;
+ for (i = 0; i < 10; i++)
+ pTpPostWork(work);
+ pTpWaitForWork(work, FALSE);
+ ok(userdata == 10, "expected userdata = 10, got %u\n", userdata);
+
+ /* Add more tasks and cancel them immediately */
+ userdata = 0;
+ for (i = 0; i < 10; i++)
+ pTpPostWork(work);
+ pTpWaitForWork(work, TRUE);
+ ok(userdata < 10, "expected userdata < 10, got %u\n", userdata);
+
+ /* Cleanup */
+ pTpReleaseWork(work);
+ pTpReleasePool(pool);
+}
+
+static void test_tp_work_scheduler(void)
+{
+ TP_CALLBACK_ENVIRON environment;
+ TP_CLEANUP_GROUP *group;
+ TP_WORK *work, *work2;
+ TP_POOL *pool;
+ NTSTATUS status;
+ LONG userdata;
+ int i;
+
+ /* Allocate new threadpool */
+ pool = NULL;
+ status = pTpAllocPool(&pool, NULL);
+ ok(!status, "TpAllocPool failed with status %x\n", status);
+ ok(pool != NULL, "expected pool != NULL\n");
+
+ /* We limit the pool to a single thread */
+ pTpSetPoolMaxThreads(pool, 1);
+
+ /* Create a cleanup group */
+ group = NULL;
+ status = pTpAllocCleanupGroup(&group);
+ ok(!status, "TpAllocCleanupGroup failed with status %x\n", status);
+ ok(group != NULL, "expected pool != NULL\n");
+
+ /* The first work item has no cleanup group associated */
+ work = NULL;
+ memset(&environment, 0, sizeof(environment));
+ environment.Version = 1;
+ environment.Pool = pool;
+ status = pTpAllocWork(&work, work_cb, &userdata, &environment);
+ ok(!status, "TpAllocWork failed with status %x\n", status);
+ ok(work != NULL, "expected work != NULL\n");
+
+ /* Allocate a second work item with a cleanup group */
+ work2 = NULL;
+ memset(&environment, 0, sizeof(environment));
+ environment.Version = 1;
+ environment.Pool = pool;
+ environment.CleanupGroup = group;
+ status = pTpAllocWork(&work2, work2_cb, &userdata, &environment);
+ ok(!status, "TpAllocWork failed with status %x\n", status);
+ ok(work2 != NULL, "expected work2 != NULL\n");
+
+ /* The 'work' callbacks are not blocking execution of work2 callbacks */
+ userdata = 0;
+ for (i = 0; i < 10; i++)
+ pTpPostWork(work);
+ for (i = 0; i < 10; i++)
+ pTpPostWork(work2);
+ Sleep(30);
+ pTpWaitForWork(work, TRUE);
+ pTpWaitForWork(work2, TRUE);
+ ok(userdata & 0xffff, "expected userdata & 0xffff != 0, got %u\n", userdata & 0xffff);
+ ok(userdata >> 16, "expected userdata >> 16 != 0, got %u\n", userdata >> 16);
+
+ /* Test ReleaseCleanupGroupMembers on a work item */
+ userdata = 0;
+ for (i = 0; i < 100; i++)
+ pTpPostWork(work);
+ for (i = 0; i < 10; i++)
+ pTpPostWork(work2);
+ pTpReleaseCleanupGroupMembers(group, FALSE, NULL);
+ pTpWaitForWork(work, TRUE);
+ ok((userdata & 0xffff) < 100, "expected userdata & 0xffff < 100, got %u\n", userdata & 0xffff);
+ ok(userdata >> 16 == 10, "expected userdata >> 16 == 10, got %u\n", userdata >> 16);
+
+ /* Cleanup */
+ pTpReleaseWork(work);
+ pTpReleaseCleanupGroup(group);
pTpReleasePool(pool);
+}
+
+static void CALLBACK instance_cb(TP_CALLBACK_INSTANCE *instance, void *userdata)
+{
+ trace("Running instance callback\n");
@ -427,7 +218,7 @@ index 2e31b34..d27627e 100644
+ ok(ret == WAIT_OBJECT_0, "expected ret = WAIT_OBJECT_0, got %u\n", ret);
+ while (userdata != 2) Sleep(10);
+
CloseHandle(semaphore);
+ CloseHandle(semaphore);
+
+ /* Cleanup */
+ pTpReleasePool(pool);
@ -717,15 +508,15 @@ index 2e31b34..d27627e 100644
+ pTpReleaseTimer(timer);
+ pTpReleaseTimer(timer2);
+ pTpReleasePool(pool);
}
+}
+
START_TEST(threadpool)
@@ -124,4 +777,13 @@ START_TEST(threadpool)
return;
{
if (!init_threadpool())
@@ -315,4 +778,11 @@ START_TEST(threadpool)
test_tp_simple();
+ test_tp_work();
+ test_tp_work_scheduler();
test_tp_work();
test_tp_work_scheduler();
+ test_tp_instance();
+ test_tp_group_cancel();
+ test_tp_timer();

View File

@ -1,4 +1,4 @@
From 9775c66000942a2ae9e52abe0d27dcca20a24470 Mon Sep 17 00:00:00 2001
From 62fdd332539cd14b48077665d6c88f12ac12b091 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Sun, 1 Feb 2015 19:41:13 +0100
Subject: kernel32: Forward various threadpool functions to ntdll.
@ -11,7 +11,7 @@ Subject: kernel32: Forward various threadpool functions to ntdll.
4 files changed, 155 insertions(+), 27 deletions(-)
diff --git a/dlls/kernel32/kernel32.spec b/dlls/kernel32/kernel32.spec
index 3719505..4d170d5 100644
index c95c446..54d83da 100644
--- a/dlls/kernel32/kernel32.spec
+++ b/dlls/kernel32/kernel32.spec
@@ -204,7 +204,7 @@
@ -300,10 +300,10 @@ index 0abfdf1..085b011 100644
+ TpSetTimer( timer, due_time ? &timeout : NULL, period, window_length );
+}
diff --git a/include/winternl.h b/include/winternl.h
index 1a694da..a534bd1 100644
index 3992309..7608d21 100644
--- a/include/winternl.h
+++ b/include/winternl.h
@@ -2599,6 +2599,33 @@ NTSYSAPI LONGLONG WINAPI RtlLargeIntegerSubtract(LONGLONG,LONGLONG);
@@ -2604,6 +2604,33 @@ NTSYSAPI LONGLONG WINAPI RtlLargeIntegerSubtract(LONGLONG,LONGLONG);
NTSYSAPI NTSTATUS WINAPI RtlLargeIntegerToChar(const ULONGLONG *,ULONG,ULONG,PCHAR);
#endif
@ -338,5 +338,5 @@ index 1a694da..a534bd1 100644
NTSYSAPI NTSTATUS CDECL wine_nt_to_unix_file_name( const UNICODE_STRING *nameW, ANSI_STRING *unix_name_ret,
--
2.2.2
2.3.0

View File

@ -1,15 +1,15 @@
From 9c751685e15f98981a26275d2102b839fbee3472 Mon Sep 17 00:00:00 2001
From 59afc4395781ac5bddc1c846605771529eb2055b Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Fri, 6 Feb 2015 00:32:26 +0100
Date: Wed, 4 Mar 2015 13:33:25 +0100
Subject: ntdll: Implement threadpool wait objects.
---
dlls/ntdll/ntdll.spec | 8 +-
dlls/ntdll/threadpool2.c | 490 ++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 487 insertions(+), 11 deletions(-)
dlls/ntdll/threadpool2.c | 486 ++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 483 insertions(+), 11 deletions(-)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index f4328a9..17c71bc 100644
index 24b4c30..b4cfe69 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -974,7 +974,7 @@
@ -39,80 +39,76 @@ index f4328a9..17c71bc 100644
# @ stub TpSetWaitEx
@ stdcall TpSimpleTryPost(ptr ptr ptr)
# @ stub TpStartAsyncIoOperation
@@ -1028,5 +1028,5 @@
@@ -1027,7 +1027,7 @@
# @ stub TpWaitForIoCompletion
# @ stub TpWaitForJobNotification
@ stdcall TpWaitForTimer(ptr long)
-# @ stub TpWaitForWait
+@ stdcall TpWaitForWait(ptr long)
@ stdcall TpWaitForWork(ptr long)
@ stdcall -ret64 VerSetConditionMask(int64 long long)
@ stdcall ZwAcceptConnectPort(ptr long ptr long long ptr) NtAcceptConnectPort
diff --git a/dlls/ntdll/threadpool2.c b/dlls/ntdll/threadpool2.c
index 19096a0..ae98d7a 100644
index a1089a4..9b2a0c8 100644
--- a/dlls/ntdll/threadpool2.c
+++ b/dlls/ntdll/threadpool2.c
@@ -54,6 +54,7 @@ static inline LONG interlocked_dec( PLONG dest )
@@ -48,6 +48,7 @@ static inline LONG interlocked_dec( PLONG dest )
}
#define THREADPOOL_WORKER_TIMEOUT 5000
+#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
/* allocated on the stack while a callback is running */
struct threadpool_instance
@@ -125,7 +126,8 @@ struct threadpool_object
TP_OBJECT_TYPE_UNDEFINED,
TP_OBJECT_TYPE_SIMPLE,
TP_OBJECT_TYPE_WORK,
- TP_OBJECT_TYPE_TIMER
+ TP_OBJECT_TYPE_TIMER,
+ TP_OBJECT_TYPE_WAIT
} type;
/* internal threadpool representation */
struct threadpool
@@ -69,7 +70,8 @@ enum threadpool_objtype
{
TP_OBJECT_TYPE_SIMPLE,
TP_OBJECT_TYPE_WORK,
- TP_OBJECT_TYPE_TIMER
+ TP_OBJECT_TYPE_TIMER,
+ TP_OBJECT_TYPE_WAIT
};
/* arguments for callback */
@@ -156,6 +158,20 @@ struct threadpool_object
LONG period;
LONG window_length;
/* internal threadpool object representation */
@@ -117,6 +119,17 @@ struct threadpool_object
LONG period;
LONG window_length;
} timer;
+ /* wait callback */
+ struct
+ {
+ PTP_WAIT_CALLBACK callback;
+ LONG signaled;
+
+ LONG signaled;
+ /* information about the wait, locked via waitqueue.cs */
+ struct waitqueue_bucket *bucket;
+ BOOL wait_pending;
+ struct list wait_entry;
+
+ ULONGLONG timeout;
+ HANDLE handle;
+ BOOL wait_pending;
+ struct list wait_entry;
+ ULONGLONG timeout;
+ HANDLE handle;
+ } wait;
} u;
};
@@ -199,6 +215,40 @@ static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
@@ -175,6 +188,38 @@ static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
};
+struct waitqueue_bucket
+{
+ struct list bucket_entry;
+
+ /* list of reserved slots / pending waits */
+ LONG num_waits;
+ struct list reserved;
+ struct list waits;
+ HANDLE update_event;
+ struct list bucket_entry;
+ LONG num_waits;
+ struct list reserved;
+ struct list waits;
+ HANDLE update_event;
+};
+
+/* global waitqueue object */
+static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
+
+static struct
+{
+ CRITICAL_SECTION cs;
+
+ /* list of buckets */
+ LONG num_buckets;
+ struct list buckets;
+ CRITICAL_SECTION cs;
+ LONG num_buckets;
+ struct list buckets;
+}
+waitqueue =
+{
@ -120,6 +116,7 @@ index 19096a0..ae98d7a 100644
+ 0,
+ LIST_INIT( waitqueue.buckets )
+};
+
+static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
+{
+ 0, 0, &waitqueue.cs,
@ -130,7 +127,7 @@ index 19096a0..ae98d7a 100644
static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
{
return (struct threadpool *)pool;
@@ -218,6 +268,13 @@ static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
@@ -194,6 +239,13 @@ static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
return object;
}
@ -144,31 +141,29 @@ index 19096a0..ae98d7a 100644
static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
{
return (struct threadpool_group *)group;
@@ -230,12 +287,13 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
@@ -206,10 +258,11 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
static void CALLBACK threadpool_worker_proc( void *param );
static void CALLBACK timerqueue_thread_proc( void *param );
+static void CALLBACK waitqueue_thread_proc( void *param );
static NTSTATUS tp_threadpool_alloc( struct threadpool **out );
static BOOL tp_threadpool_release( struct threadpool *pool );
static void tp_threadpool_shutdown( struct threadpool *pool );
static BOOL tp_threadpool_release( struct threadpool *pool );
-static void tp_object_submit( struct threadpool_object *object );
+static void tp_object_submit( struct threadpool_object *object, BOOL success );
static BOOL tp_object_release( struct threadpool_object *object );
static void tp_object_shutdown( struct threadpool_object *object );
@@ -384,7 +442,7 @@ update_timer:
static BOOL tp_object_release( struct threadpool_object *object );
static void tp_instance_initialize( struct threadpool_instance *instance, struct threadpool_object *object );
@@ -347,7 +400,7 @@ update_timer:
RtlLeaveCriticalSection( &timerqueue.cs );
if (submit_timer)
- tp_object_submit( new_timer );
+ tp_object_submit( new_timer, FALSE );
- tp_object_submit( new_timer );
+ tp_object_submit( new_timer, FALSE );
}
static void CALLBACK timerqueue_thread_proc( void *param )
@@ -411,7 +469,7 @@ static void CALLBACK timerqueue_thread_proc( void *param )
@@ -374,7 +427,7 @@ static void CALLBACK timerqueue_thread_proc( void *param )
/* Queue a new callback in one of the worker threads */
list_remove( &timer->u.timer.timer_entry );
@ -177,14 +172,10 @@ index 19096a0..ae98d7a 100644
/* Requeue the timer, except its marked for shutdown */
if (!timer->shutdown && timer->u.timer.period)
@@ -477,6 +535,306 @@ static void CALLBACK timerqueue_thread_proc( void *param )
@@ -440,6 +493,301 @@ static void CALLBACK timerqueue_thread_proc( void *param )
RtlLeaveCriticalSection( &timerqueue.cs );
}
+/***********************************************************************
+ * WAITQUEUE IMPLEMENTATION
+ ***********************************************************************/
+
+static NTSTATUS tp_waitqueue_acquire( struct threadpool_object *wait )
+{
+ struct waitqueue_bucket *bucket;
@ -192,14 +183,12 @@ index 19096a0..ae98d7a 100644
+ HANDLE thread;
+ assert( wait->type = TP_OBJECT_TYPE_WAIT );
+
+ wait->u.wait.signaled = 0;
+
+ wait->u.wait.bucket = NULL;
+ wait->u.wait.wait_pending = FALSE;
+ wait->u.wait.signaled = 0;
+ wait->u.wait.bucket = NULL;
+ wait->u.wait.wait_pending = FALSE;
+ memset( &wait->u.wait.wait_entry, 0, sizeof(wait->u.wait.wait_entry) );
+
+ wait->u.wait.timeout = 0;
+ wait->u.wait.handle = INVALID_HANDLE_VALUE;
+ wait->u.wait.timeout = 0;
+ wait->u.wait.handle = INVALID_HANDLE_VALUE;
+
+ RtlEnterCriticalSection( &waitqueue.cs );
+
@ -481,20 +470,21 @@ index 19096a0..ae98d7a 100644
+ NtClose( bucket->update_event );
+ RtlFreeHeap( GetProcessHeap(), 0, bucket );
+}
+
static struct threadpool *default_threadpool = NULL;
/***********************************************************************
* THREADPOOL INSTANCE IMPLEMENTATION
@@ -689,6 +1047,7 @@ static void tp_threadpool_shutdown( struct threadpool *pool )
static void CALLBACK threadpool_worker_proc( void *param )
{
/* allocates or returns the default threadpool */
@@ -536,6 +884,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
struct threadpool_instance instance;
TP_CALLBACK_INSTANCE *cb_instance = (TP_CALLBACK_INSTANCE *)&instance;
struct threadpool *pool = param;
+ TP_WAIT_RESULT wait_result;
LARGE_INTEGER timeout;
struct list *ptr;
@@ -708,6 +1067,18 @@ static void CALLBACK threadpool_worker_proc( void *param )
@@ -553,6 +902,18 @@ static void CALLBACK threadpool_worker_proc( void *param )
if (--object->num_pending_callbacks)
list_add_tail( &pool->pool, &object->pool_entry );
object->num_running_callbacks++;
+ /* for wait objects, determine if the object was signaled or if this
+ * is a timeout. */
@ -509,16 +499,15 @@ index 19096a0..ae98d7a 100644
+ }
+
/* Leave critical section and do the actual callback. */
object->num_running_callbacks++;
pool->num_busy_workers++;
RtlLeaveCriticalSection( &pool->cs );
@@ -746,6 +1117,16 @@ static void CALLBACK threadpool_worker_proc( void *param )
@@ -588,6 +949,15 @@ static void CALLBACK threadpool_worker_proc( void *param )
break;
}
+ case TP_OBJECT_TYPE_WAIT:
+ {
+ TP_CALLBACK_INSTANCE *cb_instance = (TP_CALLBACK_INSTANCE *)&instance;
+ TRACE( "executing callback %p(%p, %p, %p, %u)\n",
+ TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
+ object->u.wait.callback, cb_instance, object->userdata, object, wait_result );
+ object->u.wait.callback( cb_instance, object->userdata, (TP_WAIT *)object, wait_result );
+ TRACE( "callback %p returned\n", object->u.wait.callback );
@ -526,18 +515,18 @@ index 19096a0..ae98d7a 100644
+ }
+
default:
FIXME( "callback type %u not implemented\n", object->type );
assert(0);
break;
@@ -857,7 +1238,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
@@ -687,7 +1057,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
* to the cleanup group. As soon as the cleanup group members are released ->shutdown
* will be set, and tp_object_submit would fail with an assertion. */
if (submit_and_release)
if (simple_cb)
- tp_object_submit( object );
+ tp_object_submit( object, FALSE );
/* Assign this object to a specific group. Please note that this has to be done
* as the last step before returning a pointer to the application, otherwise
@@ -956,6 +1337,38 @@ static NTSTATUS tp_object_alloc_timer( struct threadpool_object **out, PTP_TIMER
if (object->group)
{
@@ -800,8 +1170,45 @@ static NTSTATUS tp_object_alloc_timer( struct threadpool_object **out, PTP_TIMER
return STATUS_SUCCESS;
}
@ -545,13 +534,18 @@ index 19096a0..ae98d7a 100644
+ PVOID userdata, TP_CALLBACK_ENVIRON *environment )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool;
+ struct threadpool *pool = NULL;
+ NTSTATUS status;
+
+ /* determine threadpool */
+ pool = environment ? (struct threadpool *)environment->Pool : NULL;
+ if (!pool) pool = get_default_threadpool();
+ if (!pool) return STATUS_NO_MEMORY;
+ if (environment)
+ pool = (struct threadpool *)environment->Pool;
+
+ if (!pool)
+ {
+ pool = get_default_threadpool();
+ if (!pool)
+ return STATUS_NO_MEMORY;
+ }
+
+ object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
+ if (!object)
@ -567,57 +561,21 @@ index 19096a0..ae98d7a 100644
+ return status;
+ }
+
+ tp_object_initialize( object, pool, userdata, environment, FALSE );
+ tp_object_initialize( object, pool, userdata, environment );
+
+ *out = object;
+ return STATUS_SUCCESS;
+}
+
static BOOL tp_object_release( struct threadpool_object *object )
{
struct threadpool_group *group;
@@ -996,6 +1409,11 @@ static void tp_object_shutdown( struct threadpool_object *object )
/* release reference on the timerqueue */
tp_timerqueue_release( object );
}
+ else if (object->type == TP_OBJECT_TYPE_WAIT)
+ {
+ /* release reference to the waitqueue */
+ tp_waitqueue_release( object );
+ }
object->shutdown = TRUE;
}
@@ -1007,12 +1425,17 @@ static void tp_object_cancel( struct threadpool_object *object, BOOL group_cance
/* Remove the pending callbacks from the pool */
RtlEnterCriticalSection( &pool->cs );
+
if (object->num_pending_callbacks)
{
pending_callbacks = object->num_pending_callbacks;
list_remove( &object->pool_entry );
object->num_pending_callbacks = 0;
}
+
+ if (object->type == TP_OBJECT_TYPE_WAIT)
+ object->u.wait.signaled = 0;
+
RtlLeaveCriticalSection( &pool->cs );
/* Execute group cancellation callback if defined, and if this was actually a group cancel. */
@@ -1039,7 +1462,7 @@ static void tp_object_wait( struct threadpool_object *object )
RtlLeaveCriticalSection( &pool->cs );
}
/* submits an object to a threadpool */
-static void tp_object_submit( struct threadpool_object *object )
+static void tp_object_submit( struct threadpool_object *object, BOOL success )
{
struct threadpool *pool = object->pool;
@@ -1076,6 +1499,10 @@ static void tp_object_submit( struct threadpool_object *object )
interlocked_inc( &object->refcount );
@@ -836,6 +1243,10 @@ out:
if (!object->num_pending_callbacks++)
list_add_tail( &pool->pool, &object->pool_entry );
+ /* increment success counter by one */
+ if (object->type == TP_OBJECT_TYPE_WAIT && success)
@ -626,7 +584,26 @@ index 19096a0..ae98d7a 100644
RtlLeaveCriticalSection( &pool->cs );
}
@@ -1210,6 +1637,16 @@ NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID
@@ -854,6 +1265,9 @@ static void tp_object_cancel( struct threadpool_object *object, BOOL group_cance
list_remove( &object->pool_entry );
}
+ if (object->type == TP_OBJECT_TYPE_WAIT)
+ object->u.wait.signaled = 0;
+
RtlLeaveCriticalSection( &pool->cs );
/* Execute group cancellation callback if defined, and if this was actually a group cancel. */
@@ -887,6 +1301,8 @@ static void tp_object_shutdown( struct threadpool_object *object )
{
if (object->type == TP_OBJECT_TYPE_TIMER)
tp_timerqueue_release( object );
+ else if (object->type == TP_OBJECT_TYPE_WAIT)
+ tp_waitqueue_release( object );
object->shutdown = TRUE;
}
@@ -1191,6 +1607,18 @@ NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID
}
/***********************************************************************
@ -636,23 +613,25 @@ index 19096a0..ae98d7a 100644
+ TP_CALLBACK_ENVIRON *environment )
+{
+ TRACE("%p %p %p %p\n", out, callback, userdata, environment);
+ return tp_object_alloc_wait( (struct threadpool_object **)out, callback, userdata, environment );
+
+ return tp_object_alloc_wait( (struct threadpool_object **)out, callback,
+ userdata, environment );
+}
+
+/***********************************************************************
* TpAllocWork (NTDLL.@)
*/
NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
@@ -1330,7 +1767,7 @@ VOID WINAPI TpPostWork( TP_WORK *work )
{
struct threadpool_object *this = impl_from_TP_WORK( work );
TRACE("%p\n", work);
- if (this) tp_object_submit( this );
+ if (this) tp_object_submit( this, FALSE );
@@ -1352,7 +1780,7 @@ VOID WINAPI TpPostWork( TP_WORK *work )
if (this)
{
- tp_object_submit( this );
+ tp_object_submit( this, FALSE );
}
}
/***********************************************************************
@@ -1386,6 +1823,20 @@ VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
@@ -1416,6 +1844,21 @@ VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
}
/***********************************************************************
@ -662,6 +641,7 @@ index 19096a0..ae98d7a 100644
+{
+ struct threadpool_object *this = impl_from_TP_WAIT( wait );
+ TRACE("%p\n", wait);
+
+ if (this)
+ {
+ tp_object_shutdown( this );
@ -673,7 +653,7 @@ index 19096a0..ae98d7a 100644
* TpReleaseWork (NTDLL.@)
*/
VOID WINAPI TpReleaseWork( TP_WORK *work )
@@ -1431,6 +1882,16 @@ VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LO
@@ -1478,6 +1921,20 @@ VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LO
}
/***********************************************************************
@ -683,14 +663,18 @@ index 19096a0..ae98d7a 100644
+{
+ struct threadpool_object *this = impl_from_TP_WAIT( wait );
+ TRACE("%p %p %p\n", wait, handle, timeout);
+ if (this) tp_waitqueue_update_wait( this, handle, timeout );
+
+ if (this)
+ {
+ tp_waitqueue_update_wait( this, handle, timeout );
+ }
+}
+
+/***********************************************************************
* TpSimpleTryPost (NTDLL.@)
*/
NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, TP_CALLBACK_ENVIRON *environment )
@@ -1455,6 +1916,21 @@ VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
@@ -1504,6 +1961,21 @@ VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
}
/***********************************************************************
@ -713,5 +697,5 @@ index 19096a0..ae98d7a 100644
*/
VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
--
2.2.2
2.3.0

View File

@ -1,4 +1,4 @@
From c5ee3c98cf283e254fa1d572d8732ce8aa2ad203 Mon Sep 17 00:00:00 2001
From be47a4f14f26fde351a5a2d0bd9d613bb56a1268 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Fri, 6 Feb 2015 20:09:41 +0100
Subject: ntdll/tests: Add tests for threadpool wait objects.
@ -8,7 +8,7 @@ Subject: ntdll/tests: Add tests for threadpool wait objects.
1 file changed, 287 insertions(+)
diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c
index d1223d5..13c4dc8 100644
index b44b0f5..91d1b95 100644
--- a/dlls/ntdll/tests/threadpool.c
+++ b/dlls/ntdll/tests/threadpool.c
@@ -770,6 +770,291 @@ static void test_tp_window_length(void)

View File

@ -1,4 +1,4 @@
From 67ad3c12465ea23c508534d7dc660a3d03a69caf Mon Sep 17 00:00:00 2001
From 921fe6b2ea88ca6be056a8fd733ae099f2839eb5 Mon Sep 17 00:00:00 2001
From: Sebastian Lackner <sebastian@fds-team.de>
Date: Fri, 6 Feb 2015 20:24:27 +0100
Subject: kernel32: Forward threadpool wait functions to ntdll.
@ -10,7 +10,7 @@ Subject: kernel32: Forward threadpool wait functions to ntdll.
3 files changed, 51 insertions(+), 4 deletions(-)
diff --git a/dlls/kernel32/kernel32.spec b/dlls/kernel32/kernel32.spec
index 4d170d5..b4af160 100644
index 54d83da..abee11b 100644
--- a/dlls/kernel32/kernel32.spec
+++ b/dlls/kernel32/kernel32.spec
@@ -233,7 +233,7 @@
@ -108,10 +108,10 @@ index 085b011..77037ef 100644
+ TpSetWait( wait, handle, due_time ? &timeout : NULL );
+}
diff --git a/include/winternl.h b/include/winternl.h
index a534bd1..c93931b 100644
index 7608d21..dca7f46 100644
--- a/include/winternl.h
+++ b/include/winternl.h
@@ -2604,6 +2604,7 @@ NTSYSAPI NTSTATUS WINAPI RtlLargeIntegerToChar(const ULONGLONG *,ULONG,ULONG,PC
@@ -2609,6 +2609,7 @@ NTSYSAPI NTSTATUS WINAPI RtlLargeIntegerToChar(const ULONGLONG *,ULONG,ULONG,PC
NTSYSAPI NTSTATUS WINAPI TpAllocCleanupGroup(TP_CLEANUP_GROUP **);
NTSYSAPI NTSTATUS WINAPI TpAllocPool(TP_POOL **,PVOID);
NTSYSAPI NTSTATUS WINAPI TpAllocTimer(TP_TIMER **,PTP_TIMER_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
@ -119,7 +119,7 @@ index a534bd1..c93931b 100644
NTSYSAPI NTSTATUS WINAPI TpAllocWork(TP_WORK **,PTP_WORK_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
NTSYSAPI void WINAPI TpCallbackLeaveCriticalSectionOnCompletion(TP_CALLBACK_INSTANCE *,RTL_CRITICAL_SECTION *);
NTSYSAPI NTSTATUS WINAPI TpCallbackMayRunLong(TP_CALLBACK_INSTANCE *);
@@ -2618,12 +2619,15 @@ NTSYSAPI void WINAPI TpReleaseCleanupGroup(TP_CLEANUP_GROUP *);
@@ -2623,12 +2624,15 @@ NTSYSAPI void WINAPI TpReleaseCleanupGroup(TP_CLEANUP_GROUP *);
NTSYSAPI void WINAPI TpReleaseCleanupGroupMembers(TP_CLEANUP_GROUP *,BOOL,PVOID);
NTSYSAPI void WINAPI TpReleasePool(TP_POOL *);
NTSYSAPI void WINAPI TpReleaseTimer(TP_TIMER *);
@ -136,5 +136,5 @@ index a534bd1..c93931b 100644
/* Wine internal functions */
--
2.2.2
2.3.0

View File

@ -2962,19 +2962,41 @@ fi
# | dlls/ntdll/ntdll.spec, dlls/ntdll/tests/threadpool.c, dlls/ntdll/threadpool2.c, include/winternl.h
# |
if test "$enable_ntdll_Vista_Threadpool" -eq 1; then
patch_apply ntdll-Vista_Threadpool/0001-ntdll-Add-threadpool-stub-functions-to-specfile.patch
patch_apply ntdll-Vista_Threadpool/0002-ntdll-Implement-threadpool-cleanup-group-and-callbac.patch
patch_apply ntdll-Vista_Threadpool/0003-ntdll-Implement-additional-threadpool-work-item-func.patch
patch_apply ntdll-Vista_Threadpool/0004-ntdll-Implement-threadpool-timer-functions.-rev-2.patch
patch_apply ntdll-Vista_Threadpool/0005-ntdll-tests-Add-tests-for-Tp-threadpool-functions.patch
patch_apply ntdll-Vista_Threadpool/0006-kernel32-Forward-various-threadpool-functions-to-ntd.patch
patch_apply ntdll-Vista_Threadpool/0007-ntdll-Implement-threadpool-wait-objects.patch
patch_apply ntdll-Vista_Threadpool/0008-ntdll-tests-Add-tests-for-threadpool-wait-objects.patch
patch_apply ntdll-Vista_Threadpool/0009-kernel32-Forward-threadpool-wait-functions-to-ntdll.patch
patch_apply ntdll-Vista_Threadpool/0001-ntdll-Implement-TpSimpleTryPost-and-basic-threadpool.patch
patch_apply ntdll-Vista_Threadpool/0002-ntdll-Implement-TpSetPool-Min-Max-Threads.patch
patch_apply ntdll-Vista_Threadpool/0003-ntdll-Implement-threadpool-cleanup-group-functions.patch
patch_apply ntdll-Vista_Threadpool/0004-ntdll-tests-Add-tests-for-TpAllocCleanupGroup-and-re.patch
patch_apply ntdll-Vista_Threadpool/0005-ntdll-Implement-threadpool-work-item-functions.patch
patch_apply ntdll-Vista_Threadpool/0006-ntdll-tests-Add-basic-tests-for-threadpool-work-item.patch
patch_apply ntdll-Vista_Threadpool/0007-ntdll-tests-Add-threadpool-scheduler-tests-for-work-.patch
patch_apply ntdll-Vista_Threadpool/0008-ntdll-Add-support-for-threadpool-group-cancel-callba.patch
patch_apply ntdll-Vista_Threadpool/0009-ntdll-Add-support-for-threadpool-finalization-callba.patch
patch_apply ntdll-Vista_Threadpool/0010-ntdll-Implement-threadpool-RaceDll-environment-varia.patch
patch_apply ntdll-Vista_Threadpool/0011-ntdll-Implement-TpCallbackMayRunLong-and-instance-st.patch
patch_apply ntdll-Vista_Threadpool/0012-ntdll-Implement-TpDisassociateCallback.patch
patch_apply ntdll-Vista_Threadpool/0013-ntdll-Implement-various-TpCallback-OnCompletion-func.patch
patch_apply ntdll-Vista_Threadpool/0014-ntdll-Add-remaining-threadpool-functions-to-specfile.patch
patch_apply ntdll-Vista_Threadpool/0015-ntdll-Implement-threadpool-timer-functions.-rev-2.patch
patch_apply ntdll-Vista_Threadpool/0016-ntdll-tests-Add-tests-for-Tp-threadpool-functions.patch
patch_apply ntdll-Vista_Threadpool/0017-kernel32-Forward-various-threadpool-functions-to-ntd.patch
patch_apply ntdll-Vista_Threadpool/0018-ntdll-Implement-threadpool-wait-objects.patch
patch_apply ntdll-Vista_Threadpool/0019-ntdll-tests-Add-tests-for-threadpool-wait-objects.patch
patch_apply ntdll-Vista_Threadpool/0020-kernel32-Forward-threadpool-wait-functions-to-ntdll.patch
(
echo '+ { "Sebastian Lackner", "ntdll: Add threadpool stub functions to specfile.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement threadpool, cleanup group and callback instance functions.", 2 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement additional threadpool work item functions.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement TpSimpleTryPost and basic threadpool infrastructure.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement TpSetPool[Min|Max]Threads.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement threadpool cleanup group functions.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll/tests: Add tests for TpAllocCleanupGroup and related functions.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement threadpool work item functions.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll/tests: Add basic tests for threadpool work items.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll/tests: Add threadpool scheduler tests for work items.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Add support for threadpool group cancel callback.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Add support for threadpool finalization callback.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement threadpool RaceDll environment variable.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement TpCallbackMayRunLong and instance structure.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement TpDisassociateCallback.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement various TpCallback*OnCompletion functions.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Add remaining threadpool functions to specfile.", 1 },';
echo '+ { "Sebastian Lackner", "ntdll: Implement threadpool timer functions.", 2 },';
echo '+ { "Sebastian Lackner", "ntdll/tests: Add tests for Tp* threadpool functions.", 1 },';
echo '+ { "Sebastian Lackner", "kernel32: Forward various threadpool functions to ntdll.", 1 },';