/* kthread.c -- klib multi-threading helpers: kt_for(), kt_for with a
 * persistent thread pool, and kt_pipeline().
 * Origin: GalaxyCodeBases.git / c_cpp / lib / klib / kthread.c
 * (blob 986f695de6fe72d31a9283120ed74374d1f29464; modified from src1/input.c) */
1 #include <pthread.h>
2 #include <stdlib.h>
3 #include <limits.h>
5 /************
6 * kt_for() *
7 ************/
struct kt_for_t;

/* Per-worker state for kt_for(): a back-pointer to the shared job
 * descriptor plus the next job index this worker will claim. */
typedef struct {
	struct kt_for_t *t; /* shared kt_for() state */
	long i;             /* next index to process; advances by n_threads */
} ktf_worker_t;

/* Shared descriptor for one kt_for() invocation. */
typedef struct kt_for_t {
	int n_threads;                /* number of worker threads */
	long n;                       /* total number of jobs */
	ktf_worker_t *w;              /* one slot per worker */
	void (*func)(void*,long,int); /* callback: (data, job index, worker id) */
	void *data;                   /* opaque payload forwarded to func */
} kt_for_t;
24 static inline long steal_work(kt_for_t *t)
26 int i, min_i = -1;
27 long k, min = LONG_MAX;
28 for (i = 0; i < t->n_threads; ++i)
29 if (min > t->w[i].i) min = t->w[i].i, min_i = i;
30 k = __sync_fetch_and_add(&t->w[min_i].i, t->n_threads);
31 return k >= t->n? -1 : k;
34 static void *ktf_worker(void *data)
36 ktf_worker_t *w = (ktf_worker_t*)data;
37 long i;
38 for (;;) {
39 i = __sync_fetch_and_add(&w->i, w->t->n_threads);
40 if (i >= w->t->n) break;
41 w->t->func(w->t->data, i, w - w->t->w);
43 while ((i = steal_work(w->t)) >= 0)
44 w->t->func(w->t->data, i, w - w->t->w);
45 pthread_exit(0);
48 void kt_for(int n_threads, void (*func)(void*,long,int), void *data, long n)
50 if (n_threads > 1) {
51 int i;
52 kt_for_t t;
53 pthread_t *tid;
54 t.func = func, t.data = data, t.n_threads = n_threads, t.n = n;
55 t.w = (ktf_worker_t*)alloca(n_threads * sizeof(ktf_worker_t));
56 tid = (pthread_t*)alloca(n_threads * sizeof(pthread_t));
57 for (i = 0; i < n_threads; ++i)
58 t.w[i].t = &t, t.w[i].i = i;
59 for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktf_worker, &t.w[i]);
60 for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0);
61 } else {
62 long j;
63 for (j = 0; j < n; ++j) func(data, j, 0);
67 /***************************
68 * kt_for with thread pool *
69 ***************************/
struct kt_forpool_t;

/* Per-worker state for the pooled kt_for variant: shared-pool back-pointer,
 * next job index, and the command flag the master toggles. */
typedef struct {
	struct kt_forpool_t *t; /* owning pool */
	long i;                 /* next index to process; advances by n_threads */
	int action;             /* 0: idle/wait, >0: run current batch, <0: exit */
} kto_worker_t;

/* A persistent pool of workers reused across kt_forpool() calls. */
typedef struct kt_forpool_t {
	int n_threads, n_pending;     /* pool size; workers not yet parked */
	long n;                       /* job count of the current batch */
	pthread_t *tid;               /* worker thread handles */
	kto_worker_t *w;              /* one slot per worker */
	void (*func)(void*,long,int); /* callback of the current batch */
	void *data;                   /* payload of the current batch */
	pthread_mutex_t mutex;        /* guards n_pending and w[i].action */
	pthread_cond_t cv_m, cv_s;    /* master waits on cv_m; slaves on cv_s */
} kt_forpool_t;
90 static inline long kt_fp_steal_work(kt_forpool_t *t)
92 int i, min_i = -1;
93 long k, min = LONG_MAX;
94 for (i = 0; i < t->n_threads; ++i)
95 if (min > t->w[i].i) min = t->w[i].i, min_i = i;
96 k = __sync_fetch_and_add(&t->w[min_i].i, t->n_threads);
97 return k >= t->n? -1 : k;
100 static void *kt_fp_worker(void *data)
102 kto_worker_t *w = (kto_worker_t*)data;
103 kt_forpool_t *fp = w->t;
104 for (;;) {
105 long i;
106 int action;
107 pthread_mutex_lock(&fp->mutex);
108 if (--fp->n_pending == 0)
109 pthread_cond_signal(&fp->cv_m);
110 w->action = 0;
111 while (w->action == 0) pthread_cond_wait(&fp->cv_s, &fp->mutex);
112 action = w->action;
113 pthread_mutex_unlock(&fp->mutex);
114 if (action < 0) break;
115 for (;;) { // process jobs allocated to this worker
116 i = __sync_fetch_and_add(&w->i, fp->n_threads);
117 if (i >= fp->n) break;
118 fp->func(fp->data, i, w - fp->w);
120 while ((i = kt_fp_steal_work(fp)) >= 0) // steal jobs allocated to other workers
121 fp->func(fp->data, i, w - fp->w);
123 pthread_exit(0);
126 void *kt_forpool_init(int n_threads)
128 kt_forpool_t *fp;
129 int i;
130 fp = (kt_forpool_t*)calloc(1, sizeof(kt_forpool_t));
131 fp->n_threads = fp->n_pending = n_threads;
132 fp->tid = (pthread_t*)calloc(fp->n_threads, sizeof(pthread_t));
133 fp->w = (kto_worker_t*)calloc(fp->n_threads, sizeof(kto_worker_t));
134 for (i = 0; i < fp->n_threads; ++i) fp->w[i].t = fp;
135 pthread_mutex_init(&fp->mutex, 0);
136 pthread_cond_init(&fp->cv_m, 0);
137 pthread_cond_init(&fp->cv_s, 0);
138 for (i = 0; i < fp->n_threads; ++i) pthread_create(&fp->tid[i], 0, kt_fp_worker, &fp->w[i]);
139 pthread_mutex_lock(&fp->mutex);
140 while (fp->n_pending) pthread_cond_wait(&fp->cv_m, &fp->mutex);
141 pthread_mutex_unlock(&fp->mutex);
142 return fp;
145 void kt_forpool_destroy(void *_fp)
147 kt_forpool_t *fp = (kt_forpool_t*)_fp;
148 int i;
149 for (i = 0; i < fp->n_threads; ++i) fp->w[i].action = -1;
150 pthread_cond_broadcast(&fp->cv_s);
151 for (i = 0; i < fp->n_threads; ++i) pthread_join(fp->tid[i], 0);
152 pthread_cond_destroy(&fp->cv_s);
153 pthread_cond_destroy(&fp->cv_m);
154 pthread_mutex_destroy(&fp->mutex);
155 free(fp->w); free(fp->tid); free(fp);
158 void kt_forpool(void *_fp, void (*func)(void*,long,int), void *data, long n)
160 kt_forpool_t *fp = (kt_forpool_t*)_fp;
161 long i;
162 if (fp && fp->n_threads > 1) {
163 fp->n = n, fp->func = func, fp->data = data, fp->n_pending = fp->n_threads;
164 for (i = 0; i < fp->n_threads; ++i) fp->w[i].i = i, fp->w[i].action = 1;
165 pthread_mutex_lock(&fp->mutex);
166 pthread_cond_broadcast(&fp->cv_s);
167 while (fp->n_pending) pthread_cond_wait(&fp->cv_m, &fp->mutex);
168 pthread_mutex_unlock(&fp->mutex);
169 } else for (i = 0; i < n; ++i) func(data, i, 0);
172 /*****************
173 * kt_pipeline() *
174 *****************/
struct ktp_t;

/* Per-worker state for kt_pipeline(): which batch (index) and which stage
 * (step) this worker currently holds, plus the data flowing between steps. */
typedef struct {
	struct ktp_t *pl; /* owning pipeline */
	int64_t index;    /* serial number of the batch this worker carries */
	int step;         /* current pipeline stage; n_steps means finished */
	void *data;       /* output of the previous stage / input to this one */
} ktp_worker_t;

/* Shared pipeline descriptor.
 * NOTE(review): int64_t needs <stdint.h>, which is not among the visible
 * includes — presumably pulled in elsewhere; verify in the full file. */
typedef struct ktp_t {
	void *shared;               /* user context passed to every func call */
	void *(*func)(void*, int, void*); /* stage callback: (shared, step, data) */
	int64_t index;              /* next batch serial number to hand out */
	int n_workers, n_steps;     /* worker count; stage count */
	ktp_worker_t *workers;      /* one slot per worker */
	pthread_mutex_t mutex;      /* guards step/index bookkeeping */
	pthread_cond_t cv;          /* signaled whenever a worker changes step */
} ktp_t;
195 static void *ktp_worker(void *data)
197 ktp_worker_t *w = (ktp_worker_t*)data;
198 ktp_t *p = w->pl;
199 while (w->step < p->n_steps) {
200 // test whether we can kick off the job with this worker
201 pthread_mutex_lock(&p->mutex);
202 for (;;) {
203 int i;
204 // test whether another worker is doing the same step
205 for (i = 0; i < p->n_workers; ++i) {
206 if (w == &p->workers[i]) continue; // ignore itself
207 if (p->workers[i].step <= w->step && p->workers[i].index < w->index)
208 break;
210 if (i == p->n_workers) break; // no workers with smaller indices are doing w->step or the previous steps
211 pthread_cond_wait(&p->cv, &p->mutex);
213 pthread_mutex_unlock(&p->mutex);
215 // working on w->step
216 w->data = p->func(p->shared, w->step, w->step? w->data : 0); // for the first step, input is NULL
218 // update step and let other workers know
219 pthread_mutex_lock(&p->mutex);
220 w->step = w->step == p->n_steps - 1 || w->data? (w->step + 1) % p->n_steps : p->n_steps;
221 if (w->step == 0) w->index = p->index++;
222 pthread_cond_broadcast(&p->cv);
223 pthread_mutex_unlock(&p->mutex);
225 pthread_exit(0);
228 void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps)
230 ktp_t aux;
231 pthread_t *tid;
232 int i;
234 if (n_threads < 1) n_threads = 1;
235 aux.n_workers = n_threads;
236 aux.n_steps = n_steps;
237 aux.func = func;
238 aux.shared = shared_data;
239 aux.index = 0;
240 pthread_mutex_init(&aux.mutex, 0);
241 pthread_cond_init(&aux.cv, 0);
243 aux.workers = (ktp_worker_t*)alloca(n_threads * sizeof(ktp_worker_t));
244 for (i = 0; i < n_threads; ++i) {
245 ktp_worker_t *w = &aux.workers[i];
246 w->step = 0; w->pl = &aux; w->data = 0;
247 w->index = aux.index++;
250 tid = (pthread_t*)alloca(n_threads * sizeof(pthread_t));
251 for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktp_worker, &aux.workers[i]);
252 for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0);
254 pthread_mutex_destroy(&aux.mutex);
255 pthread_cond_destroy(&aux.cv);