8322 nl: misleading-indentation
[unleashed/tickless.git] / usr / src / cmd / sort / common / streams.c
blobbc00aa93811463f73aafde65b53fcfe92a41c4f1
1 /*
2 * CDDL HEADER START
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
19 * CDDL HEADER END
22 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
26 #pragma ident "%Z%%M% %I% %E% SMI"
28 #include "streams.h"
30 static const stream_ops_t invalid_ops = {
31 NULL,
32 NULL,
33 NULL,
34 NULL,
35 NULL,
36 NULL,
37 NULL,
38 NULL,
39 NULL,
40 NULL
43 stream_t *
44 stream_new(int src)
46 stream_t *str = safe_realloc(NULL, sizeof (stream_t));
48 stream_clear(str);
49 stream_set(str, src);
51 return (str);
54 void
55 stream_set(stream_t *str, flag_t flags)
57 if (flags & STREAM_SOURCE_MASK) {
58 ASSERT((flags & STREAM_SOURCE_MASK) == STREAM_ARRAY ||
59 (flags & STREAM_SOURCE_MASK) == STREAM_SINGLE ||
60 (flags & STREAM_SOURCE_MASK) == STREAM_MMAP ||
61 (flags & STREAM_SOURCE_MASK) == STREAM_WIDE);
63 str->s_status &= ~STREAM_SOURCE_MASK;
64 str->s_status |= flags & STREAM_SOURCE_MASK;
66 switch (flags & STREAM_SOURCE_MASK) {
67 case STREAM_NO_SOURCE:
68 str->s_element_size = 0;
69 str->s_ops = invalid_ops;
70 return;
71 case STREAM_ARRAY:
73 * Array streams inherit element size.
75 str->s_ops = stream_array_ops;
76 break;
77 case STREAM_MMAP:
78 str->s_element_size = sizeof (char);
79 str->s_ops = stream_mmap_ops;
80 break;
81 case STREAM_SINGLE:
82 str->s_element_size = sizeof (char);
83 str->s_ops = stream_stdio_ops;
84 break;
85 case STREAM_WIDE:
86 str->s_element_size = sizeof (wchar_t);
87 str->s_ops = stream_wide_ops;
88 break;
89 default:
90 die(EMSG_UNKN_STREAM, str->s_status);
94 str->s_status |= (flags & ~STREAM_SOURCE_MASK);
96 if (str->s_status & STREAM_UNIQUE)
97 switch (str->s_status & STREAM_SOURCE_MASK) {
98 case STREAM_SINGLE :
99 str->s_ops.sop_put_line =
100 stream_stdio_put_line_unique;
101 break;
102 case STREAM_WIDE :
103 str->s_ops.sop_put_line =
104 stream_wide_put_line_unique;
105 break;
106 default :
107 break;
110 if (str->s_status & STREAM_INSTANT)
111 switch (str->s_status & STREAM_SOURCE_MASK) {
112 case STREAM_SINGLE :
113 str->s_ops.sop_fetch =
114 stream_stdio_fetch_overwrite;
115 break;
116 case STREAM_WIDE :
117 str->s_ops.sop_fetch =
118 stream_wide_fetch_overwrite;
119 break;
120 default :
121 break;
125 void
126 stream_unset(stream_t *streamp, flag_t flags)
128 ASSERT(!(flags & STREAM_SOURCE_MASK));
130 streamp->s_status &= ~(flags & ~STREAM_SOURCE_MASK);
134 stream_is_primed(stream_t *streamp)
136 return (streamp->s_status & STREAM_PRIMED);
139 void
140 stream_clear(stream_t *str)
142 (void) memset(str, 0, sizeof (stream_t));
145 static void
146 stream_copy(stream_t *dest, stream_t *src)
148 (void) memcpy(dest, src, sizeof (stream_t));
151 void
152 stream_stat_chain(stream_t *strp)
154 struct stat buf;
155 stream_t *cur_strp = strp;
157 while (cur_strp != NULL) {
158 if (cur_strp->s_status & STREAM_NOTFILE ||
159 cur_strp->s_status & STREAM_ARRAY) {
160 cur_strp = cur_strp->s_next;
161 continue;
164 if (stat(cur_strp->s_filename, &buf) < 0)
165 die(EMSG_STAT, cur_strp->s_filename);
167 cur_strp->s_dev = buf.st_dev;
168 cur_strp->s_ino = buf.st_ino;
169 cur_strp->s_filesize = buf.st_size;
171 cur_strp = cur_strp->s_next;
175 uint_t
176 stream_count_chain(stream_t *str)
178 uint_t n = 0;
180 while (str != NULL) {
181 n++;
182 str = str->s_next;
185 return (n);
189 stream_open_for_read(sort_t *S, stream_t *str)
191 int fd;
193 ASSERT(!(str->s_status & STREAM_OUTPUT));
196 * STREAM_ARRAY streams are open by definition.
198 if ((str->s_status & STREAM_SOURCE_MASK) == STREAM_ARRAY) {
199 stream_set(str, STREAM_ARRAY | STREAM_OPEN);
200 return (1);
204 * Set data type according to locale for input from stdin.
206 if (str->s_status & STREAM_NOTFILE) {
207 str->s_type.BF.s_fp = stdin;
208 stream_set(str, STREAM_OPEN | (S->m_single_byte_locale ?
209 STREAM_SINGLE : STREAM_WIDE));
210 return (1);
213 ASSERT(str->s_filename);
215 #ifndef DEBUG_DISALLOW_MMAP
216 if (S->m_single_byte_locale &&
217 str->s_filesize > 0 &&
218 str->s_filesize < SSIZE_MAX) {
220 * make mmap() attempt; set s_status and return if successful
222 fd = open(str->s_filename, O_RDONLY);
223 if (fd < 0) {
224 if (errno == EMFILE || errno == ENFILE)
225 return (-1);
226 else
227 die(EMSG_OPEN, str->s_filename);
229 str->s_buffer = mmap(0, str->s_filesize, PROT_READ,
230 MAP_SHARED, fd, 0);
232 if (str->s_buffer != MAP_FAILED) {
233 str->s_buffer_size = str->s_filesize;
234 str->s_type.SF.s_fd = fd;
236 stream_set(str, STREAM_MMAP | STREAM_OPEN);
237 stream_unset(str, STREAM_PRIMED);
238 return (1);
242 * Otherwise the mmap() failed due to address space exhaustion;
243 * since we have already opened the file, we close it and drop
244 * into the normal (STDIO) case.
246 (void) close(fd);
247 str->s_buffer = NULL;
249 #endif /* DEBUG_DISALLOW_MMAP */
251 if ((str->s_type.BF.s_fp = fopen(str->s_filename, "r")) == NULL) {
252 if (errno == EMFILE || errno == ENFILE)
253 return (-1);
254 else
255 die(EMSG_OPEN, str->s_filename);
258 str->s_type.BF.s_vbuf = safe_realloc(NULL, STDIO_VBUF_SIZE);
259 if (setvbuf(str->s_type.BF.s_fp, str->s_type.BF.s_vbuf, _IOFBF,
260 STDIO_VBUF_SIZE) != 0) {
261 safe_free(str->s_type.BF.s_vbuf);
262 str->s_type.BF.s_vbuf = NULL;
265 stream_set(str, STREAM_OPEN | (S->m_single_byte_locale ? STREAM_SINGLE :
266 STREAM_WIDE));
267 stream_unset(str, STREAM_PRIMED);
269 return (1);
272 void
273 stream_set_size(stream_t *str, size_t new_size)
276 * p_new_size is new_size rounded upwards to nearest multiple of
277 * PAGESIZE, since mmap() is going to reserve it in any case. This
278 * ensures that the far end of the buffer is also aligned, such that we
279 * obtain aligned pointers if we choose to subtract from it.
281 size_t p_new_size = (new_size + PAGESIZE) & ~(PAGESIZE - 1);
283 if (str->s_buffer_size == p_new_size)
284 return;
286 if (str->s_buffer != NULL)
287 (void) munmap(str->s_buffer, str->s_buffer_size);
289 if (new_size == 0) {
290 str->s_buffer = NULL;
291 str->s_buffer_size = 0;
292 return;
295 str->s_buffer = xzmap(0, p_new_size, PROT_READ | PROT_WRITE,
296 MAP_PRIVATE, 0);
298 if (str->s_buffer == MAP_FAILED)
299 die(EMSG_MMAP);
301 str->s_buffer_size = p_new_size;
304 void
305 stream_add_file_to_chain(stream_t **str_chain, char *filename)
307 stream_t *str;
309 str = stream_new(STREAM_NO_SOURCE);
311 str->s_filename = filename;
312 str->s_type.SF.s_fd = -1;
314 stream_push_to_chain(str_chain, str);
317 void
318 stream_push_to_chain(stream_t **str_chain, stream_t *streamp)
320 stream_t *cur_streamp = *str_chain;
322 if (cur_streamp == NULL) {
323 *str_chain = streamp;
324 streamp->s_next = NULL;
325 return;
328 while (cur_streamp->s_next != NULL)
329 cur_streamp = cur_streamp->s_next;
331 cur_streamp->s_next = streamp;
332 streamp->s_previous = cur_streamp;
333 streamp->s_next = NULL;
336 static void
337 stream_dump(stream_t *str_in, stream_t *str_out)
339 ASSERT(!(str_in->s_status & STREAM_OUTPUT));
340 ASSERT(str_out->s_status & STREAM_OUTPUT);
342 SOP_PUT_LINE(str_out, &str_in->s_current);
344 while (!SOP_EOS(str_in)) {
345 SOP_FETCH(str_in);
346 SOP_PUT_LINE(str_out, &str_in->s_current);
351 * stream_push_to_temporary() with flags set to ST_CACHE merely copies the
352 * stream_t pointer onto the chain. With flags set to ST_NOCACHE, the stream is
353 * written out to a file. Stream pointers passed to stream_push_to_temporary()
354 * must refer to allocated objects, and not to objects created on function
355 * stacks. Finally, if strp == NULL, stream_push_to_temporary() creates and
356 * pushes the new stream; the output stream is left open if ST_OPEN is set.
358 stream_t *
359 stream_push_to_temporary(stream_t **str_chain, stream_t *streamp, int flags)
361 stream_t *out_streamp;
363 if (flags & ST_CACHE) {
364 ASSERT(streamp->s_status & STREAM_ARRAY);
365 stream_set(streamp, STREAM_NOT_FREEABLE | STREAM_TEMPORARY);
366 stream_push_to_chain(str_chain, streamp);
367 return (streamp);
370 out_streamp = safe_realloc(NULL, sizeof (stream_t));
372 if (streamp != NULL) {
373 stream_copy(out_streamp, streamp);
374 stream_unset(out_streamp, STREAM_OPEN);
375 ASSERT(streamp->s_element_size == sizeof (char) ||
376 streamp->s_element_size == sizeof (wchar_t));
377 stream_set(out_streamp,
378 streamp->s_element_size == 1 ? STREAM_SINGLE : STREAM_WIDE);
379 out_streamp->s_buffer = NULL;
380 out_streamp->s_buffer_size = 0;
381 } else {
382 stream_clear(out_streamp);
383 stream_set(out_streamp, flags & ST_WIDE ? STREAM_WIDE :
384 STREAM_SINGLE);
387 (void) bump_file_template();
388 out_streamp->s_filename = strdup(get_file_template());
390 if (SOP_OPEN_FOR_WRITE(out_streamp) == -1)
391 return (NULL);
393 stream_set(out_streamp, STREAM_TEMPORARY);
394 stream_push_to_chain(str_chain, out_streamp);
396 if (streamp != NULL) {
398 * We reset the input stream to the beginning, and copy it in
399 * sequence to the output stream, freeing the raw_collate field
400 * as we go.
402 if (SOP_PRIME(streamp) != PRIME_SUCCEEDED)
403 die(EMSG_BADPRIME);
404 stream_dump(streamp, out_streamp);
407 if (!(flags & ST_OPEN)) {
408 SOP_FREE(out_streamp);
409 (void) SOP_CLOSE(out_streamp);
413 * Now that we've written this stream to disk, we needn't protect any
414 * in-memory consumer.
416 if (streamp != NULL)
417 streamp->s_consumer = NULL;
419 return (out_streamp);
422 void
423 stream_close_all_previous(stream_t *tail_streamp)
425 stream_t *cur_streamp;
427 ASSERT(tail_streamp != NULL);
429 cur_streamp = tail_streamp->s_previous;
430 while (cur_streamp != NULL) {
431 (void) SOP_FREE(cur_streamp);
432 if (SOP_IS_CLOSABLE(cur_streamp))
433 (void) SOP_CLOSE(cur_streamp);
435 cur_streamp = cur_streamp->s_previous;
439 void
440 stream_unlink_temporary(stream_t *streamp)
442 if (streamp->s_status & STREAM_TEMPORARY) {
443 (void) SOP_FREE(streamp);
445 if (streamp->s_ops.sop_unlink)
446 (void) SOP_UNLINK(streamp);
451 * stream_insert() takes input from src stream, converts to each line to
452 * collatable form, and places a line_rec_t in dest stream, which is of type
453 * STREAM_ARRAY.
456 stream_insert(sort_t *S, stream_t *src, stream_t *dest)
458 ssize_t i = dest->s_type.LA.s_array_size;
459 line_rec_t *l_series;
460 char *l_convert = dest->s_buffer;
461 int return_val = ST_MEM_AVAIL;
462 int fetch_result = NEXT_LINE_COMPLETE;
465 * Scan through until total bytes allowed accumulated, and return.
466 * Use SOP_FETCH(src) so that this works for all stream types,
467 * and so that we can repeat until eos.
469 * For each new line, we move back sizeof (line_rec_t) from the end of
470 * the array buffer, and copy into the start of the array buffer. When
471 * the pointers meet, or when we exhaust the current stream, we return.
472 * If we have not filled the current memory allocation, we return
473 * ST_MEM_AVAIL, else we return ST_MEM_FILLED.
475 ASSERT(stream_is_primed(src));
476 ASSERT(dest->s_status & STREAM_ARRAY);
478 /*LINTED ALIGNMENT*/
479 l_series = (line_rec_t *)((caddr_t)dest->s_buffer
480 + dest->s_buffer_size) - dest->s_type.LA.s_array_size;
482 if (dest->s_type.LA.s_array_size)
483 l_convert = l_series->l_collate.sp +
484 l_series->l_collate_length + src->s_element_size;
487 * current line has been set prior to entry
489 src->s_current.l_collate.sp = l_convert;
490 src->s_current.l_collate_bufsize = (caddr_t)l_series
491 - (caddr_t)l_convert - sizeof (line_rec_t);
492 src->s_current.l_raw_collate.sp = NULL;
494 if (src->s_current.l_collate_bufsize <= 0)
495 return (ST_MEM_FILLED);
497 src->s_consumer = dest;
499 while (src->s_current.l_collate_bufsize > 0 &&
500 (src->s_current.l_collate_length = S->m_coll_convert(
501 S->m_fields_head, &src->s_current, FCV_FAIL,
502 S->m_field_separator)) >= 0) {
503 ASSERT((char *)l_series > l_convert);
504 l_series--;
505 l_convert += src->s_current.l_collate_length;
507 if ((char *)l_series <= l_convert) {
508 __S(stats_incr_insert_filled_downward());
509 l_series++;
510 return_val = ST_MEM_FILLED;
511 break;
515 * There's no collision with the lower part of the buffer, so we
516 * can safely begin processing the line. In the debug case, we
517 * test for uninitialized data by copying a non-zero pattern.
519 #ifdef DEBUG
520 memset(l_series, 0x1ff11ff1, sizeof (line_rec_t));
521 #endif
523 copy_line_rec(&src->s_current, l_series);
524 i++;
526 if (SOP_EOS(src) ||
527 (fetch_result = SOP_FETCH(src)) == NEXT_LINE_INCOMPLETE)
528 break;
530 src->s_current.l_collate.sp = l_convert;
531 src->s_current.l_collate_bufsize = (caddr_t)l_series
532 - (caddr_t)l_convert - sizeof (line_rec_t);
533 src->s_current.l_raw_collate.sp = NULL;
536 if (fetch_result == NEXT_LINE_INCOMPLETE) {
537 __S(stats_incr_insert_filled_input());
538 return_val = ST_MEM_FILLED;
539 } else if (src->s_current.l_collate_length < 0 ||
540 src->s_current.l_collate_bufsize <= 0) {
541 __S(stats_incr_insert_filled_upward());
542 return_val = ST_MEM_FILLED;
545 if (fetch_result != NEXT_LINE_INCOMPLETE &&
546 src->s_current.l_collate_length < 0 &&
547 i == 0)
549 * There's no room for conversion of our only line; need to
550 * execute with larger memory.
552 die(EMSG_MEMORY);
555 * Set up pointer array to line records.
557 if (i > dest->s_type.LA.s_array_size)
558 dest->s_type.LA.s_array = safe_realloc(dest->s_type.LA.s_array,
559 sizeof (line_rec_t *) * i);
560 dest->s_type.LA.s_array_size = i;
562 i = 0;
563 while (i < dest->s_type.LA.s_array_size) {
564 dest->s_type.LA.s_array[i] = l_series;
565 l_series++;
566 i++;
570 * LINES_ARRAY streams are always open.
572 stream_set(dest, STREAM_OPEN);
574 return (return_val);
578 * stream_swap_buffer() exchanges the stream's buffer with the proffered one;
579 * s_current is not adjusted so this is safe only for STREAM_INSTANT.
581 void
582 stream_swap_buffer(stream_t *str, char **buf, size_t *size)
584 void *tb = *buf;
585 size_t ts = *size;
587 *buf = str->s_buffer;
588 *size = str->s_buffer_size;
590 str->s_buffer = tb;
591 str->s_buffer_size = ts;