memcheck/tests/sh-mem-random.c: Set huge_addr to 240GB
[valgrind.git] / drd / tests / circular_buffer.c
blobd84ac01273162bf36450feebd9c078f39a3d53d7
1 /* Test program that performs producer-consumer style communication through
2 * a circular buffer. This test program is a slightly modified version of the
3 * test program made available by Miguel Ojeda
4 * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782.
5 */
8 #include <stdio.h>
9 #include <string.h>
10 #include <stdlib.h>
11 #include <unistd.h>
12 #include <time.h>
13 #include <pthread.h>
14 #include <semaphore.h>
15 #include <fcntl.h>
16 #include "../../config.h"
19 /** gcc versions 4.1.0 and later have support for atomic builtins. */
21 #ifndef HAVE_BUILTIN_ATOMIC
22 #error Sorry, but this test program can only be compiled by a compiler that \
23 has built-in functions for atomic memory access.
24 #endif
/* Number of slots in the circular buffer. */
#define BUFFER_MAX (2)

/* Base names passed to create_semaphore() and destroy_semaphore(). */
#define DATA_SEMAPHORE_NAME "cb-data-semaphore"
#define FREE_SEMAPHORE_NAME "cb-free-semaphore"

/* Type of the items that travel through the circular buffer. */
typedef int data_t;
34 typedef struct {
35 /* Counting semaphore representing the number of data items in the buffer. */
36 sem_t* data;
37 /* Counting semaphore representing the number of free elements. */
38 sem_t* free;
39 /* Position where a new elements should be written. */
40 int in;
41 /* Position from where an element can be removed. */
42 int out;
43 /* Mutex that protects 'in'. */
44 pthread_mutex_t mutex_in;
45 /* Mutex that protects 'out'. */
46 pthread_mutex_t mutex_out;
47 /* Data buffer. */
48 data_t buffer[BUFFER_MAX];
49 } buffer_t;
/*
 * Argument block handed to each thread start routine.
 *   id        - value a producer sends and a consumer prints as its own id.
 *   wait_time - delay a consumer passes to usleep() before receiving.
 */
typedef struct id_and_wait_s {
  int id;
  int wait_time;
} id_and_wait_t;
/* Nonzero (set by option -q) suppresses the per-item progress output. */
static int quiet;
/* Nonzero (the default; cleared by option -n) makes buffer_send() and
 * buffer_recv() take the index mutexes. */
static int use_locking = 1;
/*
 * Atomically add 'i' to '*p' using the compiler's __sync builtin.
 * Returns the value '*p' held immediately before the addition.
 */
static __inline__ int fetch_and_add(int* p, int i)
{
  const int previous = __sync_fetch_and_add(p, i);

  return previous;
}
/*
 * Create a counting semaphore whose initial count is 'value'.
 *
 * With VGO_darwin defined, a named semaphore is opened exclusively under the
 * name "<name>-<pid>" so that concurrently running instances of this program
 * do not collide. Otherwise an unnamed semaphore is allocated with malloc()
 * and initialized with sem_init(); 'name' is not used in that case.
 *
 * Returns the new semaphore, or NULL when allocation, sem_open() or
 * sem_init() fails. Release the result with destroy_semaphore().
 */
static sem_t* create_semaphore(const char* const name, const int value)
{
#ifdef VGO_darwin
  char name_and_pid[32];
  sem_t* p;

  /* pid_t is not necessarily int, so cast it to match the %d conversion. */
  snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name, (int)getpid());
  p = sem_open(name_and_pid, O_CREAT | O_EXCL, 0600, value);
  if (p == SEM_FAILED) {
    perror("sem_open");
    return NULL;
  }
  return p;
#else
  sem_t* p = malloc(sizeof(*p));

  (void)name;
  if (p == NULL)
    return NULL;
  /* Never hand out a semaphore that failed to initialize. */
  if (sem_init(p, 0, value) != 0) {
    perror("sem_init");
    free(p);
    return NULL;
  }
  return p;
#endif
}
/*
 * Release a semaphore obtained from create_semaphore().
 *
 * 'name' must be the base name that was passed to create_semaphore().
 * A NULL 'p' (failed creation) is accepted and ignored.
 */
