1 #undef G_DISABLE_ASSERT
13 #include <fcntl.h> /* For _O_BINARY used by pipe() macro */
14 #include <io.h> /* for _pipe() */
15 #define pipe(fds) _pipe(fds, 4096, _O_BINARY)
22 #define CRAWLER_TIMEOUT_RANGE 40
23 #define RECURSER_TIMEOUT 50
25 /* The partial ordering between the context array mutex and
26 * crawler array mutex is that the crawler array mutex cannot
27 * be locked while the context array mutex is locked. */
29 GPtrArray
*context_array
;
30 GMutex context_array_mutex
;
31 GCond context_array_cond
;
35 G_LOCK_DEFINE_STATIC (crawler_array_lock
);
36 GPtrArray
*crawler_array
;
38 typedef struct _AddrData AddrData
;
39 typedef struct _TestData TestData
;
55 static void cleanup_crawlers (GMainContext
*context
);
58 read_all (GIOChannel
*channel
, char *buf
, gsize len
)
64 while (bytes_read
< len
)
66 err
= g_io_channel_read (channel
, buf
+ bytes_read
, len
- bytes_read
, &count
);
69 if (err
!= G_IO_ERROR_AGAIN
)
82 write_all (GIOChannel
*channel
, char *buf
, gsize len
)
84 gsize bytes_written
= 0;
88 while (bytes_written
< len
)
90 err
= g_io_channel_write (channel
, buf
+ bytes_written
, len
- bytes_written
, &count
);
91 if (err
&& err
!= G_IO_ERROR_AGAIN
)
94 bytes_written
+= count
;
101 adder_callback (GIOChannel
*source
,
102 GIOCondition condition
,
108 char result
[32] = { 0, };
110 AddrData
*addr_data
= data
;
112 if (!read_all (source
, buf1
, 32) ||
113 !read_all (source
, buf2
, 32))
115 g_main_loop_quit (addr_data
->loop
);
119 sprintf (result
, "%d", atoi(buf1
) + atoi(buf2
));
120 write_all (addr_data
->dest
, result
, 32);
126 timeout_callback (gpointer data
)
128 AddrData
*addr_data
= data
;
136 adder_thread (gpointer data
)
138 GMainContext
*context
;
139 GSource
*adder_source
;
140 GSource
*timeout_source
;
142 GIOChannel
**channels
= data
;
145 context
= g_main_context_new ();
147 g_mutex_lock (&context_array_mutex
);
149 g_ptr_array_add (context_array
, context
);
151 if (context_array
->len
== NTHREADS
)
152 g_cond_broadcast (&context_array_cond
);
154 g_mutex_unlock (&context_array_mutex
);
156 addr_data
.dest
= channels
[1];
157 addr_data
.loop
= g_main_loop_new (context
, FALSE
);
160 adder_source
= g_io_create_watch (channels
[0], G_IO_IN
| G_IO_HUP
);
161 g_source_set_name (adder_source
, "Adder I/O");
162 g_source_set_callback (adder_source
, (GSourceFunc
)adder_callback
, &addr_data
, NULL
);
163 g_source_attach (adder_source
, context
);
164 g_source_unref (adder_source
);
166 timeout_source
= g_timeout_source_new (10);
167 g_source_set_name (timeout_source
, "Adder timeout");
168 g_source_set_callback (timeout_source
, (GSourceFunc
)timeout_callback
, &addr_data
, NULL
);
169 g_source_set_priority (timeout_source
, G_PRIORITY_HIGH
);
170 g_source_attach (timeout_source
, context
);
171 g_source_unref (timeout_source
);
173 g_main_loop_run (addr_data
.loop
);
175 g_io_channel_unref (channels
[0]);
176 g_io_channel_unref (channels
[1]);
180 g_main_loop_unref (addr_data
.loop
);
183 g_print ("Timeout run %d times\n", addr_data
.count
);
186 g_mutex_lock (&context_array_mutex
);
187 g_ptr_array_remove (context_array
, context
);
188 if (context_array
->len
== 0)
189 g_main_loop_quit (main_loop
);
190 g_mutex_unlock (&context_array_mutex
);
192 cleanup_crawlers (context
);
193 g_main_context_unref (context
);
199 io_pipe (GIOChannel
**channels
)
205 g_warning ("Cannot create pipe %s\n", g_strerror (errno
));
209 channels
[0] = g_io_channel_unix_new (fds
[0]);
210 channels
[1] = g_io_channel_unix_new (fds
[1]);
212 g_io_channel_set_close_on_unref (channels
[0], TRUE
);
213 g_io_channel_set_close_on_unref (channels
[1], TRUE
);
217 do_add (GIOChannel
*in
, gint a
, gint b
)
219 char buf1
[32] = { 0, };
220 char buf2
[32] = { 0, };
222 sprintf (buf1
, "%d", a
);
223 sprintf (buf2
, "%d", b
);
225 write_all (in
, buf1
, 32);
226 write_all (in
, buf2
, 32);
230 adder_response (GIOChannel
*source
,
231 GIOCondition condition
,
235 TestData
*test_data
= data
;
237 if (!read_all (source
, result
, 32))
240 test_data
->current_val
= atoi (result
);
243 if (test_data
->iters
== 0)
245 if (test_data
->current_val
!= ITERS
* INCREMENT
)
247 g_print ("Addition failed: %d != %d\n",
248 test_data
->current_val
, ITERS
* INCREMENT
);
252 g_io_channel_unref (source
);
253 g_io_channel_unref (test_data
->in
);
260 do_add (test_data
->in
, test_data
->current_val
, INCREMENT
);
266 create_adder_thread (void)
271 GIOChannel
*in_channels
[2];
272 GIOChannel
*out_channels
[2];
274 GIOChannel
**sub_channels
;
276 sub_channels
= g_new (GIOChannel
*, 2);
278 io_pipe (in_channels
);
279 io_pipe (out_channels
);
281 sub_channels
[0] = in_channels
[0];
282 sub_channels
[1] = out_channels
[1];
284 g_thread_create (adder_thread
, sub_channels
, FALSE
, &err
);
288 g_warning ("Cannot create thread: %s", err
->message
);
292 test_data
= g_new (TestData
, 1);
293 test_data
->in
= in_channels
[1];
294 test_data
->current_val
= 0;
295 test_data
->iters
= ITERS
;
297 g_io_add_watch (out_channels
[0], G_IO_IN
| G_IO_HUP
,
298 adder_response
, test_data
);
300 do_add (test_data
->in
, test_data
->current_val
, INCREMENT
);
303 static void create_crawler (void);
306 remove_crawler (void)
308 GSource
*other_source
;
310 if (crawler_array
->len
> 0)
312 other_source
= crawler_array
->pdata
[g_random_int_range (0, crawler_array
->len
)];
313 g_source_destroy (other_source
);
314 g_assert (g_ptr_array_remove_fast (crawler_array
, other_source
));
319 crawler_callback (gpointer data
)
321 GSource
*source
= data
;
323 G_LOCK (crawler_array_lock
);
325 if (!g_ptr_array_remove_fast (crawler_array
, source
))
329 G_UNLOCK (crawler_array_lock
);
338 create_crawler (void)
340 GSource
*source
= g_timeout_source_new (g_random_int_range (0, CRAWLER_TIMEOUT_RANGE
));
341 g_source_set_name (source
, "Crawler timeout");
342 g_source_set_callback (source
, (GSourceFunc
)crawler_callback
, source
, NULL
);
344 G_LOCK (crawler_array_lock
);
345 g_ptr_array_add (crawler_array
, source
);
347 g_mutex_lock (&context_array_mutex
);
348 g_source_attach (source
, context_array
->pdata
[g_random_int_range (0, context_array
->len
)]);
349 g_source_unref (source
);
350 g_mutex_unlock (&context_array_mutex
);
352 G_UNLOCK (crawler_array_lock
);
356 cleanup_crawlers (GMainContext
*context
)
360 G_LOCK (crawler_array_lock
);
361 for (i
=0; i
< crawler_array
->len
; i
++)
363 if (g_source_get_context (crawler_array
->pdata
[i
]) == context
)
365 g_source_destroy (g_ptr_array_remove_index (crawler_array
, i
));
369 G_UNLOCK (crawler_array_lock
);
373 recurser_idle (gpointer data
)
375 GMainContext
*context
= data
;
378 for (i
= 0; i
< 10; i
++)
379 g_main_context_iteration (context
, FALSE
);
385 recurser_start (gpointer data
)
387 GMainContext
*context
;
390 g_mutex_lock (&context_array_mutex
);
391 context
= context_array
->pdata
[g_random_int_range (0, context_array
->len
)];
392 source
= g_idle_source_new ();
393 g_source_set_name (source
, "Recursing idle source");
394 g_source_set_callback (source
, recurser_idle
, context
, NULL
);
395 g_source_attach (source
, context
);
396 g_source_unref (source
);
397 g_mutex_unlock (&context_array_mutex
);
408 context_array
= g_ptr_array_new ();
410 crawler_array
= g_ptr_array_new ();
412 main_loop
= g_main_loop_new (NULL
, FALSE
);
414 for (i
= 0; i
< NTHREADS
; i
++)
415 create_adder_thread ();
417 /* Wait for all threads to start */
419 g_mutex_lock (&context_array_mutex
);
421 if (context_array
->len
< NTHREADS
)
422 g_cond_wait (&context_array_cond
, &context_array_mutex
);
424 g_mutex_unlock (&context_array_mutex
);
426 for (i
= 0; i
< NCRAWLERS
; i
++)
429 g_timeout_add (RECURSER_TIMEOUT
, recurser_start
, NULL
);
431 g_main_loop_run (main_loop
);
432 g_main_loop_unref (main_loop
);
434 g_ptr_array_unref (crawler_array
);
435 g_ptr_array_unref (context_array
);