// Prototype only; no definition of kt_for appears in this file.
//   n_threads  thread count handed to the implementation
//   func       callback receiving (void*, long, int); the callback in this
//              file names those parameters _data, i and tid
//   data       pointer argument; this file passes a step_t* and its callback
//              casts its first argument back to step_t*
//   n          this file passes the number of lines in the batch
// NOTE(review): presumably func is invoked once for every index in [0, n),
// spread over n_threads threads -- confirm against the kt_for implementation.
void kt_for(int n_threads, void (*func)(void*,long,int), void *data, long n);
// Prototype only; no definition of kt_pipeline appears in this file.
//   n_threads    thread count handed to the implementation
//   func         step callback receiving (void*, int, void*) and returning
//                void*; the callback in this file names its parameters
//                shared, step and in
//   shared_data  pointer argument; this file passes &pl and its callback
//                casts its first argument to pipeline_t*
//   n_steps      number of steps; this file passes 3 and its callback
//                handles step values 0, 1 and 2
// NOTE(review): presumably the value returned by one step is handed to the
// next step as `in` -- confirm against the kt_pipeline implementation.
void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps);
// Settings read through pipeline_t pointers elsewhere in this file:
//   max_lines  sizes the per-batch array of line pointers and caps the
//              number of lines read into one batch
//   buf_size   size passed to fgets() for the shared read buffer
//   n_threads  first argument of the kt_for() call in the reverse step
// NOTE(review): the enclosing declaration is not visible in this listing;
// presumably these are members of pipeline_t -- confirm.
11 int max_lines
, buf_size
, n_threads
;
// Reverses one line of a batch in place; passed to kt_for() as its callback.
//   _data  the batch, cast to step_t* below
//   i      index into step->lines selecting the line to reverse
//   tid    not referenced by any statement visible here
// NOTE(review): this listing omits parts of the function -- the braces, the
// declarations of l, j and t, and the assignment of l are not shown.
20 static void worker_for(void *_data
, long i
, int tid
) // kt_for() callback
22 step_t
*step
= (step_t
*)_data
;
23 char *s
= step
->lines
[i
];
// The assert requires s[l] to be a newline. Presumably l = strlen(s) - 1,
// i.e. the index of the trailing newline -- confirm against the missing line.
26 assert(s
[l
] == '\n'); // not supporting long lines
// Swap s[j] with s[l - 1 - j] for j in [0, l/2): this reverses s[0..l-1]
// and leaves s[l] untouched. `>>` binds tighter than `<`, so the bound is
// j < (l >> 1).
27 for (j
= 0; j
< l
>>1; ++j
)
28 t
= s
[j
], s
[j
] = s
[l
- 1 - j
], s
[l
- 1 - j
] = t
;
// Step callback handed to kt_pipeline(); dispatches on `step`:
//   step 0  reads lines from p->fp into a freshly allocated step_t batch
//   step 1  reverses every line of the batch `in` via kt_for()
//   step 2  writes the batch `in` to stdout and frees it
//   shared  cast to pipeline_t* (file handle, read buffer, limits)
//   in      the batch to process in steps 1 and 2; not referenced in step 0
// NOTE(review): this listing omits parts of the function -- the statement
// controlled by the max_lines check, the return values for the paths other
// than the non-empty step-0 batch, and the closing braces are not shown.
31 static void *worker_pipeline(void *shared
, int step
, void *in
) // kt_pipeline() callback
33 pipeline_t
*p
= (pipeline_t
*)shared
;
34 if (step
== 0) { // step 0: read lines into the buffer
// calloc zero-fills the batch, so s->n_lines starts at 0.
// NOTE(review): neither calloc result (nor strdup below) is checked for NULL.
36 s
= calloc(1, sizeof(step_t
));
// One pointer slot per line, up to p->max_lines lines in a batch.
37 s
->lines
= calloc(p
->max_lines
, sizeof(char*));
// fgets reads at most p->buf_size - 1 characters per call into the shared
// buffer p->buf; each line is then copied into its own allocation.
38 while (fgets(p
->buf
, p
->buf_size
, p
->fp
) != 0) {
39 s
->lines
[s
->n_lines
] = strdup(p
->buf
);
// Batch-full check; the controlled statement is not visible here
// (presumably it leaves the read loop -- confirm).
40 if (++s
->n_lines
>= p
->max_lines
)
// Hand the batch on only when at least one line was read.
// NOTE(review): no free of s / s->lines for an empty batch is visible here.
43 if (s
->n_lines
) return s
;
44 } else if (step
== 1) { // step 1: reverse lines
// worker_for is called with the batch as its data pointer and n_lines as n.
45 kt_for(p
->n_threads
, worker_for
, in
, ((step_t
*)in
)->n_lines
);
47 } else if (step
== 2) { // step 2: write the buffer to output
48 step_t
*s
= (step_t
*)in
;
// Pre-decrementing n_lines walks the batch from its last line to its first,
// so lines are written in reverse index order; each is freed after writing.
49 while (s
->n_lines
> 0) {
50 fputs(s
->lines
[--s
->n_lines
], stdout
);
51 free(s
->lines
[s
->n_lines
]);
// Release the pointer array, then the batch itself.
53 free(s
->lines
); free(s
);
// Entry point: configures a pipeline_t and runs the 3-step pipeline.
//   argv[1]  input path, or "-" for stdin
//   argv[2]  optional thread count passed to kt_pipeline (default 3)
//   argv[3]  optional thread count stored in pl.n_threads (default 1)
// NOTE(review): this listing omits parts of the function -- the braces, the
// declarations of pl and pl_threads, the conditions guarding both messages
// and the return statements are not shown. No assignment to pl.max_lines is
// visible either, although worker_pipeline reads it -- confirm it is set.
58 int main(int argc
, char *argv
[])
63 fprintf(stderr
, "Usage: reverse <in.txt> [pipeline_threads [for_threads]]\n");
// strcmp returns non-zero when argv[1] differs from "-", so a real path is
// opened for reading and "-" selects stdin.
66 pl
.fp
= strcmp(argv
[1], "-")? fopen(argv
[1], "r") : stdin
;
68 fprintf(stderr
, "ERROR: failed to open the input file.\n");
// NOTE(review): atoi gives no error indication for non-numeric input.
71 pl_threads
= argc
> 2? atoi(argv
[2]) : 3;
// 0x10000 = 65536 bytes for the shared line buffer.
73 pl
.buf_size
= 0x10000;
74 pl
.n_threads
= argc
> 3? atoi(argv
[3]) : 1;
// NOTE(review): the calloc result is not checked in the visible statements,
// and no matching free of pl.buf is visible in this listing.
75 pl
.buf
= calloc(pl
.buf_size
, 1);
// Three steps, matching the step values 0, 1 and 2 handled by worker_pipeline.
76 kt_pipeline(pl_threads
, worker_pipeline
, &pl
, 3);
// Only close streams this function opened itself; stdin is left alone.
78 if (pl
.fp
!= stdin
) fclose(pl
.fp
);