1 /* Test program that performs producer-consumer style communication through
2 * a circular buffer. This test program is a slightly modified version of the
3 * test program made available by Miguel Ojeda
4 * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782.
14 #include <semaphore.h>
16 #include "../../config.h"
19 /** gcc versions 4.1.0 and later have support for atomic builtins. */
21 #ifndef HAVE_BUILTIN_ATOMIC
22 #error Sorry, but this test program can only be compiled by a compiler that\
23 has built-in functions for atomic memory access.
/* Capacity of the circular buffer: size of the 'buffer' array, initial count
 * of the "free" semaphore, and the value at which the in/out positions are
 * wrapped back by the send/receive code. */
27 #define BUFFER_MAX (2)
/* Base name passed to create_semaphore() for the "data items" semaphore. */
28 #define DATA_SEMAPHORE_NAME "cb-data-semaphore"
/* Base name passed to create_semaphore() for the "free elements" semaphore. */
29 #define FREE_SEMAPHORE_NAME "cb-free-semaphore"
/* Members of the circular-buffer state that the functions below access
 * through a buffer_t pointer.
 * NOTE(review): the enclosing type declaration and the member declarations
 * that belong to the first four comments are not visible in this extract. */
35 /* Counting semaphore representing the number of data items in the buffer. */
37 /* Counting semaphore representing the number of free elements. */
39 /* Position where a new element should be written. */
41 /* Position from where an element can be removed. */
43 /* Mutex that protects 'in'. */
44 pthread_mutex_t mutex_in
;
45 /* Mutex that protects 'out'. */
46 pthread_mutex_t mutex_out
;
/* Element storage; holds BUFFER_MAX entries of type data_t. */
48 data_t buffer
[BUFFER_MAX
];
/* File-local flag, enabled (1) by default.
 * NOTE(review): presumably selects whether mutex_in / mutex_out are taken
 * around the position updates and is cleared by a command-line option -- the
 * lines that read or modify it are not visible in this extract; confirm. */
52 static int use_locking
= 1;
/**
 * Atomically add i to *p.
 *
 * Thin wrapper around the GCC __sync_fetch_and_add() builtin, which performs
 * the read-modify-write as a single atomic operation.
 *
 * @param p  Address of the integer to update.
 * @param i  Amount to add; may be negative.
 * @return   The value *p held immediately before the addition.
 */
int fetch_and_add(int* p, int i)
{
  return __sync_fetch_and_add(p, i);
}
/* Create a counting semaphore with initial count 'value'.
 *
 * name:  base name for the semaphore; used by the sem_open() variant below.
 * value: initial semaphore count.
 *
 * Two alternative bodies are visible, each declaring its own 'p'.
 * NOTE(review): the lines that choose between them (presumably conditional
 * compilation), the failure handling and the return statements are not
 * visible in this extract. */
60 static sem_t
* create_semaphore(const char* const name
, const int value
)
/* Variant 1: named semaphore. The name is made per-process by appending the
 * pid ("<name>-<pid>") into a 32-byte buffer; snprintf() bounds the write. */
63 char name_and_pid
[32];
64 snprintf(name_and_pid
, sizeof(name_and_pid
), "%s-%d", name
, getpid());
/* O_CREAT | O_EXCL: create the semaphore and fail if that name already
 * exists; mode 0600, initial count 'value'. */
