1 #undef G_DISABLE_ASSERT
13 #include <fcntl.h> /* For _O_BINARY used by pipe() macro */
14 #include <io.h> /* for _pipe() */
15 #define pipe(fds) _pipe(fds, 4096, _O_BINARY)
22 #define CRAWLER_TIMEOUT_RANGE 40
23 #define RECURSER_TIMEOUT 50
25 /* The partial ordering between the context array mutex and
26 * crawler array mutex is that the crawler array mutex cannot
27 * be locked while the context array mutex is locked
29 GPtrArray
*context_array
;
30 GMutex context_array_mutex
;
31 GCond context_array_cond
;
35 G_LOCK_DEFINE_STATIC (crawler_array_lock
);
36 GPtrArray
*crawler_array
;
38 typedef struct _AddrData AddrData
;
39 typedef struct _TestData TestData
;
55 static void cleanup_crawlers (GMainContext
*context
);
58 read_all (GIOChannel
*channel
, char *buf
, gsize len
)
64 while (bytes_read
< len
)
66 err
= g_io_channel_read (channel
, buf
+ bytes_read
, len
- bytes_read
, &count
);
69 if (err
!= G_IO_ERROR_AGAIN
)
82 write_all (GIOChannel
*channel
, char *buf
, gsize len
)
84 gsize bytes_written
= 0;
88 while (bytes_written
< len
)
90 err
= g_io_channel_write (channel
, buf
+ bytes_written
, len
- bytes_written
, &count
);
91 if (err
&& err
!= G_IO_ERROR_AGAIN
)
94 bytes_written
+= count
;
101 adder_callback (GIOChannel
*source
,
102 GIOCondition condition
,
108 char result
[32] = { 0, };
110 AddrData
*addr_data
= data
;
112 if (!read_all (source
, buf1
, 32) ||
113 !read_all (source
, buf2
, 32))
115 g_main_loop_quit (addr_data
->loop
);
119 sprintf (result
, "%d", atoi(buf1
) + atoi(buf2
));
120 write_all (addr_data
->dest
, result
, 32);
126 timeout_callback (gpointer data
)
128 AddrData
*addr_data
= data
;
136 adder_thread (gpointer data
)
138 GMainContext
*context
;
139 GSource
*adder_source
;
140 GSource
*timeout_source
;
142 GIOChannel
**channels
= data
;
145 context
= g_main_context_new ();
147 g_mutex_lock (&context_array_mutex
);
149 g_ptr_array_add (context_array
, context
);
151 if (context_array
->len
== NTHREADS
)
152 g_cond_broadcast (&context_array_cond
);
154 g_mutex_unlock (&context_array_mutex
);
156 addr_data
.dest
= channels
[1];
157 addr_data
.loop
= g_main_loop_new (context
, FALSE
);
160 adder_source
= g_io_create_watch (channels
[0], G_IO_IN
| G_IO_HUP
);
161 g_source_set_name (adder_source
, "Adder I/O");
162 g_source_set_callback (adder_source
, (GSourceFunc
)adder_callback
, &addr_data
, NULL
);
163 g_source_attach (adder_source
, context
);
164 g_source_unref (adder_source
);
166 timeout_source
= g_timeout_source_new (10);
167 g_source_set_name (timeout_source
, "Adder timeout");
168 g_source_set_callback (timeout_source
, (GSourceFunc
)timeout_callback
, &addr_data
, NULL
);
169 g_source_set_priority (timeout_source
, G_PRIORITY_HIGH
);
170 g_source_attach (timeout_source
, context
);
171 g_source_unref (timeout_source
);
173 g_main_loop_run (addr_data
.loop
);
175 g_io_channel_unref (channels
[0]);
176 g_io_channel_unref (channels
[1]);
180 g_main_loop_unref (addr_data
.loop
);
183 g_print ("Timeout run %d times\n", addr_data
.count
);
186 g_mutex_lock (&context_array_mutex
);
187 g_ptr_array_remove (context_array
, context
);
188 if (context_array
->len
== 0)
189 g_main_loop_quit (main_loop
);
190 g_mutex_unlock (&context_array_mutex
);
192 cleanup_crawlers (context
);
193 g_main_context_unref (context
);
199 io_pipe (GIOChannel
**channels
)
206 g_warning ("Cannot create pipe %s\n", g_strerror (errsv
));
210 channels
[0] = g_io_channel_unix_new (fds
[0]);
211 channels
[1] = g_io_channel_unix_new (fds
[1]);
213 g_io_channel_set_close_on_unref (channels
[0], TRUE
);
214 g_io_channel_set_close_on_unref (channels
[1], TRUE
);
218 do_add (GIOChannel
*in
, gint a
, gint b
)
220 char buf1
[32] = { 0, };
221 char buf2
[32] = { 0, };
223 sprintf (buf1
, "%d", a
);
224 sprintf (buf2
, "%d", b
);
226 write_all (in
, buf1
, 32);
227 write_all (in
, buf2
, 32);
231 adder_response (GIOChannel
*source
,
232 GIOCondition condition
,
236 TestData
*test_data
= data
;
238 if (!read_all (source
, result
, 32))
241 test_data
->current_val
= atoi (result
);
244 if (test_data
->iters
== 0)
246 if (test_data
->current_val
!= ITERS
* INCREMENT
)
248 g_print ("Addition failed: %d != %d\n",
249 test_data
->current_val
, ITERS
* INCREMENT
);
253 g_io_channel_unref (source
);
254 g_io_channel_unref (test_data
->in
);
261 do_add (test_data
->in
, test_data
->current_val
, INCREMENT
);
267 create_adder_thread (void)
272 GIOChannel
*in_channels
[2];
273 GIOChannel
*out_channels
[2];
275 GIOChannel
**sub_channels
;
277 sub_channels
= g_new (GIOChannel
*, 2);
279 io_pipe (in_channels
);
280 io_pipe (out_channels
);
282 sub_channels
[0] = in_channels
[0];
283 sub_channels
[1] = out_channels
[1];
285 g_thread_create (adder_thread
, sub_channels
, FALSE
, &err
);
289 g_warning ("Cannot create thread: %s", err
->message
);
293 test_data
= g_new (TestData
, 1);
294 test_data
->in
= in_channels
[1];
295 test_data
->current_val
= 0;
296 test_data
->iters
= ITERS
;
298 g_io_add_watch (out_channels
[0], G_IO_IN
| G_IO_HUP
,
299 adder_response
, test_data
);
301 do_add (test_data
->in
, test_data
->current_val
, INCREMENT
);
304 static void create_crawler (void);
307 remove_crawler (void)
309 GSource
*other_source
;
311 if (crawler_array
->len
> 0)
313 other_source
= crawler_array
->pdata
[g_random_int_range (0, crawler_array
->len
)];
314 g_source_destroy (other_source
);
315 g_assert (g_ptr_array_remove_fast (crawler_array
, other_source
));
320 crawler_callback (gpointer data
)
322 GSource
*source
= data
;
324 G_LOCK (crawler_array_lock
);
326 if (!g_ptr_array_remove_fast (crawler_array
, source
))
330 G_UNLOCK (crawler_array_lock
);
339 create_crawler (void)
341 GSource
*source
= g_timeout_source_new (g_random_int_range (0, CRAWLER_TIMEOUT_RANGE
));
342 g_source_set_name (source
, "Crawler timeout");
343 g_source_set_callback (source
, (GSourceFunc
)crawler_callback
, source
, NULL
);
345 G_LOCK (crawler_array_lock
);
346 g_ptr_array_add (crawler_array
, source
);
348 g_mutex_lock (&context_array_mutex
);
349 g_source_attach (source
, context_array
->pdata
[g_random_int_range (0, context_array
->len
)]);
350 g_source_unref (source
);
351 g_mutex_unlock (&context_array_mutex
);
353 G_UNLOCK (crawler_array_lock
);
357 cleanup_crawlers (GMainContext
*context
)
361 G_LOCK (crawler_array_lock
);
362 for (i
=0; i
< crawler_array
->len
; i
++)
364 if (g_source_get_context (crawler_array
->pdata
[i
]) == context
)
366 g_source_destroy (g_ptr_array_remove_index (crawler_array
, i
));
370 G_UNLOCK (crawler_array_lock
);
374 recurser_idle (gpointer data
)
376 GMainContext
*context
= data
;
379 for (i
= 0; i
< 10; i
++)
380 g_main_context_iteration (context
, FALSE
);
386 recurser_start (gpointer data
)
388 GMainContext
*context
;
391 g_mutex_lock (&context_array_mutex
);
392 context
= context_array
->pdata
[g_random_int_range (0, context_array
->len
)];
393 source
= g_idle_source_new ();
394 g_source_set_name (source
, "Recursing idle source");
395 g_source_set_callback (source
, recurser_idle
, context
, NULL
);
396 g_source_attach (source
, context
);
397 g_source_unref (source
);
398 g_mutex_unlock (&context_array_mutex
);
409 context_array
= g_ptr_array_new ();
411 crawler_array
= g_ptr_array_new ();
413 main_loop
= g_main_loop_new (NULL
, FALSE
);
415 for (i
= 0; i
< NTHREADS
; i
++)
416 create_adder_thread ();
418 /* Wait for all threads to start
420 g_mutex_lock (&context_array_mutex
);
422 if (context_array
->len
< NTHREADS
)
423 g_cond_wait (&context_array_cond
, &context_array_mutex
);
425 g_mutex_unlock (&context_array_mutex
);
427 for (i
= 0; i
< NCRAWLERS
; i
++)
430 g_timeout_add (RECURSER_TIMEOUT
, recurser_start
, NULL
);
432 g_main_loop_run (main_loop
);
433 g_main_loop_unref (main_loop
);
435 g_ptr_array_unref (crawler_array
);
436 g_ptr_array_unref (context_array
);