static void destroy_semaphore(const char* const name, sem_t* p)
{
  if (p == NULL)
    return;
#ifdef VGO_darwin
  {
    char name_and_pid[32];

    /*
     * create_semaphore() opens the semaphore as "<name>-<pid>", so that same
     * suffixed name has to be unlinked; unlinking the bare base name would
     * leave the named semaphore behind.
     */
    snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name, (int)getpid());
    sem_close(p);
    sem_unlink(name_and_pid);
  }
#else
  (void)name;
  sem_destroy(p);
  free(p);
#endif
}
96 static void buffer_init(buffer_t * b)
98 b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0);
99 b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX);
101 pthread_mutex_init(&b->mutex_in, NULL);
102 pthread_mutex_init(&b->mutex_out, NULL);
104 b->in = 0;
105 b->out = 0;
108 static void buffer_recv(buffer_t* b, data_t* d)
110 int out;
111 sem_wait(b->data);
112 if (use_locking)
113 pthread_mutex_lock(&b->mutex_out);
114 out = fetch_and_add(&b->out, 1);
115 if (out >= BUFFER_MAX)
117 fetch_and_add(&b->out, -BUFFER_MAX);
118 out -= BUFFER_MAX;
120 *d = b->buffer[out];
121 if (use_locking)
122 pthread_mutex_unlock(&b->mutex_out);
123 if (! quiet)
125 printf("received %d from buffer[%d]\n", *d, out);
126 fflush(stdout);
128 sem_post(b->free);
131 static void buffer_send(buffer_t* b, data_t* d)
133 int in;
134 sem_wait(b->free);
135 if (use_locking)
136 pthread_mutex_lock(&b->mutex_in);
137 in = fetch_and_add(&b->in, 1);
138 if (in >= BUFFER_MAX)
140 fetch_and_add(&b->in, -BUFFER_MAX);
141 in -= BUFFER_MAX;
143 b->buffer[in] = *d;
144 if (use_locking)
145 pthread_mutex_unlock(&b->mutex_in);
146 if (! quiet)
148 printf("sent %d to buffer[%d]\n", *d, in);
149 fflush(stdout);
151 sem_post(b->data);
154 static void buffer_destroy(buffer_t* b)
156 destroy_semaphore(DATA_SEMAPHORE_NAME, b->data);
157 destroy_semaphore(FREE_SEMAPHORE_NAME, b->free);
159 pthread_mutex_destroy(&b->mutex_in);
160 pthread_mutex_destroy(&b->mutex_out);
163 static buffer_t b;
165 static void *producer(void* arg)
167 id_and_wait_t* ctx = arg;
168 int* id = &ctx->id;
170 buffer_send(&b, id);
171 return NULL;
174 #define MAXSLEEP (100 * 1000)
176 static void *consumer(void* arg)
178 id_and_wait_t* ctx = arg;
179 int id = ctx->id;
180 int wait_time = ctx->wait_time;
181 int d;
183 usleep(wait_time);
184 buffer_recv(&b, &d);
185 if (! quiet)
187 printf("%i: %i\n", id, d);
188 fflush(stdout);
190 return NULL;
193 #define THREADS (10)
195 int main(int argc, char** argv)
197 pthread_t producers[THREADS];
198 pthread_t consumers[THREADS];
199 id_and_wait_t thread_arg[THREADS];
200 int i;
201 int optchar;
203 while ((optchar = getopt(argc, argv, "nq")) != EOF)
205 switch (optchar)
207 case 'n':
208 use_locking = 0;
209 break;
210 case 'q':
211 quiet = 1;
212 break;
216 srand(time(NULL));
218 buffer_init(&b);
220 for (i = 0; i < THREADS; ++i)
222 thread_arg[i].id = i;
223 thread_arg[i].wait_time = rand() % MAXSLEEP;
224 pthread_create(producers + i, NULL, producer, &thread_arg[i]);
227 for (i = 0; i < THREADS; ++i)
228 pthread_create(consumers + i, NULL, consumer, &thread_arg[i]);
230 for (i = 0; i < THREADS; ++i)
232 pthread_join(producers[i], NULL);
233 pthread_join(consumers[i], NULL);
236 buffer_destroy(&b);
238 return 0;