1 /* Test program that performs producer-consumer style communication through
2 * a circular buffer. This test program is a slightly modified version of the
3 * test program made available by Miguel Ojeda
4 * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782.
14 #include <semaphore.h>
16 #include "../../config.h"
19 /** gcc versions 4.1.0 and later have support for atomic builtins. */
21 #ifndef HAVE_BUILTIN_ATOMIC
22 #error Sorry, but this test program can only be compiled by a compiler that\
23 has built-in functions for atomic memory access.
/* Capacity of the circular buffer: it is the dimension of the 'buffer' array,
 * the initial count given to the "free" semaphore in buffer_init(), and the
 * limit at which buffer_send() / buffer_recv() pull 'in' / 'out' back. */
27 #define BUFFER_MAX (2)
/* Base names passed to create_semaphore() and destroy_semaphore() for the
 * "data" and the "free" semaphore respectively. */
28 #define DATA_SEMAPHORE_NAME "cb-data-semaphore"
29 #define FREE_SEMAPHORE_NAME "cb-free-semaphore"
35 /* Counting semaphore representing the number of data items in the buffer. */
37 /* Counting semaphore representing the number of free elements. */
39 /* Position where a new element should be written. */
41 /* Position from where an element can be removed. */
43 /* Mutex that protects 'in'. */
44 pthread_mutex_t mutex_in
;
45 /* Mutex that protects 'out'. */
46 pthread_mutex_t mutex_out
;
48 data_t buffer
[BUFFER_MAX
];
51 typedef struct id_and_wait_s
/* File-scope flag, enabled (1) by default.
 * NOTE(review): the code that reads or changes this flag is not visible in
 * this excerpt; presumably it selects whether buffer_send() / buffer_recv()
 * take their mutexes and is toggled from the command line -- confirm. */
58 static int use_locking
= 1;
/* Thin wrapper around the compiler builtin __sync_fetch_and_add(): atomically
 * adds 'i' to the integer that 'p' points to and returns the builtin's
 * result, which GCC documents as the value '*p' held before the addition.
 * A negative 'i' is used by the callers to subtract. */
61 int fetch_and_add(int* p
, int i
)
63 return __sync_fetch_and_add(p
, i
);
/* Create a counting semaphore whose initial count is 'value'.
 *
 * Two creation strategies are visible:
 *  - a named semaphore obtained from sem_open(); its name is "<name>-<pid>"
 *    with the pid taken from getpid(), and it is created exclusively
 *    (O_CREAT | O_EXCL) with mode 0600;
 *  - an unnamed semaphore whose storage comes from malloc() and which is
 *    initialised with sem_init() using pshared == 0.
 *
 * NOTE(review): 'p' is declared twice, so the two strategies presumably live
 * in separate preprocessor branches that are not visible in this excerpt;
 * the failure handling and the return statements are not visible either --
 * confirm against the full file. */