65 sem_t
* p
= sem_open(name_and_pid
, O_CREAT
| O_EXCL
, 0600, value
);
66 if (p
== SEM_FAILED
) {
/* Variant 2: unnamed semaphore in heap memory obtained from malloc(),
 * initialized with sem_init() using pshared = 0 and count 'value'. */
72 sem_t
* p
= malloc(sizeof(*p
));
74 sem_init(p
, 0, value
);
/* Takes a semaphore base name and a semaphore pointer; buffer_destroy() calls
 * it once for each of the buffer's two semaphores.
 * NOTE(review): the body is not visible in this extract; presumably it
 * releases what create_semaphore() set up -- confirm. */
79 static void destroy_semaphore(const char* const name
, sem_t
* p
)
/* Set up the synchronization objects of buffer *b.
 *
 * The "data" semaphore starts at 0 and the "free" semaphore starts at
 * BUFFER_MAX; both mutexes are created with default attributes (NULL).
 * NOTE(review): the return values of create_semaphore() and
 * pthread_mutex_init() are not checked in the visible lines, and at least one
 * statement of this function is missing from this extract. */
90 static void buffer_init(buffer_t
* b
)
92 b
->data
= create_semaphore(DATA_SEMAPHORE_NAME
, 0);
93 b
->free
= create_semaphore(FREE_SEMAPHORE_NAME
, BUFFER_MAX
);
95 pthread_mutex_init(&b
->mutex_in
, NULL
);
96 pthread_mutex_init(&b
->mutex_out
, NULL
);
/* Receive side of the circular buffer: operates on buffer *b and on the
 * element pointed to by d, whose value is printed at the end.
 *
 * NOTE(review): several statements of this function (local declarations, any
 * semaphore operations, the actual element copy, any conditions guarding the
 * lock/unlock calls) are not visible in this extract. */
102 static void buffer_recv(buffer_t
* b
, data_t
* d
)
/* The read position is updated while holding mutex_out. */
107 pthread_mutex_lock(&b
->mutex_out
);
/* fetch_and_add() returns the position before the increment. */
108 out
= fetch_and_add(&b
->out
, 1);
/* Once the fetched position has reached BUFFER_MAX, pull b->out back by
 * BUFFER_MAX. */
109 if (out
>= BUFFER_MAX
)
111 fetch_and_add(&b
->out
, -BUFFER_MAX
);
116 pthread_mutex_unlock(&b
->mutex_out
);
/* Report the element value and the position it is associated with. */
119 printf("received %d from buffer[%d]\n", *d
, out
);
/* Send side of the circular buffer: operates on buffer *b and on the element
 * pointed to by d, whose value is printed at the end.
 *
 * NOTE(review): several statements of this function (local declarations, any
 * semaphore operations, the actual element copy, any conditions guarding the
 * lock/unlock calls) are not visible in this extract. */
125 static void buffer_send(buffer_t
* b
, data_t
* d
)
/* The write position is updated while holding mutex_in. */
130 pthread_mutex_lock(&b
->mutex_in
);
/* fetch_and_add() returns the position before the increment. */
131 in
= fetch_and_add(&b
->in
, 1);
/* Once the fetched position has reached BUFFER_MAX, pull b->in back by
 * BUFFER_MAX. */
132 if (in
>= BUFFER_MAX
)
134 fetch_and_add(&b
->in
, -BUFFER_MAX
);
139 pthread_mutex_unlock(&b
->mutex_in
);
/* Report the element value and the position it is associated with. */
142 printf("sent %d to buffer[%d]\n", *d
, in
);
/* Tear down the synchronization objects of buffer *b: hands both semaphores
 * to destroy_semaphore() together with their base names, then destroys both
 * mutexes. Return values of pthread_mutex_destroy() are not checked. */
148 static void buffer_destroy(buffer_t
* b
)
150 destroy_semaphore(DATA_SEMAPHORE_NAME
, b
->data
);
151 destroy_semaphore(FREE_SEMAPHORE_NAME
, b
->free
);
153 pthread_mutex_destroy(&b
->mutex_in
);
154 pthread_mutex_destroy(&b
->mutex_out
);
/* Producer thread routine; main() starts one per thread slot and passes the
 * address of that slot's thread_arg entry as 'id'.
 * NOTE(review): the body is not visible in this extract. */
159 static void producer(int* id
)
/* Exclusive upper bound, in microseconds (100 * 1000 us = 100 ms), for the
 * random usleep() delay taken as rand() % MAXSLEEP. */
165 #define MAXSLEEP (100 * 1000)
/* Consumer thread routine; main() starts one per thread slot and passes the
 * address of that slot's thread_arg entry as 'id'.
 * NOTE(review): most of the body (declaration and assignment of 'd', any
 * surrounding conditions) is not visible in this extract. */
167 static void consumer(int* id
)
/* Sleep for a pseudo-random time below MAXSLEEP microseconds. */
170 usleep(rand() % MAXSLEEP
);
/* Print "<id>: <d>". */
174 printf("%i: %i\n", *id
, d
);
/* Program entry point: parses the command line, starts THREADS producer and
 * THREADS consumer threads, and joins all of them.
 *
 * NOTE(review): option handling, the initialization of thread_arg[] and of
 * the shared buffer, and the end of the function are not visible in this
 * extract. */
182 int main(int argc
, char** argv
)
184 pthread_t producers
[THREADS
];
185 pthread_t consumers
[THREADS
];
/* One argument slot per index; slot i is passed to both producer i and
 * consumer i. */
186 int thread_arg
[THREADS
];
/* Accepts the flag options 'n' and 'q' (neither takes an argument). */
190 while ((optchar
= getopt(argc
, argv
, "nq")) != EOF
)
/* Start the producers.
 * NOTE(review): producer has type void (*)(int*) and is cast to
 * void *(*)(void *); calling a function through an incompatible function
 * pointer type is undefined behavior in ISO C. The return value of
 * pthread_create() is not checked. */
207 for (i
= 0; i
< THREADS
; ++i
)
210 pthread_create(producers
+ i
, NULL
,
211 (void * (*)(void *)) producer
, &thread_arg
[i
]);
/* Start the consumers; the same cast and unchecked-return remarks apply. */
214 for (i
= 0; i
< THREADS
; ++i
)
215 pthread_create(consumers
+ i
, NULL
,
216 (void * (*)(void *)) consumer
, &thread_arg
[i
]);
/* Wait for every producer and consumer; thread return values are
 * discarded (NULL). */
218 for (i
= 0; i
< THREADS
; ++i
)
220 pthread_join(producers
[i
], NULL
);
221 pthread_join(consumers
[i
], NULL
);