/* NOTE(review): the lines below each start with an integer and the sequence
 * has gaps, which looks like line-number residue from an export that also
 * dropped some lines. Confirm against the upstream file before building. */
/* Undefine the GLib "disable" switches so that g_assert() and deprecated
 * API stay usable in this file (see the GLib macro documentation). */
1 #undef G_DISABLE_ASSERT
3 #undef G_DISABLE_DEPRECATED
/* Both message macros are defined empty here, so every DEBUG_MSG and
 * PRINT_MSG call expands to nothing. The commented-out alternatives print
 * the message followed by a newline via g_printerr. */
10 #define DEBUG_MSG(args)
11 /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n"); */
12 #define PRINT_MSG(args)
13 /* #define PRINT_MSG(args) g_printerr args ; g_printerr ("\n"); */
/* MAX_THREADS initialises max_threads and max_unused_threads in main(); the
 * former bounds the loop that pushes work items to the thread pool. */
15 #define MAX_THREADS 50
/* MAX_SORTS initialises sort_multiplier in main(), which sort_queue()
 * compares against its running `sorts` counter.
 * NOTE(review): the trailing comment names ASYC_QUEUE_DO_SORT, which is not
 * defined in the visible text; the visible uses of MAX_SORTS appear to sit
 * behind SORT_QUEUE_AFTER. Confirm and correct the name. */
16 #define MAX_SORTS 5 /* only applies if
17 ASYC_QUEUE_DO_SORT is set to 1 */
/* Bounds, in seconds, of the random delay chosen in enter_thread(), which
 * multiplies both by 1000 before calling g_random_int_range(). */
18 #define MAX_TIME 20 /* seconds */
19 #define MIN_TIME 5 /* seconds */
/* SORT_QUEUE_AFTER gates the g_async_queue_sort() calls in sort_queue(). */
21 #define SORT_QUEUE_AFTER 1
/* SORT_QUEUE_ON_PUSH gates g_async_queue_push_sorted() in enter_thread(). */
22 #define SORT_QUEUE_ON_PUSH 1 /* if this is done, the
23 SORT_QUEUE_AFTER is ignored */
/* When non-zero, sort_queue() calls g_main_loop_quit() once can_quit is set. */
24 #define QUIT_WHEN_DONE 1
/* Sorting on push takes precedence: force SORT_QUEUE_AFTER to 0.
 * NOTE(review): the matching #endif is not present in this text. */
27 #if SORT_QUEUE_ON_PUSH == 1
28 # undef SORT_QUEUE_AFTER
29 # define SORT_QUEUE_AFTER 0
/* File-scope state shared by the functions below, all starting out NULL:
 *   main_loop    - created and run in main(), quit from sort_queue();
 *   thread_pool  - created in main() with enter_thread as its worker;
 *   async_queue  - pushed to by enter_thread(), drained by sort_queue(). */
33 static GMainLoop
*main_loop
= NULL
;
34 static GThreadPool
*thread_pool
= NULL
;
35 static GAsyncQueue
*async_queue
= NULL
;
/* Three-way comparison of two queue items.
 *
 * p1, p2:    items stored as integers inside pointers; decoded with
 *            GPOINTER_TO_INT before comparing.
 * user_data: not referenced in the visible lines.
 *
 * Returns +1 when the first id is greater, 0 when the ids are equal and
 * -1 otherwise. Passed as the compare callback to g_async_queue_sort() and
 * g_async_queue_push_sorted() elsewhere in this file.
 *
 * NOTE(review): the return type, the braces and the declarations of
 * id1/id2 are not present in this text. */
39 sort_compare (gconstpointer p1
, gconstpointer p2
, gpointer user_data
)
44 id1
= GPOINTER_TO_INT (p1
);
45 id2
= GPOINTER_TO_INT (p2
);
/* Trace of the inputs and the value about to be returned; a no-op while
 * DEBUG_MSG is defined empty. */
47 DEBUG_MSG (("comparing #1:%d and #2:%d, returning %d",
48 id1
, id2
, (id1
> id2
? +1 : id1
== id2
? 0 : -1)));
/* Same expression as in the trace above. */
50 return (id1
> id2
? +1 : id1
== id2
? 0 : -1);
/* Callback registered with g_timeout_add() in main(): checks that the
 * shared async_queue yields its items in non-decreasing order.
 *
 * user_data: an integer packed in a pointer, decoded into sort_multiplier.
 *
 * Visible steps: optionally sort the queue (SORT_QUEUE_AFTER), read its
 * length, pop that many items asserting on their order, and quit the main
 * loop when can_quit and QUIT_WHEN_DONE are both true.
 *
 * NOTE(review): no visible line assigns can_quit after its FALSE
 * initialiser, increments `sorts`, updates last_p inside the loop, or
 * returns a value; the return type, the declarations of p, len, i and
 * sort_multiplier, the closing braces and any `else` are also absent.
 * Those lines are presumably missing from this text; confirm upstream. */
54 sort_queue (gpointer user_data
)
/* Both statics persist across calls; last_p is reset to NULL by the
 * for-loop initialiser further down. */
