/* Keep g_assert() active: an assertion later in this file wraps a call that has side effects. */
1 #undef G_DISABLE_ASSERT
13 #include <fcntl.h> /* For _O_BINARY used by pipe() macro */
14 #include <io.h> /* for _pipe() */
/* Route pipe() through _pipe(), requesting a 4096-byte buffer and binary mode. */
15 #define pipe(fds) _pipe(fds, 4096, _O_BINARY)
/* Upper bound handed to g_random_int_range() when choosing a crawler's timeout interval. */
22 #define CRAWLER_TIMEOUT_RANGE 40
/* Interval passed to g_timeout_add() for the recurser_start callback. */
23 #define RECURSER_TIMEOUT 50
25 /* Lock ordering: the crawler array mutex must never be
26 * acquired while the context array mutex is already held;
27 * when both are needed, take the crawler array mutex first.
/* Main contexts registered by the adder threads; accessed under context_array_mutex. */
29 GPtrArray
*context_array
;
/* Protects context_array. */
30 GMutex context_array_mutex
;
/* Broadcast by an adder thread when context_array reaches NTHREADS entries. */
31 GCond context_array_cond
;
/* Lock taken around crawler_array accesses in the crawler callback, creation and cleanup paths. */
35 G_LOCK_DEFINE_STATIC (crawler_array_lock
);
/* Crawler timeout sources that are currently tracked. */
36 GPtrArray
*crawler_array
;
/* Per-thread state handed to the adder thread's source callbacks (members used in this file: loop, dest, count). */
38 typedef struct _AddrData AddrData
;
/* State for the adder_response watch (members used in this file: in, current_val, iters). */
39 typedef struct _TestData TestData
;
/* Forward declaration: adder_thread calls cleanup_crawlers before its definition appears. */
55 static void cleanup_crawlers (GMainContext
*context
);
/*
 * read_all: repeatedly calls g_io_channel_read() on `channel`, storing into
 * `buf`, for as long as fewer than `len` bytes have been read.  Callers in
 * this file treat a zero (false) result as failure.  Parts of the body,
 * including its return statements, are not visible in this excerpt.
 */
58 read_all (GIOChannel
*channel
, char *buf
, gsize len
)
64 while (bytes_read
< len
)
/* Ask only for the remainder, writing just past the bytes already stored. */
66 err
= g_io_channel_read (channel
, buf
+ bytes_read
, len
- bytes_read
, &count
);
/* G_IO_ERROR_AGAIN is distinguished from every other error code here. */
69 if (err
!= G_IO_ERROR_AGAIN
)
/*
 * write_all: repeatedly calls g_io_channel_write() on `channel` until `len`
 * bytes from `buf` have been accounted for.  The return statements are not
 * visible in this excerpt.
 */
82 write_all (GIOChannel
*channel
, char *buf
, gsize len
)
84 gsize bytes_written
= 0;
88 while (bytes_written
< len
)
/* Offer only the unwritten tail of the buffer. */
90 err
= g_io_channel_write (channel
, buf
+ bytes_written
, len
- bytes_written
, &count
);
/* Any error other than G_IO_ERROR_AGAIN takes this branch (its body is outside this excerpt). */
91 if (err
&& err
!= G_IO_ERROR_AGAIN
)
/* Advance by the byte count reported for the last write call. */
94 bytes_written
+= count
;
/*
 * adder_callback: I/O watch callback installed by adder_thread.  Reads two
 * 32-byte records from `source`, converts each with atoi(), and writes the
 * decimal sum to addr_data->dest as another 32-byte record.
 */
