Move markup collect tests to the test framework
[glib.git] / gio / gioscheduler.c
blob61e1afcca36027ecd8b23a101c85e4a76ed8d074
1 /* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18 * Boston, MA 02111-1307, USA.
20 * Author: Alexander Larsson <alexl@redhat.com>
23 #include "config.h"
25 #include "gioscheduler.h"
26 #include "gcancellable.h"
29 /**
30 * SECTION:gioscheduler
31 * @short_description: I/O Scheduler
32 * @include: gio/gio.h
34 * Schedules asynchronous I/O operations. #GIOScheduler integrates
35 * into the main event loop (#GMainLoop) and may use threads if they
36 * are available.
38 * <para id="io-priority"><indexterm><primary>I/O priority</primary></indexterm>
39 * Each I/O operation has a priority, and the scheduler uses the priorities
40 * to determine the order in which operations are executed. They are
41 * <emphasis>not</emphasis> used to determine system-wide I/O scheduling.
42 * Priorities are integers, with lower numbers indicating higher priority.
43 * It is recommended to choose priorities between %G_PRIORITY_LOW and
44 * %G_PRIORITY_HIGH, with %G_PRIORITY_DEFAULT as a default.
45 * </para>
46 **/
48 struct _GIOSchedulerJob {
49 GSList *active_link;
50 GIOSchedulerJobFunc job_func;
51 GSourceFunc cancel_func; /* Runs under job map lock */
52 gpointer data;
53 GDestroyNotify destroy_notify;
55 gint io_priority;
56 GCancellable *cancellable;
57 GMainContext *context;
59 guint idle_tag;
62 G_LOCK_DEFINE_STATIC(active_jobs);
63 static GSList *active_jobs = NULL;
65 static GThreadPool *job_thread_pool = NULL;
67 static void io_job_thread (gpointer data,
68 gpointer user_data);
70 static void
71 g_io_job_free (GIOSchedulerJob *job)
73 if (job->cancellable)
74 g_object_unref (job->cancellable);
75 if (job->context)
76 g_main_context_unref (job->context);
77 g_free (job);
80 static gint
81 g_io_job_compare (gconstpointer a,
82 gconstpointer b,
83 gpointer user_data)
85 const GIOSchedulerJob *aa = a;
86 const GIOSchedulerJob *bb = b;
88 /* Cancelled jobs are set prio == -1, so that
89 they are executed as quickly as possible */
91 /* Lower value => higher priority */
92 if (aa->io_priority < bb->io_priority)
93 return -1;
94 if (aa->io_priority == bb->io_priority)
95 return 0;
96 return 1;
99 static gpointer
100 init_scheduler (gpointer arg)
102 if (job_thread_pool == NULL)
104 /* TODO: thread_pool_new can fail */
105 job_thread_pool = g_thread_pool_new (io_job_thread,
106 NULL,
108 FALSE,
109 NULL);
110 if (job_thread_pool != NULL)
112 g_thread_pool_set_sort_function (job_thread_pool,
113 g_io_job_compare,
114 NULL);
115 /* It's kinda weird that this is a global setting
116 * instead of per threadpool. However, we really
117 * want to cache some threads, but not keep around
118 * those threads forever. */
119 g_thread_pool_set_max_idle_time (15 * 1000);
120 g_thread_pool_set_max_unused_threads (2);
123 return NULL;
126 static void
127 remove_active_job (GIOSchedulerJob *job)
129 GIOSchedulerJob *other_job;
130 GSList *l;
131 gboolean resort_jobs;
133 G_LOCK (active_jobs);
134 active_jobs = g_slist_delete_link (active_jobs, job->active_link);
136 resort_jobs = FALSE;
137 for (l = active_jobs; l != NULL; l = l->next)
139 other_job = l->data;
140 if (other_job->io_priority >= 0 &&
141 g_cancellable_is_cancelled (other_job->cancellable))
143 other_job->io_priority = -1;
144 resort_jobs = TRUE;
147 G_UNLOCK (active_jobs);
149 if (resort_jobs &&
150 job_thread_pool != NULL)
151 g_thread_pool_set_sort_function (job_thread_pool,
152 g_io_job_compare,
153 NULL);
157 static void
158 job_destroy (gpointer data)
160 GIOSchedulerJob *job = data;
162 if (job->destroy_notify)
163 job->destroy_notify (job->data);
165 remove_active_job (job);
166 g_io_job_free (job);
169 static void
170 io_job_thread (gpointer data,
171 gpointer user_data)
173 GIOSchedulerJob *job = data;
174 gboolean result;
176 if (job->cancellable)
177 g_cancellable_push_current (job->cancellable);
181 result = job->job_func (job, job->cancellable, job->data);
183 while (result);
185 if (job->cancellable)
186 g_cancellable_pop_current (job->cancellable);
188 job_destroy (job);
191 static gboolean
192 run_job_at_idle (gpointer data)
194 GIOSchedulerJob *job = data;
195 gboolean result;
197 if (job->cancellable)
198 g_cancellable_push_current (job->cancellable);
200 result = job->job_func (job, job->cancellable, job->data);
202 if (job->cancellable)
203 g_cancellable_pop_current (job->cancellable);
205 return result;
209 * g_io_scheduler_push_job:
210 * @job_func: a #GIOSchedulerJobFunc.
211 * @user_data: data to pass to @job_func
212 * @notify: a #GDestroyNotify for @user_data, or %NULL
213 * @io_priority: the <link linkend="gioscheduler">I/O priority</link>
214 * of the request.
215 * @cancellable: optional #GCancellable object, %NULL to ignore.
217 * Schedules the I/O job to run.
219 * @notify will be called on @user_data after @job_func has returned,
220 * regardless whether the job was cancelled or has run to completion.
222 * If @cancellable is not %NULL, it can be used to cancel the I/O job
223 * by calling g_cancellable_cancel() or by calling
224 * g_io_scheduler_cancel_all_jobs().
226 void
227 g_io_scheduler_push_job (GIOSchedulerJobFunc job_func,
228 gpointer user_data,
229 GDestroyNotify notify,
230 gint io_priority,
231 GCancellable *cancellable)
233 static GOnce once_init = G_ONCE_INIT;
234 GIOSchedulerJob *job;
236 g_return_if_fail (job_func != NULL);
238 job = g_new0 (GIOSchedulerJob, 1);
239 job->job_func = job_func;
240 job->data = user_data;
241 job->destroy_notify = notify;
242 job->io_priority = io_priority;
244 if (cancellable)
245 job->cancellable = g_object_ref (cancellable);
247 job->context = g_main_context_get_thread_default ();
248 if (job->context)
249 g_main_context_ref (job->context);
251 G_LOCK (active_jobs);
252 active_jobs = g_slist_prepend (active_jobs, job);
253 job->active_link = active_jobs;
254 G_UNLOCK (active_jobs);
256 if (g_thread_supported())
258 g_once (&once_init, init_scheduler, NULL);
259 g_thread_pool_push (job_thread_pool, job, NULL);
261 else
263 /* Threads not available, instead do the i/o sync inside a
264 * low prio idle handler
266 job->idle_tag = g_idle_add_full (io_priority,
267 run_job_at_idle,
268 job, job_destroy);
273 * g_io_scheduler_cancel_all_jobs:
275 * Cancels all cancellable I/O jobs.
277 * A job is cancellable if a #GCancellable was passed into
278 * g_io_scheduler_push_job().
280 void
281 g_io_scheduler_cancel_all_jobs (void)
283 GSList *cancellable_list, *l;
285 G_LOCK (active_jobs);
286 cancellable_list = NULL;
287 for (l = active_jobs; l != NULL; l = l->next)
289 GIOSchedulerJob *job = l->data;
290 if (job->cancellable)
291 cancellable_list = g_slist_prepend (cancellable_list,
292 g_object_ref (job->cancellable));
294 G_UNLOCK (active_jobs);
296 for (l = cancellable_list; l != NULL; l = l->next)
298 GCancellable *c = l->data;
299 g_cancellable_cancel (c);
300 g_object_unref (c);
302 g_slist_free (cancellable_list);
305 typedef struct {
306 GSourceFunc func;
307 gboolean ret_val;
308 gpointer data;
309 GDestroyNotify notify;
311 GMutex *ack_lock;
312 GCond *ack_condition;
313 } MainLoopProxy;
315 static gboolean
316 mainloop_proxy_func (gpointer data)
318 MainLoopProxy *proxy = data;
320 proxy->ret_val = proxy->func (proxy->data);
322 if (proxy->notify)
323 proxy->notify (proxy->data);
325 if (proxy->ack_lock)
327 g_mutex_lock (proxy->ack_lock);
328 g_cond_signal (proxy->ack_condition);
329 g_mutex_unlock (proxy->ack_lock);
332 return FALSE;
335 static void
336 mainloop_proxy_free (MainLoopProxy *proxy)
338 if (proxy->ack_lock)
340 g_mutex_free (proxy->ack_lock);
341 g_cond_free (proxy->ack_condition);
344 g_free (proxy);
348 * g_io_scheduler_job_send_to_mainloop:
349 * @job: a #GIOSchedulerJob
350 * @func: a #GSourceFunc callback that will be called in the original thread
351 * @user_data: data to pass to @func
352 * @notify: a #GDestroyNotify for @user_data, or %NULL
354 * Used from an I/O job to send a callback to be run in the thread
355 * that the job was started from, waiting for the result (and thus
356 * blocking the I/O job).
358 * Returns: The return value of @func
360 gboolean
361 g_io_scheduler_job_send_to_mainloop (GIOSchedulerJob *job,
362 GSourceFunc func,
363 gpointer user_data,
364 GDestroyNotify notify)
366 GSource *source;
367 MainLoopProxy *proxy;
368 gboolean ret_val;
370 g_return_val_if_fail (job != NULL, FALSE);
371 g_return_val_if_fail (func != NULL, FALSE);
373 if (job->idle_tag)
375 /* We just immediately re-enter in the case of idles (non-threads)
376 * Anything else would just deadlock. If you can't handle this, enable threads.
378 ret_val = func (user_data);
379 if (notify)
380 notify (user_data);
381 return ret_val;
384 proxy = g_new0 (MainLoopProxy, 1);
385 proxy->func = func;
386 proxy->data = user_data;
387 proxy->notify = notify;
388 proxy->ack_lock = g_mutex_new ();
389 proxy->ack_condition = g_cond_new ();
390 g_mutex_lock (proxy->ack_lock);
392 source = g_idle_source_new ();
393 g_source_set_priority (source, G_PRIORITY_DEFAULT);
394 g_source_set_callback (source, mainloop_proxy_func, proxy,
395 NULL);
397 g_source_attach (source, job->context);
398 g_source_unref (source);
400 g_cond_wait (proxy->ack_condition, proxy->ack_lock);
401 g_mutex_unlock (proxy->ack_lock);
403 ret_val = proxy->ret_val;
404 mainloop_proxy_free (proxy);
406 return ret_val;
410 * g_io_scheduler_job_send_to_mainloop_async:
411 * @job: a #GIOSchedulerJob
412 * @func: a #GSourceFunc callback that will be called in the original thread
413 * @user_data: data to pass to @func
414 * @notify: a #GDestroyNotify for @user_data, or %NULL
416 * Used from an I/O job to send a callback to be run asynchronously in
417 * the thread that the job was started from. The callback will be run
418 * when the main loop is available, but at that time the I/O job might
419 * have finished. The return value from the callback is ignored.
421 * Note that if you are passing the @user_data from g_io_scheduler_push_job()
422 * on to this function you have to ensure that it is not freed before
423 * @func is called, either by passing %NULL as @notify to
424 * g_io_scheduler_push_job() or by using refcounting for @user_data.
426 void
427 g_io_scheduler_job_send_to_mainloop_async (GIOSchedulerJob *job,
428 GSourceFunc func,
429 gpointer user_data,
430 GDestroyNotify notify)
432 GSource *source;
433 MainLoopProxy *proxy;
435 g_return_if_fail (job != NULL);
436 g_return_if_fail (func != NULL);
438 if (job->idle_tag)
440 /* We just immediately re-enter in the case of idles (non-threads)
441 * Anything else would just deadlock. If you can't handle this, enable threads.
443 func (user_data);
444 if (notify)
445 notify (user_data);
446 return;
449 proxy = g_new0 (MainLoopProxy, 1);
450 proxy->func = func;
451 proxy->data = user_data;
452 proxy->notify = notify;
454 source = g_idle_source_new ();
455 g_source_set_priority (source, G_PRIORITY_DEFAULT);
456 g_source_set_callback (source, mainloop_proxy_func, proxy,
457 (GDestroyNotify)mainloop_proxy_free);
459 g_source_attach (source, job->context);
460 g_source_unref (source);