mpsc_queue: Make dequeue lock-free
[libnbds.git] / test / spmc_queue_m.c
blobafde321bbab74d6beaa85fda01463f4daf2a7cac
1 /*
2 libnbds
3 Copyright (C) 2014 Paweł Dziepak
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdint.h>
20 #include <stdlib.h>
21 #include <string.h>
23 #include <pthread.h>
24 #include <time.h>
26 #include "spmc_queue.h"
/* The queue under test: main() enqueues into it, every thread_func() dequeues from it. */
28 struct spmc_queue queue;
/*
 * Start line shared by main() and the consumer threads. It is initialised
 * for THREADS + 1 participants, so nobody touches the queue until all
 * consumers and the producer have reached pthread_barrier_wait().
 */
30 pthread_barrier_t start;
/* Number of consumer threads main() creates; also sizes the per-thread arrays. */
32 #define THREADS 4u
/*
 * Number of values main() enqueues (1 .. ITERATIONS inclusive). Also the
 * capacity of each consumer's read_data log and the value already_read must
 * reach before the consumers exit.
 */
33 #define ITERATIONS (1024 * 1024 * 8u)
/*
 * Per-consumer log of everything that consumer dequeued, in dequeue order.
 * NOTE(review): with ITERATIONS entries of uintptr_t per thread this is a
 * large static allocation (64 MiB per thread if uintptr_t is 8 bytes).
 */
35 struct thread_data {
/* Values returned by spmc_queue_dequeue(), appended in the order received. */
36 uintptr_t read_data[ITERATIONS];
/* Number of entries of read_data filled so far (index of the next free slot). */
37 size_t pointer;
/* One log per consumer, indexed by the id passed to thread_func(). */
39 struct thread_data thread_datas[THREADS];
/*
 * Total number of values dequeued by all consumers. Updated with
 * __sync_fetch_and_add() in batches; consumers return once it equals
 * ITERATIONS.
 */
40 uintptr_t already_read;
42 void* thread_func(void* ptr)
44 uintptr_t idx = *(uintptr_t*)ptr;
45 struct thread_data* data = thread_datas + idx;
46 uintptr_t read_count = 0;
47 uintptr_t val;
49 pthread_barrier_wait(&start);
51 while (1) {
52 do {
53 val = (uintptr_t)spmc_queue_dequeue(&queue);
55 if (!val) {
56 if (read_count) {
57 __sync_fetch_and_add(&already_read, read_count);
58 read_count = 0;
61 if (*(volatile uintptr_t*)&already_read == ITERATIONS)
62 return NULL;
64 } while (!val);
66 if (data->pointer && data->read_data[data->pointer - 1] >= val)
67 exit(1);
69 data->read_data[data->pointer++] = val;
70 read_count++;
73 return NULL;
76 int main(int argc, char** argv)
78 uintptr_t i, j;
79 int error;
80 uintptr_t ids[THREADS];
81 pthread_t threads[THREADS];
82 uintptr_t current[THREADS];
84 (void)argc;
85 (void)argv;
87 for (i = 0; i < THREADS; i++)
88 ids[i] = i;
90 error = spmc_queue_init(&queue);
91 if (error)
92 return 1;
94 if (spmc_queue_dequeue(&queue))
95 return 1;
97 pthread_barrier_init(&start, NULL, THREADS + 1);
99 for (i = 0; i < THREADS; i++)
100 pthread_create(&threads[i], NULL, thread_func, &ids[i]);
102 pthread_barrier_wait(&start);
104 for (i = 1; i <= ITERATIONS; i++) {
105 error = spmc_queue_enqueue(&queue, (void*)i);
106 if (error)
107 return 1;
110 for (i = 0; i < THREADS; i++)
111 pthread_join(threads[i], NULL);
113 if (spmc_queue_dequeue(&queue))
114 return 1;
116 spmc_queue_destroy(&queue);
118 memset(current, 0, sizeof(current));
119 for (i = 1; i <= ITERATIONS; i++) {
120 error = 0;
121 for (j = 0; j < THREADS; j++) {
122 if (thread_datas[j].pointer == current[j])
123 continue;
125 if (thread_datas[j].read_data[current[j]] == i) {
126 if (!error) {
127 current[j]++;
128 error = 1;
129 } else
130 return 1;
135 for (i = 0; i < THREADS; i++) {
136 if (thread_datas[i].pointer != current[i])
137 return 1;