/* This software was written by Dirk Engling <erdgeist@erdgeist.org>
   It is considered beerware. Prost. Skol. Cheers or whatever. */

/* System */
#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/uio.h>

/* Libowfat */
#include "byte.h"
#include "io.h"
#include "uint32.h"

/* Opentracker */
#include "ot_iovec.h"
#include "ot_mutex.h"
#include "trackerlogic.h"

/* #define MTX_DBG( STRING ) fprintf( stderr, STRING ) */
#define MTX_DBG(STRING)

/* Our global all torrents list */
static ot_vector       all_torrents[OT_BUCKET_COUNT];
static pthread_mutex_t bucket_mutex[OT_BUCKET_COUNT];
static size_t          g_torrent_count;

/* Self pipe from opentracker.c */
extern int g_self_pipe[2];

ot_vector *mutex_bucket_lock(int bucket) {
  pthread_mutex_lock(bucket_mutex + bucket);
  return all_torrents + bucket;
}

ot_vector *mutex_bucket_lock_by_hash(ot_hash const hash) { return mutex_bucket_lock(uint32_read_big((const char *)hash) >> OT_BUCKET_COUNT_SHIFT); }

void mutex_bucket_unlock(int bucket, int delta_torrentcount) {
  pthread_mutex_unlock(bucket_mutex + bucket);
  g_torrent_count += delta_torrentcount;
}

void mutex_bucket_unlock_by_hash(ot_hash const hash, int delta_torrentcount) {
  mutex_bucket_unlock(uint32_read_big((char *)hash) >> OT_BUCKET_COUNT_SHIFT, delta_torrentcount);
}

size_t mutex_get_torrent_count() { return g_torrent_count; }
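
/* Illustrative usage sketch, not part of the original source: callers bracket any
   access to a bucket's torrent vector between lock and unlock, reporting how many
   torrents they added or removed. The lookup shown in the middle is only a
   placeholder for what the real callers in trackerlogic.c do.

   ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash);
   // ... find, insert or delete the torrent belonging to hash in torrents_list ...
   mutex_bucket_unlock_by_hash(hash, delta_torrentcount);
*/
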
/* The task list is a singly linked queue of pending and finished worker tasks */
struct ot_task {
  ot_taskid       taskid;
  ot_tasktype     tasktype;
  int64           sock;
  int             iovec_entries;
  struct iovec   *iovec;
  struct ot_task *next;
};

static ot_taskid       next_free_taskid = 1;
static struct ot_task *tasklist;
static pthread_mutex_t tasklist_mutex;
static pthread_cond_t  tasklist_being_filled;

int mutex_workqueue_pushtask(int64 sock, ot_tasktype tasktype) {
  struct ot_task **tmptask, *task;

  task = malloc(sizeof(struct ot_task));
  if (!task)
    return -1;

  task->taskid        = 0;
  task->tasktype      = tasktype;
  task->sock          = sock;
  task->iovec_entries = 0;
  task->iovec         = NULL;
  task->next          = NULL;

  /* Want exclusive access to tasklist */
  pthread_mutex_lock(&tasklist_mutex);

  /* Skip to end of list */
  tmptask = &tasklist;
  while (*tmptask)
    tmptask = &(*tmptask)->next;
  *tmptask = task;

  /* Inform waiting workers and release lock */
  pthread_cond_broadcast(&tasklist_being_filled);
  pthread_mutex_unlock(&tasklist_mutex);
  return 0;
}
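
/* Illustrative caller sketch, not part of the original source: the networking
   thread queues a long running job for the worker pool by handing over the
   client's socket and a task class. The TASK_FULLSCRAPE class and the error
   handling shown here are assumptions about the calling code.

   if (mutex_workqueue_pushtask(client_socket, TASK_FULLSCRAPE))
     handle_out_of_memory(client_socket);   // hypothetical error path
*/
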
void mutex_workqueue_canceltask(int64 sock) {
  struct ot_task **task;

  /* Want exclusive access to tasklist */
  pthread_mutex_lock(&tasklist_mutex);

  for (task = &tasklist; *task; task = &((*task)->next))
    if ((*task)->sock == sock) {
      struct iovec   *iovec = (*task)->iovec;
      struct ot_task *ptask = *task;
      int             i;

      /* Free task's iovec */
      for (i = 0; i < (*task)->iovec_entries; ++i)
        free(iovec[i].iov_base);

      *task = (*task)->next;
      free(ptask);
      break;
    }

  /* Release lock */
  pthread_mutex_unlock(&tasklist_mutex);
}

ot_taskid mutex_workqueue_poptask(ot_tasktype *tasktype) {
  struct ot_task *task;
  ot_taskid       taskid = 0;

  /* Want exclusive access to tasklist */
  pthread_mutex_lock(&tasklist_mutex);

  while (!taskid) {
    /* Skip to the first unassigned task this worker wants to do */
    for (task = tasklist; task; task = task->next)
      if (!task->taskid && (TASK_CLASS_MASK & task->tasktype) == *tasktype) {
        /* If we found an outstanding task, assign a taskid to it
           and leave the loop */
        task->taskid = taskid = ++next_free_taskid;
        *tasktype    = task->tasktype;
        break;
      }

    /* Wait until the next task is being fed */
    if (!taskid)
      pthread_cond_wait(&tasklist_being_filled, &tasklist_mutex);
  }

  /* Release lock */
  pthread_mutex_unlock(&tasklist_mutex);

  return taskid;
}
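
/* Illustrative worker-loop sketch, not part of the original source: a worker
   thread blocks in poptask() until a task of its class is queued, renders the
   answer and reports it back. fullscrape_make() and its parameters are
   assumptions standing in for the actual worker code in ot_fullscrape.c.

   ot_tasktype   tasktype = TASK_FULLSCRAPE;
   ot_taskid     taskid   = mutex_workqueue_poptask(&tasktype);
   int           iovec_entries;
   struct iovec *iovec;

   fullscrape_make(&iovec_entries, &iovec, tasktype);   // hypothetical renderer
   if (mutex_workqueue_pushresult(taskid, iovec_entries, iovec))
     iovec_free(&iovec_entries, &iovec);                // nobody wants the result anymore
*/
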
void mutex_workqueue_pushsuccess(ot_taskid taskid) {
  struct ot_task **task;

  /* Want exclusive access to tasklist */
  pthread_mutex_lock(&tasklist_mutex);

  for (task = &tasklist; *task; task = &((*task)->next))
    if ((*task)->taskid == taskid) {
      struct ot_task *ptask = *task;
      *task                 = (*task)->next;
      free(ptask);
      break;
    }

  /* Release lock */
  pthread_mutex_unlock(&tasklist_mutex);
}

int mutex_workqueue_pushresult(ot_taskid taskid, int iovec_entries, struct iovec *iovec) {
  struct ot_task *task;
  const char      byte = 'o';

  /* Want exclusive access to tasklist */
  pthread_mutex_lock(&tasklist_mutex);

  for (task = tasklist; task; task = task->next)
    if (task->taskid == taskid) {
      task->iovec_entries = iovec_entries;
      task->iovec         = iovec;
      task->tasktype      = TASK_DONE;
      break;
    }

  /* Release lock */
  pthread_mutex_unlock(&tasklist_mutex);

  /* Wake up the main thread via the self pipe */
  io_trywrite(g_self_pipe[1], &byte, 1);

  /* Indicate whether the worker has to throw away results */
  return task ? 0 : -1;
}

int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec) {
  struct ot_task *task;
  const char      byte = 'o';

  /* Want exclusive access to tasklist */
  pthread_mutex_lock(&tasklist_mutex);

  for (task = tasklist; task; task = task->next)
    if (task->taskid == taskid) {
      if (iovec) {
        /* Append the new chunk, more data may follow */
        if (iovec_append(&task->iovec_entries, &task->iovec, iovec))
          task->tasktype = TASK_DONE_PARTIAL;
        else
          task = NULL;
      } else
        /* A NULL iovec marks the end of a chunked result */
        task->tasktype = TASK_DONE;
      break;
    }

  /* Release lock */
  pthread_mutex_unlock(&tasklist_mutex);

  /* Wake up the main thread via the self pipe */
  io_trywrite(g_self_pipe[1], &byte, 1);

  /* Indicate whether the worker has to throw away results */
  return task ? 0 : -1;
}
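
/* Illustrative sketch, not part of the original source: a worker can stream a
   large answer in several pieces and finish with a NULL iovec. The chunk
   variable and the producer loop below are assumptions about the producing code.

   struct iovec chunk;
   while (produce_next_chunk(&chunk))                 // hypothetical producer
     if (mutex_workqueue_pushchunked(taskid, &chunk))
       break;                                         // result is unwanted, stop producing
   mutex_workqueue_pushchunked(taskid, NULL);         // NULL marks the result complete
*/
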
int64 mutex_workqueue_popresult(int *iovec_entries, struct iovec **iovec, int *is_partial) {
  struct ot_task **task;
  int64            sock = -1;

  *is_partial = 0;

  /* Want exclusive access to tasklist */
  pthread_mutex_lock(&tasklist_mutex);

  for (task = &tasklist; *task; task = &((*task)->next))
    if (((*task)->tasktype & TASK_CLASS_MASK) == TASK_DONE) {
      struct ot_task *ptask = *task;
      *iovec_entries        = ptask->iovec_entries;
      *iovec                = ptask->iovec;
      sock                  = ptask->sock;

      if ((*task)->tasktype == TASK_DONE) {
        /* Completely done: unlink and free the task */
        *task = ptask->next;
        free(ptask);
      } else {
        /* Partial result: hand over the iovec but keep the task around */
        ptask->iovec_entries = 0;
        ptask->iovec         = NULL;
        *is_partial          = 1;
        /* Prevent task from showing up immediately again unless new data was added */
        (*task)->tasktype = TASK_FULLSCRAPE;
      }
      break;
    }

  /* Release lock */
  pthread_mutex_unlock(&tasklist_mutex);

  return sock;
}
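
/* Illustrative main-loop sketch, not part of the original source: after a worker
   wrote to g_self_pipe[1], the main thread drains finished results and sends
   them to the waiting client. senddata() and the drain loop are assumptions
   about the code in opentracker.c.

   int           iovec_entries, is_partial;
   struct iovec *iovec;
   int64         sock;

   while ((sock = mutex_workqueue_popresult(&iovec_entries, &iovec, &is_partial)) != -1)
     senddata(sock, iovec_entries, iovec, is_partial);  // hypothetical sender
*/
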
void mutex_init() {
  int i;
  pthread_mutex_init(&tasklist_mutex, NULL);
  pthread_cond_init(&tasklist_being_filled, NULL);
  for (i = 0; i < OT_BUCKET_COUNT; ++i)
    pthread_mutex_init(bucket_mutex + i, NULL);
  byte_zero(all_torrents, sizeof(all_torrents));
}

void mutex_deinit() {
  int i;
  for (i = 0; i < OT_BUCKET_COUNT; ++i)
    pthread_mutex_destroy(bucket_mutex + i);
  pthread_mutex_destroy(&tasklist_mutex);
  pthread_cond_destroy(&tasklist_being_filled);
  byte_zero(all_torrents, sizeof(all_torrents));
}