address linter warnings. Thanks to gagath@debian.org
[opentracker.git] / ot_mutex.c
blob3011987573b21306af7880eaf37484b7c8cf48bc
1 /* This software was written by Dirk Engling <erdgeist@erdgeist.org>
2 It is considered beerware. Prost. Skol. Cheers or whatever.
4 $id$ */
6 /* System */
7 #include <pthread.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <sys/mman.h>
11 #include <sys/uio.h>
13 /* Libowfat */
14 #include "byte.h"
15 #include "io.h"
16 #include "uint32.h"
18 /* Opentracker */
19 #include "ot_iovec.h"
20 #include "ot_mutex.h"
21 #include "ot_stats.h"
22 #include "trackerlogic.h"
24 /* #define MTX_DBG( STRING ) fprintf( stderr, STRING ) */
25 #define MTX_DBG(STRING)
27 /* Our global all torrents list */
28 static ot_vector all_torrents[OT_BUCKET_COUNT];
29 static pthread_mutex_t bucket_mutex[OT_BUCKET_COUNT];
30 static size_t g_torrent_count;
32 /* Self pipe from opentracker.c */
33 extern int g_self_pipe[2];
35 ot_vector *mutex_bucket_lock(int bucket) {
36 pthread_mutex_lock(bucket_mutex + bucket);
37 return all_torrents + bucket;
40 ot_vector *mutex_bucket_lock_by_hash(ot_hash const hash) { return mutex_bucket_lock(uint32_read_big((const char *)hash) >> OT_BUCKET_COUNT_SHIFT); }
42 void mutex_bucket_unlock(int bucket, int delta_torrentcount) {
43 pthread_mutex_unlock(bucket_mutex + bucket);
44 g_torrent_count += delta_torrentcount;
47 void mutex_bucket_unlock_by_hash(ot_hash const hash, int delta_torrentcount) {
48 mutex_bucket_unlock(uint32_read_big((char *)hash) >> OT_BUCKET_COUNT_SHIFT, delta_torrentcount);
51 size_t mutex_get_torrent_count() { return g_torrent_count; }
53 /* TaskQueue Magic */
55 struct ot_task {
56 ot_taskid taskid;
57 ot_tasktype tasktype;
58 int64 sock;
59 int iovec_entries;
60 struct iovec *iovec;
61 struct ot_task *next;
64 static ot_taskid next_free_taskid = 1;
65 static struct ot_task *tasklist;
66 static pthread_mutex_t tasklist_mutex;
67 static pthread_cond_t tasklist_being_filled;
69 int mutex_workqueue_pushtask(int64 sock, ot_tasktype tasktype) {
70 struct ot_task **tmptask, *task;
72 task = malloc(sizeof(struct ot_task));
73 if (!task)
74 return -1;
76 task->taskid = 0;
77 task->tasktype = tasktype;
78 task->sock = sock;
79 task->iovec_entries = 0;
80 task->iovec = NULL;
81 task->next = 0;
83 /* Want exclusive access to tasklist */
84 pthread_mutex_lock(&tasklist_mutex);
86 /* Skip to end of list */
87 tmptask = &tasklist;
88 while (*tmptask)
89 tmptask = &(*tmptask)->next;
90 *tmptask = task;
92 /* Inform waiting workers and release lock */
93 pthread_cond_broadcast(&tasklist_being_filled);
94 pthread_mutex_unlock(&tasklist_mutex);
95 return 0;
98 void mutex_workqueue_canceltask(int64 sock) {
99 struct ot_task **task;
101 /* Want exclusive access to tasklist */
102 pthread_mutex_lock(&tasklist_mutex);
104 for (task = &tasklist; *task; task = &((*task)->next))
105 if ((*task)->sock == sock) {
106 struct iovec *iovec = (*task)->iovec;
107 struct ot_task *ptask = *task;
108 int i;
110 /* Free task's iovec */
111 for (i = 0; i < (*task)->iovec_entries; ++i)
112 free(iovec[i].iov_base);
114 *task = (*task)->next;
115 free(ptask);
116 break;
119 /* Release lock */
120 pthread_mutex_unlock(&tasklist_mutex);
123 ot_taskid mutex_workqueue_poptask(ot_tasktype *tasktype) {
124 struct ot_task *task;
125 ot_taskid taskid = 0;
127 /* Want exclusive access to tasklist */
128 pthread_mutex_lock(&tasklist_mutex);
130 while (!taskid) {
131 /* Skip to the first unassigned task this worker wants to do */
132 for (task = tasklist; task; task = task->next)
133 if (!task->taskid && (TASK_CLASS_MASK & task->tasktype) == *tasktype) {
134 /* If we found an outstanding task, assign a taskid to it
135 and leave the loop */
136 task->taskid = taskid = ++next_free_taskid;
137 *tasktype = task->tasktype;
138 break;
141 /* Wait until the next task is being fed */
142 if (!taskid)
143 pthread_cond_wait(&tasklist_being_filled, &tasklist_mutex);
146 /* Release lock */
147 pthread_mutex_unlock(&tasklist_mutex);
149 return taskid;
152 void mutex_workqueue_pushsuccess(ot_taskid taskid) {
153 struct ot_task **task;
155 /* Want exclusive access to tasklist */
156 pthread_mutex_lock(&tasklist_mutex);
158 for (task = &tasklist; *task; task = &((*task)->next))
159 if ((*task)->taskid == taskid) {
160 struct ot_task *ptask = *task;
161 *task = (*task)->next;
162 free(ptask);
163 break;
166 /* Release lock */
167 pthread_mutex_unlock(&tasklist_mutex);
170 int mutex_workqueue_pushresult(ot_taskid taskid, int iovec_entries, struct iovec *iovec) {
171 struct ot_task *task;
172 const char byte = 'o';
174 /* Want exclusive access to tasklist */
175 pthread_mutex_lock(&tasklist_mutex);
177 for (task = tasklist; task; task = task->next)
178 if (task->taskid == taskid) {
179 task->iovec_entries = iovec_entries;
180 task->iovec = iovec;
181 task->tasktype = TASK_DONE;
182 break;
185 /* Release lock */
186 pthread_mutex_unlock(&tasklist_mutex);
188 io_trywrite(g_self_pipe[1], &byte, 1);
190 /* Indicate whether the worker has to throw away results */
191 return task ? 0 : -1;
194 int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec) {
195 struct ot_task *task;
196 const char byte = 'o';
198 /* Want exclusive access to tasklist */
199 pthread_mutex_lock(&tasklist_mutex);
201 for (task = tasklist; task; task = task->next)
202 if (task->taskid == taskid) {
203 if (iovec) {
204 if (iovec_append(&task->iovec_entries, &task->iovec, iovec))
205 task->tasktype = TASK_DONE_PARTIAL;
206 else
207 task = NULL;
208 } else
209 task->tasktype = TASK_DONE;
210 break;
213 /* Release lock */
214 pthread_mutex_unlock(&tasklist_mutex);
216 io_trywrite(g_self_pipe[1], &byte, 1);
218 /* Indicate whether the worker has to throw away results */
219 return task ? 0 : -1;
222 int64 mutex_workqueue_popresult(int *iovec_entries, struct iovec **iovec, int *is_partial) {
223 struct ot_task **task;
224 int64 sock = -1;
226 *is_partial = 0;
228 /* Want exclusive access to tasklist */
229 pthread_mutex_lock(&tasklist_mutex);
231 for (task = &tasklist; *task; task = &((*task)->next))
232 if (((*task)->tasktype & TASK_CLASS_MASK) == TASK_DONE) {
233 struct ot_task *ptask = *task;
234 *iovec_entries = ptask->iovec_entries;
235 *iovec = ptask->iovec;
236 sock = ptask->sock;
238 if ((*task)->tasktype == TASK_DONE) {
239 *task = ptask->next;
240 free(ptask);
241 } else {
242 ptask->iovec_entries = 0;
243 ptask->iovec = NULL;
244 *is_partial = 1;
245 /* Prevent task from showing up immediately again unless new data was added */
246 (*task)->tasktype = TASK_FULLSCRAPE;
248 break;
251 /* Release lock */
252 pthread_mutex_unlock(&tasklist_mutex);
253 return sock;
256 void mutex_init() {
257 int i;
258 pthread_mutex_init(&tasklist_mutex, NULL);
259 pthread_cond_init(&tasklist_being_filled, NULL);
260 for (i = 0; i < OT_BUCKET_COUNT; ++i)
261 pthread_mutex_init(bucket_mutex + i, NULL);
262 byte_zero(all_torrents, sizeof(all_torrents));
265 void mutex_deinit() {
266 int i;
267 for (i = 0; i < OT_BUCKET_COUNT; ++i)
268 pthread_mutex_destroy(bucket_mutex + i);
269 pthread_mutex_destroy(&tasklist_mutex);
270 pthread_cond_destroy(&tasklist_being_filled);
271 byte_zero(all_torrents, sizeof(all_torrents));