66 static sem_t
* create_semaphore(const char* const name
, const int value
)
/* Scratch buffer for the name with the process id appended. */
69 char name_and_pid
[32];
70 snprintf(name_and_pid
, sizeof(name_and_pid
), "%s-%d", name
, getpid());
71 sem_t
* p
= sem_open(name_and_pid
, O_CREAT
| O_EXCL
, 0600, value
);
/* sem_open() signals failure by returning SEM_FAILED. */
72 if (p
== SEM_FAILED
) {
/* Unnamed-semaphore variant: heap storage sized for one sem_t. */
78 sem_t
* p
= malloc(sizeof(*p
));
80 sem_init(p
, 0, value
);
/* Takes the base name and the semaphore pointer of a semaphore; it is called
 * from buffer_destroy() for both the "data" and the "free" semaphore.
 * NOTE(review): the body is not visible in this excerpt; presumably it is the
 * counterpart of create_semaphore() -- confirm the exact teardown steps. */
85 static void destroy_semaphore(const char* const name
, sem_t
* p
)
/* Set up the synchronisation objects of circular buffer 'b': the "data"
 * semaphore starts at 0, the "free" semaphore starts at BUFFER_MAX, and both
 * mutexes are initialised with default attributes (NULL).
 * NOTE(review): any initialisation of b->in / b->out is not visible in this
 * excerpt. */
96 static void buffer_init(buffer_t
* b
)
/* No data items are available yet. */
98 b
->data
= create_semaphore(DATA_SEMAPHORE_NAME
, 0);
/* Every one of the BUFFER_MAX slots is free at start-up. */
99 b
->free
= create_semaphore(FREE_SEMAPHORE_NAME
, BUFFER_MAX
);
101 pthread_mutex_init(&b
->mutex_in
, NULL
);
102 pthread_mutex_init(&b
->mutex_out
, NULL
);
/* Receive side of the circular buffer 'b'; 'd' is the caller's data item and
 * its value is reported at the end.
 *
 * Visible steps: take b->mutex_out, claim a read position by atomically
 * post-incrementing b->out, pull b->out back by BUFFER_MAX once the claimed
 * position has reached BUFFER_MAX, release the mutex and print the value
 * together with the position.
 * NOTE(review): statements between the visible ones (semaphore waits/posts,
 * the actual copy out of the buffer, any adjustment of the local 'out') are
 * not visible in this excerpt -- confirm against the full file. */
108 static void buffer_recv(buffer_t
* b
, data_t
* d
)
113 pthread_mutex_lock(&b
->mutex_out
);
/* fetch_and_add() yields the position before the increment. */
114 out
= fetch_and_add(&b
->out
, 1);
115 if (out
>= BUFFER_MAX
)
/* Wrap the shared position around by subtracting the buffer size. */
117 fetch_and_add(&b
->out
, -BUFFER_MAX
);
122 pthread_mutex_unlock(&b
->mutex_out
);
125 printf("received %d from buffer[%d]\n", *d
, out
);
/* Send side of the circular buffer 'b'; 'd' points to the data item whose
 * value is reported at the end.
 *
 * Visible steps: take b->mutex_in, claim a write position by atomically
 * post-incrementing b->in, pull b->in back by BUFFER_MAX once the claimed
 * position has reached BUFFER_MAX, release the mutex and print the value
 * together with the position.
 * NOTE(review): statements between the visible ones (semaphore waits/posts,
 * the actual copy into the buffer, any adjustment of the local 'in') are not
 * visible in this excerpt -- confirm against the full file. */
131 static void buffer_send(buffer_t
* b
, data_t
* d
)
136 pthread_mutex_lock(&b
->mutex_in
);
/* fetch_and_add() yields the position before the increment. */
137 in
= fetch_and_add(&b
->in
, 1);
138 if (in
>= BUFFER_MAX
)
/* Wrap the shared position around by subtracting the buffer size. */
140 fetch_and_add(&b
->in
, -BUFFER_MAX
);
145 pthread_mutex_unlock(&b
->mutex_in
);
148 printf("sent %d to buffer[%d]\n", *d
, in
);
/* Tear down what buffer_init() set up for 'b': both semaphores are handed to
 * destroy_semaphore() under the same base names used at creation, then both
 * mutexes are destroyed with pthread_mutex_destroy(). */
154 static void buffer_destroy(buffer_t
* b
)
156 destroy_semaphore(DATA_SEMAPHORE_NAME
, b
->data
);
157 destroy_semaphore(FREE_SEMAPHORE_NAME
, b
->free
);
159 pthread_mutex_destroy(&b
->mutex_in
);
160 pthread_mutex_destroy(&b
->mutex_out
);
/* Thread start routine for producer threads. main() passes it to
 * pthread_create() with a pointer to one thread_arg[] element, which is
 * recovered here as an id_and_wait_t pointer.
 * NOTE(review): the rest of the body is not visible in this excerpt. */
165 static void *producer(void* arg
)
167 id_and_wait_t
* ctx
= arg
;
/* Exclusive upper bound for the per-thread wait_time chosen in main() via
 * rand() % MAXSLEEP.
 * NOTE(review): the time unit is not visible in this excerpt -- confirm. */
174 #define MAXSLEEP (100 * 1000)
/* Thread start routine for consumer threads. main() passes it to
 * pthread_create() with a pointer to one thread_arg[] element; the routine
 * reads that element's wait_time and prints a line of the form "<id>: <d>".
 * NOTE(review): how 'id' and 'd' are obtained and how wait_time is used is
 * not visible in this excerpt. */
176 static void *consumer(void* arg
)
178 id_and_wait_t
* ctx
= arg
;
180 int wait_time
= ctx
->wait_time
;
187 printf("%i: %i\n", id
, d
);
/* Program entry point. Visible steps: parse the command-line options "n" and
 * "q" with getopt(), fill in one thread_arg[] element per index (id and a
 * pseudo-random wait_time below MAXSLEEP), start THREADS producer threads and
 * THREADS consumer threads, then join all of them.
 * NOTE(review): option handling, buffer set-up/tear-down and the return value
 * are not visible in this excerpt. */
195 int main(int argc
, char** argv
)
197 pthread_t producers
[THREADS
];
198 pthread_t consumers
[THREADS
];
199 id_and_wait_t thread_arg
[THREADS
];
/* NOTE(review): POSIX specifies -1 as getopt()'s end marker; the comparison
 * against EOF relies on EOF having that value. */
203 while ((optchar
= getopt(argc
, argv
, "nq")) != EOF
)
220 for (i
= 0; i
< THREADS
; ++i
)
222 thread_arg
[i
].id
= i
;
223 thread_arg
[i
].wait_time
= rand() % MAXSLEEP
;
/* The return value of pthread_create() is not checked here. */
224 pthread_create(producers
+ i
, NULL
, producer
, &thread_arg
[i
]);
227 for (i
= 0; i
< THREADS
; ++i
)
/* Consumer i receives the same thread_arg[i] element as producer i. */
228 pthread_create(consumers
+ i
, NULL
, consumer
, &thread_arg
[i
]);
230 for (i
= 0; i
< THREADS
; ++i
)
232 pthread_join(producers
[i
], NULL
);
233 pthread_join(consumers
[i
], NULL
);