56 static gint sorts
= 0;
57 static gpointer last_p
= NULL
;
59 gboolean can_quit
= FALSE
;
64 sort_multiplier
= GPOINTER_TO_INT (user_data
);
/* SORT_QUEUE_AFTER is a compile-time constant, so this is a constant
 * condition rather than a run-time switch. */
66 if (SORT_QUEUE_AFTER
) {
67 PRINT_MSG (("sorting async queue..."));
68 g_async_queue_sort (async_queue
, sort_compare
, NULL
);
72 if (sorts
>= sort_multiplier
) {
/* The queue is sorted a second time here before its length is read. */
76 g_async_queue_sort (async_queue
, sort_compare
, NULL
);
77 len
= g_async_queue_length (async_queue
);
79 PRINT_MSG (("sorted queue (for %d/%d times, size:%d)...", sorts
, MAX_SORTS
, len
));
/* Second place where len is read, followed by a "printing queue" trace.
 * NOTE(review): presumably the other branch of the condition above. */
82 len
= g_async_queue_length (async_queue
);
83 DEBUG_MSG (("printing queue (size:%d)...", len
));
/* Pop exactly len items (the length sampled above) and compare each one
 * with last_p. */
86 for (i
= 0, last_p
= NULL
; i
< len
; i
++) {
87 p
= g_async_queue_pop (async_queue
);
88 DEBUG_MSG (("item %d ---> %d", i
, GPOINTER_TO_INT (p
)));
/* Ordering check: the previous item must not be greater than this one. */
91 g_assert (GPOINTER_TO_INT (last_p
) <= GPOINTER_TO_INT (p
));
/* Stop the main loop that main() is blocked in. */
97 if (can_quit
&& QUIT_WHEN_DONE
) {
98 g_main_loop_quit (main_loop
);
/* Worker function handed to g_thread_pool_new() in main().
 *
 * data:      the work item; an integer id packed in a pointer (main() pushes
 *            GINT_TO_POINTER (i) for each i).
 * user_data: not referenced in the visible lines.
 *
 * Waits for a random time, then pushes the id onto the shared async_queue.
 *
 * NOTE(review): the return type, the braces and the declarations of id and
 * ms are not present in this text. */
105 enter_thread (gpointer data
, gpointer user_data
)
/* len is only consumed by the DEBUG_MSG at the end, which expands to
 * nothing while that macro is empty, hence G_GNUC_UNUSED. */
107 gint len G_GNUC_UNUSED
;
111 id
= GPOINTER_TO_INT (data
);
/* MIN_TIME and MAX_TIME are in seconds; the factor 1000 turns them into
 * the millisecond range handed to g_random_int_range(). */
113 ms
= g_random_int_range (MIN_TIME
* 1000, MAX_TIME
* 1000);
114 DEBUG_MSG (("entered thread with id:%d, adding to queue in:%ld ms", id
, ms
));
/* ms is scaled by a further 1000 for g_usleep(), which per the GLib
 * documentation takes microseconds. */
116 g_usleep (ms
* 1000);
/* Sorted insertion using sort_compare when SORT_QUEUE_ON_PUSH is non-zero.
 * NOTE(review): the plain g_async_queue_push() that follows is presumably
 * the `else` branch; the `else` itself is not present in this text. */
118 if (SORT_QUEUE_ON_PUSH
) {
119 g_async_queue_push_sorted (async_queue
, GINT_TO_POINTER (id
), sort_compare
, NULL
);
121 g_async_queue_push (async_queue
, GINT_TO_POINTER (id
));
124 len
= g_async_queue_length (async_queue
);
/* NOTE(review): the argument list of this call continues on a line that is
 * missing here. */
126 DEBUG_MSG (("thread id:%d added to async queue (size:%d)",
/* Counter inspected by the g_assert_cmpint() checks in the queue test
 * further down (expected 0, then 2 after the last unref).
 * NOTE(review): no visible line modifies it; presumably counting_destroy()
 * increments it. Confirm upstream. */
130 static gint destroy_count
= 0;
/* Passed to g_async_queue_new_full() below as the queue's item-free
 * function.
 *
 * item: the queue item being released.
 *
 * NOTE(review): the return type and the body are not present in this text;
 * judging by the name and the destroy_count assertions it presumably bumps
 * destroy_count. Confirm upstream. */
133 counting_destroy (gpointer item
)
/* Body of a self-contained GAsyncQueue check: reference counting with and
 * without the lock held, try-pop on an empty queue, and the item-free
 * function being run for items left in the queue at the final unref.
 * NOTE(review): the enclosing function header and the declarations of q and
 * item are not present in this text. */
/* New queue whose leftover items are released through counting_destroy. */
146 q
= g_async_queue_new_full (counting_destroy
);
/* Take one reference around a lock/unlock pair... */
147 g_async_queue_lock (q
);
148 g_async_queue_ref (q
);
149 g_async_queue_unlock (q
);
/* ...then take and drop one through the *_unlocked variants while the lock
 * is held. Net effect of lines 147-152: one extra reference on q. */
150 g_async_queue_lock (q
);
151 g_async_queue_ref_unlocked (q
);
152 g_async_queue_unref_and_unlock (q
);
/* Nothing has been pushed yet, so both try-pop variants must give NULL. */
154 item
= g_async_queue_try_pop (q
);
155 g_assert (item
== NULL
);
157 g_async_queue_lock (q
);
158 item
= g_async_queue_try_pop_unlocked (q
);
159 g_async_queue_unlock (q
);
160 g_assert (item
== NULL
);
/* Queue three items; pushing alone must not invoke the free function. */
162 g_async_queue_push (q
, GINT_TO_POINTER (1));
163 g_async_queue_push (q
, GINT_TO_POINTER (2));
164 g_async_queue_push (q
, GINT_TO_POINTER (3));
165 g_assert_cmpint (destroy_count
, ==, 0);
/* Drop the extra reference; q is used again below, so it is still alive
 * and nothing has been freed. */
167 g_async_queue_unref (q
);
168 g_assert_cmpint (destroy_count
, ==, 0);
/* Items come out in push order; a popped item is not counted as destroyed. */
170 item
= g_async_queue_pop (q
);
171 g_assert_cmpint (GPOINTER_TO_INT (item
), ==, 1);
172 g_assert_cmpint (destroy_count
, ==, 0);
/* Final unref: the two items still queued are expected to be released,
 * bringing the counter to 2. */
174 g_async_queue_unref (q
);
175 g_assert_cmpint (destroy_count
, ==, 2);
/* Program entry point: creates the shared queue and the thread pool, pushes
 * one work item per id in 1..max_threads, schedules sort_queue() on a timer,
 * runs the main loop until it is quit, then releases everything.
 *
 * argc, argv: not referenced in the visible lines.
 *
 * NOTE(review): the return type, the braces, the declarations of i and
 * sort_interval, the return statement and several other lines are not
 * present in this text. */
179 main (int argc
, char *argv
[])
182 gint max_threads
= MAX_THREADS
;
183 gint max_unused_threads
= MAX_THREADS
;
184 gint sort_multiplier
= MAX_SORTS
;
/* msg is assigned further down; presumably its only consumer is a PRINT_MSG
 * call, which expands to nothing, hence G_GNUC_UNUSED. */
186 gchar
*msg G_GNUC_UNUSED
;
190 PRINT_MSG (("creating async queue..."));
191 async_queue
= g_async_queue_new ();
/* Bail out with EXIT_FAILURE if the queue could not be created. */
193 g_return_val_if_fail (async_queue
!= NULL
, EXIT_FAILURE
);
195 PRINT_MSG (("creating thread pool with max threads:%d, max unused threads:%d...",
196 max_threads
, max_unused_threads
));
/* enter_thread is the pool's worker function.
 * NOTE(review): the remaining arguments of g_thread_pool_new() are on lines
 * missing from this text. */
197 thread_pool
= g_thread_pool_new (enter_thread
,
203 g_return_val_if_fail (thread_pool
!= NULL
, EXIT_FAILURE
);
205 g_thread_pool_set_max_unused_threads (max_unused_threads
);
207 PRINT_MSG (("creating threads..."));
/* Ids start at 1 and are handed to enter_thread() packed in a pointer. */
208 for (i
= 1; i
<= max_threads
; i
++) {
209 GError
*error
= NULL
;
211 g_thread_pool_push (thread_pool
, GINT_TO_POINTER (i
), &error
);
/* Any failure to queue a work item aborts the test. */
213 g_assert_no_error (error
);
/* NOTE(review): the body of this condition is not present in this text. */
216 if (!SORT_QUEUE_AFTER
) {
/* Timer period in milliseconds: MAX_TIME (seconds) divided by
 * sort_multiplier using integer division, plus 2 seconds, times 1000. */
220 sort_interval
= ((MAX_TIME
/ sort_multiplier
) + 2) * 1000;
/* sort_queue() receives sort_multiplier back through its user_data. */
221 g_timeout_add (sort_interval
, sort_queue
, GINT_TO_POINTER (sort_multiplier
));
223 if (SORT_QUEUE_ON_PUSH
) {
224 msg
= "sorting when pushing into the queue, checking queue is sorted";
/* NOTE(review): some arguments of this PRINT_MSG call are on lines missing
 * from this text. */
229 PRINT_MSG (("%s %d %s %d ms",
232 sort_multiplier
== 1 ? "time in" : "times, once every",
235 DEBUG_MSG (("entering main event loop"));
/* Runs until sort_queue() calls g_main_loop_quit(). */
237 main_loop
= g_main_loop_new (NULL
, FALSE
);
238 g_main_loop_run (main_loop
);
/* Teardown. Per the GLib documentation the two TRUE arguments to
 * g_thread_pool_free() are `immediate` and `wait`. */
240 g_main_loop_unref (main_loop
);
241 g_thread_pool_free (thread_pool
, TRUE
, TRUE
);
242 g_async_queue_unref (async_queue
);