4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
22 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
26 #pragma ident "%Z%%M% %I% %E% SMI"
30 static const stream_ops_t invalid_ops
= {
46 stream_t
*str
= safe_realloc(NULL
, sizeof (stream_t
));
55 stream_set(stream_t
*str
, flag_t flags
)
57 if (flags
& STREAM_SOURCE_MASK
) {
58 ASSERT((flags
& STREAM_SOURCE_MASK
) == STREAM_ARRAY
||
59 (flags
& STREAM_SOURCE_MASK
) == STREAM_SINGLE
||
60 (flags
& STREAM_SOURCE_MASK
) == STREAM_MMAP
||
61 (flags
& STREAM_SOURCE_MASK
) == STREAM_WIDE
);
63 str
->s_status
&= ~STREAM_SOURCE_MASK
;
64 str
->s_status
|= flags
& STREAM_SOURCE_MASK
;
66 switch (flags
& STREAM_SOURCE_MASK
) {
67 case STREAM_NO_SOURCE
:
68 str
->s_element_size
= 0;
69 str
->s_ops
= invalid_ops
;
73 * Array streams inherit element size.
75 str
->s_ops
= stream_array_ops
;
78 str
->s_element_size
= sizeof (char);
79 str
->s_ops
= stream_mmap_ops
;
82 str
->s_element_size
= sizeof (char);
83 str
->s_ops
= stream_stdio_ops
;
86 str
->s_element_size
= sizeof (wchar_t);
87 str
->s_ops
= stream_wide_ops
;
90 die(EMSG_UNKN_STREAM
, str
->s_status
);
94 str
->s_status
|= (flags
& ~STREAM_SOURCE_MASK
);
96 if (str
->s_status
& STREAM_UNIQUE
)
97 switch (str
->s_status
& STREAM_SOURCE_MASK
) {
99 str
->s_ops
.sop_put_line
=
100 stream_stdio_put_line_unique
;
103 str
->s_ops
.sop_put_line
=
104 stream_wide_put_line_unique
;
110 if (str
->s_status
& STREAM_INSTANT
)
111 switch (str
->s_status
& STREAM_SOURCE_MASK
) {
113 str
->s_ops
.sop_fetch
=
114 stream_stdio_fetch_overwrite
;
117 str
->s_ops
.sop_fetch
=
118 stream_wide_fetch_overwrite
;
/*
 * stream_unset() clears status bits on a stream.  Every bit of flags that lies
 * outside STREAM_SOURCE_MASK is removed from streamp->s_status; bits inside
 * STREAM_SOURCE_MASK are never touched by the assignment.  Callers are not
 * expected to pass source-type bits at all: the ASSERT checks that flags has
 * no bit in common with STREAM_SOURCE_MASK.
 */
126 stream_unset(stream_t
*streamp
, flag_t flags
)
128 ASSERT(!(flags
& STREAM_SOURCE_MASK
));
130 streamp
->s_status
&= ~(flags
& ~STREAM_SOURCE_MASK
);
/*
 * stream_is_primed() reports whether STREAM_PRIMED is set in
 * streamp->s_status.  The result is the masked status word itself (nonzero
 * when the bit is set, zero otherwise); it is not normalized to 0/1.
 */
134 stream_is_primed(stream_t
*streamp
)
136 return (streamp
->s_status
& STREAM_PRIMED
);
/*
 * stream_clear() overwrites the entire stream_t at str with zero bytes.  The
 * structure is only zeroed in place; nothing it previously referred to is
 * released here.
 */
140 stream_clear(stream_t
*str
)
142 (void) memset(str
, 0, sizeof (stream_t
));
/*
 * stream_copy() duplicates src into dest with a bytewise copy of
 * sizeof (stream_t) bytes.  This is a shallow copy: pointer members hold the
 * same addresses in both structures afterwards.  memcpy() is used, so dest
 * and src must not overlap.
 */
146 stream_copy(stream_t
*dest
, stream_t
*src
)
148 (void) memcpy(dest
, src
, sizeof (stream_t
));
152 stream_stat_chain(stream_t
*strp
)
155 stream_t
*cur_strp
= strp
;
157 while (cur_strp
!= NULL
) {
158 if (cur_strp
->s_status
& STREAM_NOTFILE
||
159 cur_strp
->s_status
& STREAM_ARRAY
) {
160 cur_strp
= cur_strp
->s_next
;
164 if (stat(cur_strp
->s_filename
, &buf
) < 0)
165 die(EMSG_STAT
, cur_strp
->s_filename
);
167 cur_strp
->s_dev
= buf
.st_dev
;
168 cur_strp
->s_ino
= buf
.st_ino
;
169 cur_strp
->s_filesize
= buf
.st_size
;
171 cur_strp
= cur_strp
->s_next
;
176 stream_count_chain(stream_t
*str
)
180 while (str
!= NULL
) {
189 stream_open_for_read(sort_t
*S
, stream_t
*str
)
193 ASSERT(!(str
->s_status
& STREAM_OUTPUT
));
196 * STREAM_ARRAY streams are open by definition.
198 if ((str
->s_status
& STREAM_SOURCE_MASK
) == STREAM_ARRAY
) {
199 stream_set(str
, STREAM_ARRAY
| STREAM_OPEN
);
204 * Set data type according to locale for input from stdin.
206 if (str
->s_status
& STREAM_NOTFILE
) {
207 str
->s_type
.BF
.s_fp
= stdin
;
208 stream_set(str
, STREAM_OPEN
| (S
->m_single_byte_locale
?
209 STREAM_SINGLE
: STREAM_WIDE
));
213 ASSERT(str
->s_filename
);
215 #ifndef DEBUG_DISALLOW_MMAP
216 if (S
->m_single_byte_locale
&&
217 str
->s_filesize
> 0 &&
218 str
->s_filesize
< SSIZE_MAX
) {
220 * make mmap() attempt; set s_status and return if successful
222 fd
= open(str
->s_filename
, O_RDONLY
);
224 if (errno
== EMFILE
|| errno
== ENFILE
)
227 die(EMSG_OPEN
, str
->s_filename
);
229 str
->s_buffer
= mmap(0, str
->s_filesize
, PROT_READ
,
232 if (str
->s_buffer
!= MAP_FAILED
) {
233 str
->s_buffer_size
= str
->s_filesize
;
234 str
->s_type
.SF
.s_fd
= fd
;
236 stream_set(str
, STREAM_MMAP
| STREAM_OPEN
);
237 stream_unset(str
, STREAM_PRIMED
);
242 * Otherwise the mmap() failed due to address space exhaustion;
243 * since we have already opened the file, we close it and drop
244 * into the normal (STDIO) case.
247 str
->s_buffer
= NULL
;
249 #endif /* DEBUG_DISALLOW_MMAP */
251 if ((str
->s_type
.BF
.s_fp
= fopen(str
->s_filename
, "r")) == NULL
) {
252 if (errno
== EMFILE
|| errno
== ENFILE
)
255 die(EMSG_OPEN
, str
->s_filename
);
258 str
->s_type
.BF
.s_vbuf
= safe_realloc(NULL
, STDIO_VBUF_SIZE
);
259 if (setvbuf(str
->s_type
.BF
.s_fp
, str
->s_type
.BF
.s_vbuf
, _IOFBF
,
260 STDIO_VBUF_SIZE
) != 0) {
261 safe_free(str
->s_type
.BF
.s_vbuf
);
262 str
->s_type
.BF
.s_vbuf
= NULL
;
265 stream_set(str
, STREAM_OPEN
| (S
->m_single_byte_locale
? STREAM_SINGLE
:
267 stream_unset(str
, STREAM_PRIMED
);
273 stream_set_size(stream_t
*str
, size_t new_size
)
276 * p_new_size is new_size rounded upwards to nearest multiple of
277 * PAGESIZE, since mmap() is going to reserve it in any case. This
278 * ensures that the far end of the buffer is also aligned, such that we
279 * obtain aligned pointers if we choose to subtract from it.
281 size_t p_new_size
= (new_size
+ PAGESIZE
) & ~(PAGESIZE
- 1);
283 if (str
->s_buffer_size
== p_new_size
)
286 if (str
->s_buffer
!= NULL
)
287 (void) munmap(str
->s_buffer
, str
->s_buffer_size
);
290 str
->s_buffer
= NULL
;
291 str
->s_buffer_size
= 0;
295 str
->s_buffer
= xzmap(0, p_new_size
, PROT_READ
| PROT_WRITE
,
298 if (str
->s_buffer
== MAP_FAILED
)
301 str
->s_buffer_size
= p_new_size
;
/*
 * stream_add_file_to_chain() obtains a new stream from
 * stream_new(STREAM_NO_SOURCE), records filename in s_filename, marks the
 * s_type.SF.s_fd descriptor slot with -1, and hands the stream to
 * stream_push_to_chain() for *str_chain.
 *
 * The filename pointer is stored as given, not duplicated, so the string must
 * remain valid for as long as the stream refers to it.
 */
305 stream_add_file_to_chain(stream_t
**str_chain
, char *filename
)
309 str
= stream_new(STREAM_NO_SOURCE
);
311 str
->s_filename
= filename
;
312 str
->s_type
.SF
.s_fd
= -1;
314 stream_push_to_chain(str_chain
, str
);
/*
 * stream_push_to_chain() links streamp into the chain whose head pointer is
 * *str_chain.
 *
 * Empty chain (*str_chain == NULL): streamp becomes the head and its s_next is
 * set to NULL.  In the statements visible for this branch s_previous is not
 * assigned.
 * NOTE(review): the branch presumably ends by leaving the function, since the
 * walk below would otherwise dereference a NULL cur_streamp -- confirm.
 *
 * Non-empty chain: the chain is walked through s_next to its last element,
 * streamp is attached after it, streamp->s_previous is pointed back at that
 * element, and streamp->s_next is set to NULL.  The walk makes each push
 * linear in the current chain length.
 */
318 stream_push_to_chain(stream_t
**str_chain
, stream_t
*streamp
)
320 stream_t
*cur_streamp
= *str_chain
;
322 if (cur_streamp
== NULL
) {
323 *str_chain
= streamp
;
324 streamp
->s_next
= NULL
;
328 while (cur_streamp
->s_next
!= NULL
)
329 cur_streamp
= cur_streamp
->s_next
;
331 cur_streamp
->s_next
= streamp
;
332 streamp
->s_previous
= cur_streamp
;
333 streamp
->s_next
= NULL
;
337 stream_dump(stream_t
*str_in
, stream_t
*str_out
)
339 ASSERT(!(str_in
->s_status
& STREAM_OUTPUT
));
340 ASSERT(str_out
->s_status
& STREAM_OUTPUT
);
342 SOP_PUT_LINE(str_out
, &str_in
->s_current
);
344 while (!SOP_EOS(str_in
)) {
346 SOP_PUT_LINE(str_out
, &str_in
->s_current
);
351 * stream_push_to_temporary() with flags set to ST_CACHE merely copies the
352 * stream_t pointer onto the chain. With flags set to ST_NOCACHE, the stream is
353 * written out to a file. Stream pointers passed to stream_push_to_temporary()
354 * must refer to allocated objects, and not to objects created on function
355 * stacks. Finally, if strp == NULL, stream_push_to_temporary() creates and
356 * pushes the new stream; the output stream is left open if ST_OPEN is set.
359 stream_push_to_temporary(stream_t
**str_chain
, stream_t
*streamp
, int flags
)
361 stream_t
*out_streamp
;
363 if (flags
& ST_CACHE
) {
364 ASSERT(streamp
->s_status
& STREAM_ARRAY
);
365 stream_set(streamp
, STREAM_NOT_FREEABLE
| STREAM_TEMPORARY
);
366 stream_push_to_chain(str_chain
, streamp
);
370 out_streamp
= safe_realloc(NULL
, sizeof (stream_t
));
372 if (streamp
!= NULL
) {
373 stream_copy(out_streamp
, streamp
);
374 stream_unset(out_streamp
, STREAM_OPEN
);
375 ASSERT(streamp
->s_element_size
== sizeof (char) ||
376 streamp
->s_element_size
== sizeof (wchar_t));
377 stream_set(out_streamp
,
378 streamp
->s_element_size
== 1 ? STREAM_SINGLE
: STREAM_WIDE
);
379 out_streamp
->s_buffer
= NULL
;
380 out_streamp
->s_buffer_size
= 0;
382 stream_clear(out_streamp
);
383 stream_set(out_streamp
, flags
& ST_WIDE
? STREAM_WIDE
:
387 (void) bump_file_template();
388 out_streamp
->s_filename
= strdup(get_file_template());
390 if (SOP_OPEN_FOR_WRITE(out_streamp
) == -1)
393 stream_set(out_streamp
, STREAM_TEMPORARY
);
394 stream_push_to_chain(str_chain
, out_streamp
);
396 if (streamp
!= NULL
) {
398 * We reset the input stream to the beginning, and copy it in
399 * sequence to the output stream, freeing the raw_collate field
402 if (SOP_PRIME(streamp
) != PRIME_SUCCEEDED
)
404 stream_dump(streamp
, out_streamp
);
407 if (!(flags
& ST_OPEN
)) {
408 SOP_FREE(out_streamp
);
409 (void) SOP_CLOSE(out_streamp
);
413 * Now that we've written this stream to disk, we needn't protect any
414 * in-memory consumer.
417 streamp
->s_consumer
= NULL
;
419 return (out_streamp
);
/*
 * stream_close_all_previous() walks backwards along s_previous, starting at
 * the stream immediately before tail_streamp and stopping at the first NULL
 * link.  For each stream visited it invokes SOP_FREE() and then, only when
 * SOP_IS_CLOSABLE() is true, SOP_CLOSE(); both results are discarded.
 *
 * tail_streamp must not be NULL (asserted) and is itself neither freed nor
 * closed.
 */
423 stream_close_all_previous(stream_t
*tail_streamp
)
425 stream_t
*cur_streamp
;
427 ASSERT(tail_streamp
!= NULL
);
429 cur_streamp
= tail_streamp
->s_previous
;
430 while (cur_streamp
!= NULL
) {
431 (void) SOP_FREE(cur_streamp
);
432 if (SOP_IS_CLOSABLE(cur_streamp
))
433 (void) SOP_CLOSE(cur_streamp
);
435 cur_streamp
= cur_streamp
->s_previous
;
/*
 * stream_unlink_temporary() acts only on streams whose s_status has
 * STREAM_TEMPORARY set.  For such a stream it invokes SOP_FREE() and then,
 * provided the stream's operations vector supplies a non-NULL sop_unlink
 * entry, SOP_UNLINK().  The results of both operations are discarded.
 */
440 stream_unlink_temporary(stream_t
*streamp
)
442 if (streamp
->s_status
& STREAM_TEMPORARY
) {
443 (void) SOP_FREE(streamp
);
445 if (streamp
->s_ops
.sop_unlink
)
446 (void) SOP_UNLINK(streamp
);
451 * stream_insert() takes input from src stream, converts each line to
452 * collatable form, and places a line_rec_t in dest stream, which is of type
456 stream_insert(sort_t
*S
, stream_t
*src
, stream_t
*dest
)
458 ssize_t i
= dest
->s_type
.LA
.s_array_size
;
459 line_rec_t
*l_series
;
460 char *l_convert
= dest
->s_buffer
;
461 int return_val
= ST_MEM_AVAIL
;
462 int fetch_result
= NEXT_LINE_COMPLETE
;
465 * Scan through until total bytes allowed accumulated, and return.
466 * Use SOP_FETCH(src) so that this works for all stream types,
467 * and so that we can repeat until eos.
469 * For each new line, we move back sizeof (line_rec_t) from the end of
470 * the array buffer, and copy into the start of the array buffer. When
471 * the pointers meet, or when we exhaust the current stream, we return.
472 * If we have not filled the current memory allocation, we return
473 * ST_MEM_AVAIL, else we return ST_MEM_FILLED.
475 ASSERT(stream_is_primed(src
));
476 ASSERT(dest
->s_status
& STREAM_ARRAY
);
479 l_series
= (line_rec_t
*)((caddr_t
)dest
->s_buffer
480 + dest
->s_buffer_size
) - dest
->s_type
.LA
.s_array_size
;
482 if (dest
->s_type
.LA
.s_array_size
)
483 l_convert
= l_series
->l_collate
.sp
+
484 l_series
->l_collate_length
+ src
->s_element_size
;
487 * current line has been set prior to entry
489 src
->s_current
.l_collate
.sp
= l_convert
;
490 src
->s_current
.l_collate_bufsize
= (caddr_t
)l_series
491 - (caddr_t
)l_convert
- sizeof (line_rec_t
);
492 src
->s_current
.l_raw_collate
.sp
= NULL
;
494 if (src
->s_current
.l_collate_bufsize
<= 0)
495 return (ST_MEM_FILLED
);
497 src
->s_consumer
= dest
;
499 while (src
->s_current
.l_collate_bufsize
> 0 &&
500 (src
->s_current
.l_collate_length
= S
->m_coll_convert(
501 S
->m_fields_head
, &src
->s_current
, FCV_FAIL
,
502 S
->m_field_separator
)) >= 0) {
503 ASSERT((char *)l_series
> l_convert
);
505 l_convert
+= src
->s_current
.l_collate_length
;
507 if ((char *)l_series
<= l_convert
) {
508 __S(stats_incr_insert_filled_downward());
510 return_val
= ST_MEM_FILLED
;
515 * There's no collision with the lower part of the buffer, so we
516 * can safely begin processing the line. In the debug case, we
517 * test for uninitialized data by copying a non-zero pattern.
520 memset(l_series
, 0x1ff11ff1, sizeof (line_rec_t
));
523 copy_line_rec(&src
->s_current
, l_series
);
527 (fetch_result
= SOP_FETCH(src
)) == NEXT_LINE_INCOMPLETE
)
530 src
->s_current
.l_collate
.sp
= l_convert
;
531 src
->s_current
.l_collate_bufsize
= (caddr_t
)l_series
532 - (caddr_t
)l_convert
- sizeof (line_rec_t
);
533 src
->s_current
.l_raw_collate
.sp
= NULL
;
536 if (fetch_result
== NEXT_LINE_INCOMPLETE
) {
537 __S(stats_incr_insert_filled_input());
538 return_val
= ST_MEM_FILLED
;
539 } else if (src
->s_current
.l_collate_length
< 0 ||
540 src
->s_current
.l_collate_bufsize
<= 0) {
541 __S(stats_incr_insert_filled_upward());
542 return_val
= ST_MEM_FILLED
;
545 if (fetch_result
!= NEXT_LINE_INCOMPLETE
&&
546 src
->s_current
.l_collate_length
< 0 &&
549 * There's no room for conversion of our only line; need to
550 * execute with larger memory.
555 * Set up pointer array to line records.
557 if (i
> dest
->s_type
.LA
.s_array_size
)
558 dest
->s_type
.LA
.s_array
= safe_realloc(dest
->s_type
.LA
.s_array
,
559 sizeof (line_rec_t
*) * i
);
560 dest
->s_type
.LA
.s_array_size
= i
;
563 while (i
< dest
->s_type
.LA
.s_array_size
) {
564 dest
->s_type
.LA
.s_array
[i
] = l_series
;
570 * LINES_ARRAY streams are always open.
572 stream_set(dest
, STREAM_OPEN
);
578 * stream_swap_buffer() exchanges the stream's buffer with the proffered one;
579 * s_current is not adjusted so this is safe only for STREAM_INSTANT.
582 stream_swap_buffer(stream_t
*str
, char **buf
, size_t *size
)
587 *buf
= str
->s_buffer
;
588 *size
= str
->s_buffer_size
;
591 str
->s_buffer_size
= ts
;