101 adder_callback (GIOChannel
*source
,
102 GIOCondition condition
,
/* `data` is the AddrData pointer registered with g_source_set_callback(). */
110 AddrData
*addr_data
= data
;
/* Both operands must be read in full; the loop-quit call below follows this failed-read check. */
112 if (!read_all (source
, buf1
, 32) ||
113 !read_all (source
, buf2
, 32))
115 g_main_loop_quit (addr_data
->loop
);
/* Format the sum as text, then send a fixed 32-byte record (write_all's result is not checked). */
119 sprintf (result
, "%d", atoi(buf1
) + atoi(buf2
));
120 write_all (addr_data
->dest
, result
, 32);
/*
 * timeout_callback: callback installed on the adder thread's timeout source.
 * `data` is the same AddrData pointer that adder_thread registers for it.
 * The remainder of the body is not visible in this excerpt.
 */
126 timeout_callback (gpointer data
)
128 AddrData
*addr_data
= data
;
/*
 * adder_thread: thread function started by create_adder_thread.  `data` is a
 * two-element GIOChannel array: element 0 is watched for input and element 1
 * receives results.  The thread creates its own GMainContext, registers it in
 * context_array, runs a main loop carrying an I/O watch and a high-priority
 * timeout, and finally unregisters the context and destroys any crawlers
 * attached to it.
 */
136 adder_thread (gpointer data
)
138 GMainContext
*context
;
139 GSource
*adder_source
;
140 GSource
*timeout_source
;
142 GIOChannel
**channels
= data
;
145 context
= g_main_context_new ();
/* Publish this thread's context; wake waiters once NTHREADS contexts are registered. */
147 g_mutex_lock (&context_array_mutex
);
149 g_ptr_array_add (context_array
, context
);
151 if (context_array
->len
== NTHREADS
)
152 g_cond_broadcast (&context_array_cond
);
154 g_mutex_unlock (&context_array_mutex
);
/* addr_data lives on this thread's stack and is shared with both callbacks below. */
156 addr_data
.dest
= channels
[1];
157 addr_data
.loop
= g_main_loop_new (context
, FALSE
);
/* Watch the input channel for readable data or hang-up. */
160 adder_source
= g_io_create_watch (channels
[0], G_IO_IN
| G_IO_HUP
);
161 g_source_set_name (adder_source
, "Adder I/O");
162 g_source_set_callback (adder_source
, (GSourceFunc
)adder_callback
, &addr_data
, NULL
);
163 g_source_attach (adder_source
, context
);
/* Drop the local reference now that the source is attached to the context. */
164 g_source_unref (adder_source
);
/* A timeout source created with an interval of 10, run at high priority. */
166 timeout_source
= g_timeout_source_new (10);
167 g_source_set_name (timeout_source
, "Adder timeout");
168 g_source_set_callback (timeout_source
, (GSourceFunc
)timeout_callback
, &addr_data
, NULL
);
169 g_source_set_priority (timeout_source
, G_PRIORITY_HIGH
);
170 g_source_attach (timeout_source
, context
);
171 g_source_unref (timeout_source
);
/* Runs until g_main_loop_quit() is called on this loop; adder_callback does so after a failed read. */
173 g_main_loop_run (addr_data
.loop
);
/* Release this thread's references to both channels and to the loop. */
175 g_io_channel_unref (channels
[0]);
176 g_io_channel_unref (channels
[1]);
180 g_main_loop_unref (addr_data
.loop
);
183 g_print ("Timeout run %d times\n", addr_data
.count
);
/* Unregister the context; the thread that empties the array stops the global main_loop. */
186 g_mutex_lock (&context_array_mutex
);
187 g_ptr_array_remove (context_array
, context
);
188 if (context_array
->len
== 0)
189 g_main_loop_quit (main_loop
);
190 g_mutex_unlock (&context_array_mutex
);
/* Called only after context_array_mutex is released, since cleanup_crawlers takes crawler_array_lock. */
192 cleanup_crawlers (context
);
/*
 * io_pipe: fills channels[0] and channels[1] with GIOChannels wrapping fds[0]
 * and fds[1].  Both channels are set to close their descriptor when the last
 * reference is dropped.  The code that produces `fds` and the condition that
 * leads to the warning are not visible in this excerpt.
 */
198 io_pipe (GIOChannel
**channels
)
/* Report the failure together with the errno description. */
204 g_warning ("Cannot create pipe %s\n", g_strerror (errno
));
208 channels
[0] = g_io_channel_unix_new (fds
[0]);
209 channels
[1] = g_io_channel_unix_new (fds
[1]);
/* Tie each descriptor's lifetime to its channel. */
211 g_io_channel_set_close_on_unref (channels
[0], TRUE
);
212 g_io_channel_set_close_on_unref (channels
[1], TRUE
);
/*
 * do_add: submits one addition request on `in` by writing `a` and then `b`,
 * each formatted as decimal text and sent as a fixed 32-byte record.  This
 * matches the two 32-byte reads performed by adder_callback.  The results of
 * write_all() are not checked.
 */
216 do_add (GIOChannel
*in
, gint a
, gint b
)
221 sprintf (buf1
, "%d", a
);
222 sprintf (buf2
, "%d", b
);
/* Operand order on the wire: a first, then b. */
224 write_all (in
, buf1
, 32);
225 write_all (in
, buf2
, 32);
/*
 * adder_response: I/O watch callback registered by create_adder_thread on the
 * result channel.  Reads one 32-byte result record and stores its integer
 * value in test_data->current_val.  When test_data->iters is 0 the value is
 * compared against ITERS * INCREMENT; the channel unrefs and a further
 * do_add() call appear later in the body.  The control flow linking these
 * steps (and any update of iters) is not fully visible in this excerpt.
 */
229 adder_response (GIOChannel
*source
,
230 GIOCondition condition
,
/* `data` is the TestData pointer passed to g_io_add_watch(). */
234 TestData
*test_data
= data
;
236 if (!read_all (source
, result
, 32))
239 test_data
->current_val
= atoi (result
);
242 if (test_data
->iters
== 0)
/* Expected final total is ITERS * INCREMENT. */
244 if (test_data
->current_val
!= ITERS
* INCREMENT
)
246 g_print ("Addition failed: %d != %d\n",
247 test_data
->current_val
, ITERS
* INCREMENT
);
/* Release the result channel and the request channel. */
251 g_io_channel_unref (source
);
252 g_io_channel_unref (test_data
->in
);
/* Submit the next request: the running value plus INCREMENT. */
259 do_add (test_data
->in
, test_data
->current_val
, INCREMENT
);
/*
 * create_adder_thread: sets up two channel pairs with io_pipe() and starts an
 * adder_thread.  The thread receives in_channels[0] and out_channels[1].  The
 * caller side keeps in_channels[1] (stored in TestData to submit operands)
 * and out_channels[0] (watched for results by adder_response).  Finally it
 * submits the first addition.
 */
265 create_adder_thread (void)
270 GIOChannel
*in_channels
[2];
271 GIOChannel
*out_channels
[2];
273 GIOChannel
**sub_channels
;
/* Heap-allocated array that is passed to the thread as its data argument. */
275 sub_channels
= g_new (GIOChannel
*, 2);
277 io_pipe (in_channels
);
278 io_pipe (out_channels
);
280 sub_channels
[0] = in_channels
[0];
281 sub_channels
[1] = out_channels
[1];
/* The third argument (joinable) is FALSE. */
283 g_thread_create (adder_thread
, sub_channels
, FALSE
, &err
);
287 g_warning ("Cannot create thread: %s", err
->message
);
/* Running total starts at 0 and iters starts at ITERS. */
291 test_data
= g_new (TestData
, 1);
292 test_data
->in
= in_channels
[1];
293 test_data
->current_val
= 0;
294 test_data
->iters
= ITERS
;
/* Results from the thread are handled by adder_response. */
296 g_io_add_watch (out_channels
[0], G_IO_IN
| G_IO_HUP
,
297 adder_response
, test_data
);
/* Kick off the first request: 0 + INCREMENT. */
299 do_add (test_data
->in
, test_data
->current_val
, INCREMENT
);
/* Forward declaration; create_crawler is defined further down in this file. */
302 static void create_crawler (void);
/*
 * remove_crawler: if at least one crawler is tracked, picks one at random,
 * destroys it, and removes it from crawler_array.  No locking is visible in
 * this function.
 * NOTE(review): presumably callers already hold crawler_array_lock; confirm.
 */
305 remove_crawler (void)
307 GSource
*other_source
;
309 if (crawler_array
->len
> 0)
311 other_source
= crawler_array
->pdata
[g_random_int_range (0, crawler_array
->len
)];
312 g_source_destroy (other_source
);
/* The removal happens inside g_assert(), so it only runs while assertions are enabled. */
313 g_assert (g_ptr_array_remove_fast (crawler_array
, other_source
));
/*
 * crawler_callback: callback of a crawler timeout source.  `data` is the
 * GSource itself, as registered in create_crawler.  Under crawler_array_lock
 * it tries to remove the source from crawler_array.  What happens when the
 * removal fails, and the rest of the body, is not visible in this excerpt.
 */
318 crawler_callback (gpointer data
)
320 GSource
*source
= data
;
322 G_LOCK (crawler_array_lock
);
/* A false result means the source was not present in crawler_array. */
324 if (!g_ptr_array_remove_fast (crawler_array
, source
))
328 G_UNLOCK (crawler_array_lock
);
/*
 * create_crawler: creates a timeout source with a random interval below
 * CRAWLER_TIMEOUT_RANGE, records it in crawler_array, and attaches it to a
 * randomly chosen context from context_array.  The source is passed to
 * crawler_callback as its own user data.
 */
337 create_crawler (void)
339 GSource
*source
= g_timeout_source_new (g_random_int_range (0, CRAWLER_TIMEOUT_RANGE
));
340 g_source_set_name (source
, "Crawler timeout");
341 g_source_set_callback (source
, (GSourceFunc
)crawler_callback
, source
, NULL
);
/* Lock order: crawler_array_lock first, then context_array_mutex inside it. */
343 G_LOCK (crawler_array_lock
);
344 g_ptr_array_add (crawler_array
, source
);
346 g_mutex_lock (&context_array_mutex
);
/* NOTE(review): the index range is empty when context_array->len is 0; confirm callers rule that out. */
347 g_source_attach (source
, context_array
->pdata
[g_random_int_range (0, context_array
->len
)]);
/* Drop the local reference after attaching. */
348 g_source_unref (source
);
349 g_mutex_unlock (&context_array_mutex
);
351 G_UNLOCK (crawler_array_lock
);
/*
 * cleanup_crawlers: under crawler_array_lock, walks crawler_array and, for
 * each source whose context is `context`, removes it from the array and
 * destroys it.
 */
355 cleanup_crawlers (GMainContext
*context
)
359 G_LOCK (crawler_array_lock
);
360 for (i
=0; i
< crawler_array
->len
; i
++)
362 if (g_source_get_context (crawler_array
->pdata
[i
]) == context
)
/* NOTE(review): remove_index shifts later elements down; confirm the index is adjusted after a removal (not visible here). */
364 g_source_destroy (g_ptr_array_remove_index (crawler_array
, i
));
368 G_UNLOCK (crawler_array_lock
);
/*
 * recurser_idle: idle callback installed by recurser_start.  `data` is the
 * GMainContext the idle source was attached to.  It calls
 * g_main_context_iteration() on that context 10 times with may_block FALSE.
 * The return statement is not visible in this excerpt.
 */
372 recurser_idle (gpointer data
)
374 GMainContext
*context
= data
;
377 for (i
= 0; i
< 10; i
++)
378 g_main_context_iteration (context
, FALSE
);
/*
 * recurser_start: timeout callback registered from main with interval
 * RECURSER_TIMEOUT.  While holding context_array_mutex it picks a random
 * registered context and attaches an idle source that runs recurser_idle,
 * passing that same context as user data.
 */
384 recurser_start (gpointer data
)
386 GMainContext
*context
;
389 g_mutex_lock (&context_array_mutex
);
/* NOTE(review): the index range is empty when context_array->len is 0; confirm this cannot run after the last context is removed. */
390 context
= context_array
->pdata
[g_random_int_range (0, context_array
->len
)];
391 source
= g_idle_source_new ();
392 g_source_set_name (source
, "Recursing idle source");
393 g_source_set_callback (source
, recurser_idle
, context
, NULL
);
394 g_source_attach (source
, context
);
/* Drop the local reference after attaching. */
395 g_source_unref (source
);
396 g_mutex_unlock (&context_array_mutex
);
/*
 * Program start-up sequence (the function header is not visible in this
 * excerpt): create the shared arrays and the default-context main loop,
 * start NTHREADS adder threads, wait until each has registered its context,
 * run the NCRAWLERS loop, install the recurser timeout, and run the main
 * loop until an adder thread quits it.
 *
 * The wait on context_array_cond re-checks its predicate in a loop because
 * g_cond_wait() may return without the condition having been signalled; a
 * single `if` would let start-up continue with fewer than NTHREADS contexts.
 */
407 g_thread_init (NULL
);
409 context_array
= g_ptr_array_new ();
411 crawler_array
= g_ptr_array_new ();
413 main_loop
= g_main_loop_new (NULL
, FALSE
);
415 for (i
= 0; i
< NTHREADS
; i
++)
416 create_adder_thread ();
418 /* Wait for all threads to start
420 g_mutex_lock (&context_array_mutex
);
422 while (context_array
->len
< NTHREADS
)
423 g_cond_wait (&context_array_cond
, &context_array_mutex
);
425 g_mutex_unlock (&context_array_mutex
);
427 for (i
= 0; i
< NCRAWLERS
; i
++)
430 g_timeout_add (RECURSER_TIMEOUT
, recurser_start
, NULL
);
432 g_main_loop_run (main_loop
);
433 g_main_loop_unref (main_loop
);