Pull in patch that properly includes stdint.h
[pkg-k5-afs_openafs.git] / src / mcas / replay.c
blob613efbf4bc79713ac68b21389d9a042f130c4daf
1 /******************************************************************************
2 * replay.c
4 * Replay the log output of search-structure runs.
5 * Must build set_harness.c with DO_WRITE_LOG defined.
7 * Copyright (c) 2002-2003, K A Fraser
10 Redistribution and use in source and binary forms, with or without
11 modification, are permitted provided that the following conditions are
12 met:
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
16 * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following
18 * disclaimer in the documentation and/or other materials provided
19 * with the distribution. Neither the name of the Keir Fraser
20 * nor the names of its contributors may be used to endorse or
21 * promote products derived from this software without specific
22 * prior written permission.
24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <limits.h>
40 #include <errno.h>
41 #include <stdlib.h>
42 #include <assert.h>
43 #include <sys/types.h>
44 #include <sys/wait.h>
45 #include <sys/stat.h>
46 #include <fcntl.h>
47 #include <unistd.h>
49 #include "portable_defns.h"
51 #define RMAX_THREADS 256
52 #define VERIFY_ORDERINGS
54 #define LOG_REPLAYED (1<<26)
55 #define LOG_KEY_MASK 0xffffff
57 typedef struct log_st
59 interval_t start, end;
60 unsigned int data; /* key, and replay flag */
61 void *val, *old_val; /* op changed mapping from old_val to val */
62 } log_t;
64 #define REPLAYED(_l) ((_l)->data & LOG_REPLAYED)
66 static log_t *global_log;
67 static int nr_threads, nr_updates, nr_keys;
68 static int *key_offsets;
69 static int *success;
70 static unsigned int next_key = 0;
71 static pthread_mutex_t key_lock;
75 * GLOBAL LOGS SORTED ON:
76 * 1. Key value
77 * 2. Start time
79 * Replayer deals with each key value in turn.
81 static int compare(const void *t1, const void *t2)
83 const log_t *l1 = t1;
84 const log_t *l2 = t2;
85 const int k1 = l1->data & LOG_KEY_MASK;
86 const int k2 = l2->data & LOG_KEY_MASK;
88 if ( k1 < k2 ) return(-1);
89 if ( k1 > k2 ) return(+1);
91 if ( l1->start < l2->start ) return(-1);
93 return(+1);
97 static int do_op(log_t *log, void **key_state)
99 if ( REPLAYED(log) || (log->old_val != *key_state) ) return(0);
100 *key_state = log->val;
101 log->data |= LOG_REPLAYED;
102 return(1);
106 static void undo_op(log_t *log, void **key_state)
108 assert(REPLAYED(log));
109 log->data &= ~LOG_REPLAYED;
110 *key_state = log->old_val;
114 /* Sink down element @pos of @heap. */
115 static void down_heap(log_t **heap, int *heap_offsets, log_t *log, int pos)
117 int sz = (int)heap[0], nxt;
118 log_t *tmp;
119 while ( (nxt = (pos << 1)) <= sz )
121 if ( ((nxt+1) <= sz) && (heap[nxt+1]->end < heap[nxt]->end) ) nxt++;
122 if ( heap[nxt]->end > heap[pos]->end ) break;
123 heap_offsets[heap[pos] - log] = nxt;
124 heap_offsets[heap[nxt] - log] = pos;
125 tmp = heap[pos];
126 heap[pos] = heap[nxt];
127 heap[nxt] = tmp;
128 pos = nxt;
132 /* Float element @pos up @heap. */
133 static void up_heap(log_t **heap, int *heap_offsets, log_t *log, int pos)
135 log_t *tmp;
136 while ( pos > 1 )
138 if ( heap[pos]->end > heap[pos>>1]->end ) break;
139 heap_offsets[heap[pos] - log] = pos >> 1;
140 heap_offsets[heap[pos>>1] - log] = pos;
141 tmp = heap[pos];
142 heap[pos] = heap[pos>>1];
143 heap[pos>>1] = tmp;
144 pos >>= 1;
149 /* Delete @entry from @heap. */
150 static void remove_entry(log_t **heap, int *heap_offsets,
151 log_t *log, log_t *entry)
153 int sz = (int)heap[0];
154 int pos = heap_offsets[entry - log];
155 heap_offsets[heap[sz] - log] = pos;
156 heap[pos] = heap[sz];
157 heap[0] = (void *)(--sz);
158 if ( (pos > 1) && (heap[pos]->end < heap[pos>>1]->end) )
160 up_heap(heap, heap_offsets, log, pos);
162 else
164 down_heap(heap, heap_offsets, log, pos);
169 /* Add new entry @new to @heap. */
170 static void add_entry(log_t **heap, int *heap_offsets, log_t *log, log_t *new)
172 int sz = (int)heap[0];
173 heap[0] = (void *)(++sz);
174 heap_offsets[new - log] = sz;
175 heap[sz] = new;
176 up_heap(heap, heap_offsets, log, sz);
181 * This linearisation algorithm is a depth-first search of all feasible
182 * orderings. At each step, the next available operation is selected.
183 * The set of "available" operations is those which:
184 * (1) have not already been selected on this search path
185 * (2) are operations whose results are correct given current state
186 * (eg. a failed delete couldn't be selected if the key is in the set!)
187 * (3) have start times <= the earliest end time in the set.
188 * (1) ensures that each operation happens only once. (2) ensures that
189 * abstract state is consistent between operations. (3) ensures that time
190 * ordering is conserved.
192 static int linearise_ops_for_key(
193 log_t *log, int nr_items, log_t **stack,
194 log_t **cutoff_heap, int *heap_offsets, void **key_state)
196 int i;
197 log_t **sp = stack;
198 interval_t cutoff;
200 /* Construct cutoff heap. */
201 cutoff_heap[0] = (void *)nr_items;
202 for ( i = 0; i < nr_items; i++ )
204 cutoff_heap[i+1] = log + i;
205 heap_offsets[i] = i+1;
207 for ( i = nr_items>>1; i > 0; i-- )
209 down_heap(cutoff_heap, heap_offsets, log, i);
212 cutoff = cutoff_heap[1]->end;
214 for ( i = 0; ; )
216 while ( (i < nr_items) && (log[i].start <= cutoff) )
218 if ( !do_op(&log[i], key_state) ) { i++; continue; }
220 *sp++ = &log[i];
222 /* Done? */
223 if ( (sp - stack) == nr_items ) goto success;
225 remove_entry(cutoff_heap, heap_offsets, log, &log[i]);
226 cutoff = cutoff_heap[1]->end;
227 i = 0;
230 /* Failure? */
231 if ( (sp - stack) == 0 )
233 for ( i = -3; i < nr_items + 3; i++ )
235 #if 1
236 printf("%08x -> %08x -- %d: %08x -> %08x\n",
237 (unsigned int)log[i].start,
238 (unsigned int)log[i].end,
239 log[i].data & LOG_KEY_MASK,
240 (unsigned int)log[i].old_val,
241 (unsigned int)log[i].val);
242 #endif
244 return(0);
247 i = *--sp - log;
248 undo_op(&log[i], key_state);
249 add_entry(cutoff_heap, heap_offsets, log, &log[i]);
250 cutoff = cutoff_heap[1]->end;
251 i++;
254 success:
255 return(1);
259 static void *thread_start(void *arg)
261 unsigned long tid = (unsigned long)arg;
262 unsigned int our_key;
263 int ch_start, ch_end, start, end, nr_items, *heap_offsets;
264 log_t **stack;
265 log_t **cutoff_heap;
266 interval_t cutoff;
267 void *key_state;
268 #ifdef VERIFY_ORDERINGS
269 int i;
270 #endif
272 stack = malloc((nr_threads*nr_updates+1)*sizeof(log_t*));
273 cutoff_heap = malloc((nr_threads*nr_updates+1)*sizeof(*cutoff_heap));
274 heap_offsets = malloc((nr_threads*nr_updates+1)*sizeof(*heap_offsets));
275 if ( !stack || !cutoff_heap || !heap_offsets )
277 fprintf(stderr, "Error allocating space for stacks\n");
278 return(NULL);
281 again:
282 pthread_mutex_lock(&key_lock);
283 our_key = next_key++;
284 pthread_mutex_unlock(&key_lock);
285 if ( our_key >= nr_keys ) goto out;
287 start = key_offsets[our_key];
288 end = key_offsets[our_key+1];
289 nr_items = end - start;
291 printf("[Thread %lu] ++ Linearising key %d (%d events)\n",
292 tid, our_key, nr_items);
294 #if 0
296 int i;
297 for ( i = start; i < end; i++ )
299 printf("%04d/%04d -- %08x -> %08x -- %d: %08x -> %08x\n",
300 our_key, i - start,
301 (unsigned int)global_log[i].start,
302 (unsigned int)global_log[i].end,
303 global_log[i].data & LOG_KEY_MASK,
304 (unsigned int)global_log[i].old_val,
305 (unsigned int)global_log[i].val);
308 #endif
311 * We divide operations into independent chunks. A chunk is a maximal
312 * sequence of operations, ordered on start time, that does not
313 * overlap with any operation in any other chunk. Clearly, finding
314 * a linearisation for each chunk produces a total schedule.
316 success[our_key] = 1;
317 key_state = 0;
318 for ( ch_start = start; ch_start < end; ch_start = ch_end )
320 cutoff = global_log[ch_start].end;
321 for ( ch_end = ch_start; ch_end < end; ch_end++ )
323 if ( global_log[ch_end].start > cutoff ) break;
324 if ( global_log[ch_end].end > cutoff )
325 cutoff = global_log[ch_end].end;
328 /* Linearise chunk ch_start -> ch_end. */
329 success[our_key] = linearise_ops_for_key(
330 &global_log[ch_start],
331 ch_end - ch_start,
332 &stack[ch_start - start],
333 cutoff_heap,
334 heap_offsets,
335 &key_state);
337 if ( !success[our_key] )
339 printf("[Thread %lu] -- Linearisation FAILED for key %d\n",
340 tid, our_key);
341 goto again;
345 printf("[Thread %lu] -- Linearisation %s for key %d\n",
346 tid, (success[our_key] ? "found" : "FAILED"), our_key);
348 #ifdef VERIFY_ORDERINGS
349 printf("[Thread %lu] ++ Verifying key %d\n", tid, our_key);
350 cutoff = 0;
351 key_state = 0;
352 for ( i = 0; i < nr_items; i++ )
354 stack[i]->data &= ~LOG_REPLAYED; /* stop valid_op() from choking */
355 if ( !do_op(stack[i], &key_state) || (stack[i]->end < cutoff) )
357 int j;
358 fprintf(stderr, "\t*** INTERNAL ERROR: "
359 "Assigned ordering is invalid!\n");
360 for ( j = (i < 2) ? 0 : (i-2); j < i+6; j++ )
362 printf("%08x -> %08x -- %d: %08x -> %08x\n",
363 (unsigned int)stack[j]->start,
364 (unsigned int)stack[j]->end,
365 stack[j]->data & LOG_KEY_MASK,
366 (unsigned int)stack[j]->old_val,
367 (unsigned int)stack[j]->val);
369 exit(-1);
371 if ( stack[i]->start > cutoff ) cutoff = stack[i]->start;
373 printf("[Thread %lu] -- Verified key %d\n", tid, our_key);
374 #endif
376 goto again;
378 out:
379 return(NULL);
383 int main(int argc, char **argv)
385 pthread_t thread[RMAX_THREADS];
386 int fd, i, j, failed = 0, nr_cpus;
387 unsigned long log_header[3];
389 if ( argc != 2 )
391 fprintf(stderr, "%s <log name>\n", argv[0]);
392 exit(1);
395 nr_cpus = (int)sysconf(_SC_NPROCESSORS_ONLN);
396 if ( nr_cpus > RMAX_THREADS ) nr_cpus = RMAX_THREADS;
398 if ( (fd = open(argv[1], O_RDONLY, 0)) == -1 )
400 fprintf(stderr, "Error opening log\n");
401 exit(-1);
404 /* Grok the log header. */
405 read(fd, log_header, sizeof(log_header));
406 nr_threads = log_header[0];
407 nr_updates = log_header[1];
408 nr_keys = log_header[2];
409 printf("Read log header: nr_updates=%d, nr_threads=%d, nr_keys=%d\n",
410 nr_updates, nr_threads, nr_keys);
412 /* Allocate state for processing log entries. */
413 global_log = malloc((nr_threads*nr_updates+1)*sizeof(log_t));
414 key_offsets = malloc((nr_keys+1)*sizeof(*key_offsets));
415 success = malloc(nr_keys*sizeof(*success));
416 if ( !global_log || !key_offsets || !success )
418 fprintf(stderr, "Error allocating space for log\n");
419 exit(-1);
422 /* Read log entries, and sort into key and timestamp order. */
423 read(fd, global_log, nr_threads*nr_updates*sizeof(log_t));
424 global_log[nr_threads*nr_updates].data = LOG_KEY_MASK; /* sentinel */
426 printf("Sorting logs..."); fflush(stdout);
427 qsort(global_log, nr_threads*nr_updates, sizeof(log_t), compare);
428 printf(" done\n");
430 /* Find offsets of key regions in global table. */
431 key_offsets[0] = 0;
432 nr_keys = 0;
433 for ( i = 0; i < (nr_threads * nr_updates); i = j )
435 j = i+1;
436 while ( (global_log[j].data & LOG_KEY_MASK) ==
437 (global_log[i].data & LOG_KEY_MASK) ) j++;
438 key_offsets[++nr_keys] = j;
441 /* Set up a bunch of worker threads.... */
442 pthread_mutex_init(&key_lock, NULL);
443 for ( i = 0; i < nr_cpus; i++ )
445 if ( pthread_create(&thread[i], NULL, thread_start, (void *)i) )
447 fprintf(stderr, "Error creating thread %d (%d)\n", i, errno);
448 exit(1);
452 /* ...and wait for them all to complete. */
453 for ( i = 0; i < nr_cpus; i++ )
455 pthread_join(thread[i], NULL);
458 /* Summarise results from worker threads. */
459 for ( i = 0; i < nr_keys; i++ )
461 if ( success[i] ) continue;
462 printf("FAILED on key %d\n", i);
463 failed++;
466 if ( failed )
468 printf("Failed on %d keys\n", failed);
469 return(1);
472 printf("All assigned orderings are valid\n");
473 return(0);