modified: n.fq
[GalaxyCodeBases.git] / tools / torrent / mktorrent_crc / hash_pthreads.c
blobbc9b306a81ff5182efde13ff250efce5b18d761f
1 /*
2 This file is part of mktorrent
3 Copyright (C) 2007, 2009 Emil Renner Berthing
5 mktorrent is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 mktorrent is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
19 #ifndef ALLINONE
20 #include <stdlib.h> /* exit(), malloc() */
21 #include <sys/types.h> /* off_t */
22 #include <errno.h> /* errno */
23 #include <string.h> /* strerror() */
24 #include <stdio.h> /* printf() etc. */
25 #include <fcntl.h> /* open() */
26 #include <unistd.h> /* access(), read(), close() */
27 #ifdef USE_OPENSSL
28 #include <openssl/sha.h> /* SHA1() */
29 #else
30 #include <inttypes.h>
31 #include "sha1.h"
32 #endif
33 #include <pthread.h> /* pthread functions and data structures */
35 #include "mktorrent.h"
37 #define EXPORT
38 #endif /* ALLINONE */
41 #ifndef PROGRESS_PERIOD
42 #define PROGRESS_PERIOD 200000
43 #endif
45 #ifndef O_BINARY
46 #define O_BINARY 0
47 #endif
49 struct piece_s;
50 typedef struct piece_s piece_t;
51 struct piece_s {
52 piece_t *next;
53 unsigned char *dest;
54 unsigned long len;
55 unsigned char data[1];
58 struct queue_s;
59 typedef struct queue_s queue_t;
60 struct queue_s {
61 piece_t *free;
62 piece_t *full;
63 unsigned int buffers_max;
64 unsigned int buffers;
65 pthread_mutex_t mutex_free;
66 pthread_mutex_t mutex_full;
67 pthread_cond_t cond_empty;
68 pthread_cond_t cond_full;
69 unsigned int done;
70 unsigned int pieces;
71 unsigned int pieces_hashed;
74 static piece_t *get_free(queue_t *q, size_t piece_length)
76 piece_t *r;
78 pthread_mutex_lock(&q->mutex_free);
79 if (q->free) {
80 r = q->free;
81 q->free = r->next;
82 } else if (q->buffers < q->buffers_max) {
83 r = malloc(sizeof(piece_t) - 1 + piece_length);
84 if (r == NULL) {
85 fprintf(stderr, "Out of memory.\n");
86 exit(EXIT_FAILURE);
89 q->buffers++;
90 } else {
91 while (q->free == NULL) {
92 pthread_cond_wait(&q->cond_full, &q->mutex_free);
95 r = q->free;
96 q->free = r->next;
98 pthread_mutex_unlock(&q->mutex_free);
100 return r;
103 static piece_t *get_full(queue_t *q)
105 piece_t *r;
107 pthread_mutex_lock(&q->mutex_full);
108 again:
109 if (q->full) {
110 r = q->full;
111 q->full = r->next;
112 } else if (q->done) {
113 r = NULL;
114 } else {
115 pthread_cond_wait(&q->cond_empty, &q->mutex_full);
116 goto again;
118 pthread_mutex_unlock(&q->mutex_full);
120 return r;
123 static void put_free(queue_t *q, piece_t *p, unsigned int hashed)
125 pthread_mutex_lock(&q->mutex_free);
126 p->next = q->free;
127 q->free = p;
128 q->pieces_hashed += hashed;
129 pthread_mutex_unlock(&q->mutex_free);
130 pthread_cond_signal(&q->cond_full);
133 static void put_full(queue_t *q, piece_t *p)
135 pthread_mutex_lock(&q->mutex_full);
136 p->next = q->full;
137 q->full = p;
138 pthread_mutex_unlock(&q->mutex_full);
139 pthread_cond_signal(&q->cond_empty);
142 static void set_done(queue_t *q)
144 pthread_mutex_lock(&q->mutex_full);
145 q->done = 1;
146 pthread_mutex_unlock(&q->mutex_full);
147 pthread_cond_broadcast(&q->cond_empty);
150 static void free_buffers(queue_t *q)
152 piece_t *first = q->free;
154 while (first) {
155 piece_t *p = first;
156 first = p->next;
157 free(p);
160 q->free = NULL;
164 * print the progress in a thread of its own
166 static void *print_progress(void *data)
168 queue_t *q = data;
169 int err;
171 err = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
172 if (err) {
173 fprintf(stderr, "Error setting thread cancel type: %s\n",
174 strerror(err));
175 exit(EXIT_FAILURE);
178 while (1) {
179 /* print progress and flush the buffer immediately */
180 printf("\rHashed %u of %u pieces.", q->pieces_hashed, q->pieces);
181 fflush(stdout);
182 /* now sleep for PROGRESS_PERIOD microseconds */
183 usleep(PROGRESS_PERIOD);
186 return NULL;
189 static void *worker(void *data)
191 queue_t *q = data;
192 piece_t *p;
193 SHA_CTX c;
195 while ((p = get_full(q))) {
196 SHA1_Init(&c);
197 SHA1_Update(&c, p->data, p->len);
198 SHA1_Final(p->dest, &c);
199 put_free(q, p, 1);
202 return NULL;
205 static void read_files(metafile_t *m, queue_t *q, unsigned char *pos)
207 int fd; /* file descriptor */
208 flist_t *f; /* pointer to a place in the file
209 list */
210 ssize_t r = 0; /* number of bytes read from
211 file(s) into the read buffer */
212 #ifndef NO_HASH_CHECK
213 off_t counter = 0; /* number of bytes hashed
214 should match size when done */
215 #endif
216 piece_t *p = get_free(q, m->piece_length);
218 /* go through all the files in the file list */
219 for (f = m->file_list; f; f = f->next) {
221 /* open the current file for reading */
222 #if defined _LARGEFILE_SOURCE && defined O_LARGEFILE
223 if ((fd = open(f->path, O_RDONLY | O_BINARY | O_LARGEFILE)) == -1) {
224 #else
225 if ((fd = open(f->path, O_RDONLY | O_BINARY)) == -1) {
226 #endif
227 fprintf(stderr, "Error opening '%s' for reading: %s\n",
228 f->path, strerror(errno));
229 exit(EXIT_FAILURE);
232 while (1) {
233 ssize_t d = read(fd, p->data + r, m->piece_length - r);
235 if (d < 0) {
236 fprintf(stderr, "Error reading from '%s': %s\n",
237 f->path, strerror(errno));
238 exit(EXIT_FAILURE);
241 if (d == 0) /* end of file */
242 break;
244 r += d;
246 if (r == m->piece_length) {
247 p->dest = pos;
248 p->len = m->piece_length;
249 put_full(q, p);
250 pos += SHA_DIGEST_LENGTH;
251 #ifndef NO_HASH_CHECK
252 counter += r;
253 #endif
254 r = 0;
255 p = get_free(q, m->piece_length);
259 /* now close the file */
260 if (close(fd)) {
261 fprintf(stderr, "Error closing '%s': %s\n",
262 f->path, strerror(errno));
263 exit(EXIT_FAILURE);
267 /* finally append the hash of the last irregular piece to the hash string */
268 if (r) {
269 p->dest = pos;
270 p->len = r;
271 put_full(q, p);
272 } else
273 put_free(q, p, 0);
275 #ifndef NO_HASH_CHECK
276 counter += r;
277 if (counter != m->size) {
278 fprintf(stderr, "Counted %" PRIoff " bytes, "
279 "but hashed %" PRIoff " bytes. "
280 "Something is wrong...\n", m->size, counter);
281 exit(EXIT_FAILURE);
283 #endif
286 EXPORT unsigned char *make_hash(metafile_t *m)
288 queue_t q = {
289 NULL, NULL, 0, 0,
290 PTHREAD_MUTEX_INITIALIZER,
291 PTHREAD_MUTEX_INITIALIZER,
292 PTHREAD_COND_INITIALIZER,
293 PTHREAD_COND_INITIALIZER,
294 0, 0, 0
296 pthread_t print_progress_thread; /* progress printer thread */
297 pthread_t *workers;
298 unsigned char *hash_string; /* the hash string */
299 unsigned int i;
300 int err;
302 workers = malloc(m->threads * sizeof(pthread_t));
303 hash_string = malloc(m->pieces * SHA_DIGEST_LENGTH);
304 if (workers == NULL || hash_string == NULL)
305 return NULL;
307 q.pieces = m->pieces;
308 q.buffers_max = 3*m->threads;
310 /* create worker threads */
311 for (i = 0; i < m->threads; i++) {
312 err = pthread_create(&workers[i], NULL, worker, &q);
313 if (err) {
314 fprintf(stderr, "Error creating thread: %s\n",
315 strerror(err));
316 exit(EXIT_FAILURE);
320 /* now set off the progress printer */
321 err = pthread_create(&print_progress_thread, NULL, print_progress, &q);
322 if (err) {
323 fprintf(stderr, "Error creating thread: %s\n",
324 strerror(err));
325 exit(EXIT_FAILURE);
328 /* read files and feed pieces to the workers */
329 read_files(m, &q, hash_string);
331 /* we're done so stop printing our progress. */
332 err = pthread_cancel(print_progress_thread);
333 if (err) {
334 fprintf(stderr, "Error cancelling thread: %s\n",
335 strerror(err));
336 exit(EXIT_FAILURE);
339 /* inform workers we're done */
340 set_done(&q);
342 /* wait for workers to finish */
343 for (i = 0; i < m->threads; i++) {
344 err = pthread_join(workers[i], NULL);
345 if (err) {
346 fprintf(stderr, "Error joining thread: %s\n",
347 strerror(err));
348 exit(EXIT_FAILURE);
352 free(workers);
354 /* the progress printer should be done by now too */
355 err = pthread_join(print_progress_thread, NULL);
356 if (err) {
357 fprintf(stderr, "Error joining thread: %s\n",
358 strerror(err));
359 exit(EXIT_FAILURE);
362 /* destroy mutexes and condition variables */
363 pthread_mutex_destroy(&q.mutex_full);
364 pthread_mutex_destroy(&q.mutex_free);
365 pthread_cond_destroy(&q.cond_empty);
366 pthread_cond_destroy(&q.cond_full);
368 /* free buffers */
369 free_buffers(&q);
371 /* ok, let the user know we're done too */
372 printf("\rHashed %u of %u pieces.\n", q.pieces_hashed, q.pieces);
374 return hash_string;