4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
27 #pragma ident "%Z%%M% %I% %E% SMI"
29 #include "streams_stdio.h"
30 #include "streams_common.h"
/*
 * Shelf state for the stdio stream reader.  When stream_stdio_fetch() can
 * read only part of a line (the buffer ran out before the newline), the
 * fragment is "shelved": shelf becomes SHELF_OCCUPIED and the next fetch
 * grafts the rest of the line onto it.  Reading a complete line resets
 * shelf to SHELF_VACANT.
 */
enum {
	SHELF_VACANT = 0,
	SHELF_OCCUPIED = 1
};

static int shelf = SHELF_VACANT;
/*
 * Single-byte character file i/o-based streams implementation
 *
 * The routines in this file contain the implementation of the i/o streams
 * interface for those situations where the input is via stdio.
 *
 * In the case where the input buffer contains insufficient room to hold the
 * entire line, the fractional line is shelved, and the remainder of the line
 * is grafted onto it by the next fetch.
 */
/*
 * stream_stdio_open_for_write() -- prepare str for output and mark it
 * STREAM_OPEN | STREAM_OUTPUT.
 *
 * Asserts that the stream is neither open nor already an output stream.
 * A STREAM_NOTFILE stream writes to the descriptor of stdout.  Otherwise
 * s_filename is opened with O_CREAT | O_TRUNC | O_WRONLY and mode
 * OUTPUT_MODE, and the descriptor is stored in s_type.SF.s_fd.
 *
 * On open() failure, errno is tested for EMFILE/ENFILE first.  The
 * statement(s) that test guards (embedded lines 61-62) are missing; they
 * are presumably a non-fatal return so the caller can release descriptors
 * -- TODO confirm.  die(EMSG_OPEN, s_filename) is the visible fatal path.
 *
 * NOTE(review): this copy is damaged.  The embedded original line numbers
 * skip 49, 51, 54, 57, 61-62, 64-65 and 67-71, and the return-type line
 * before the name is absent.  The braces, the branch keyword between the
 * stdout and open() cases, and every return statement are therefore lost.
 * Restore this function from version control.
 */
48 stream_stdio_open_for_write(stream_t
*str
)
50 stream_simple_file_t
*SF
= &(str
->s_type
.SF
);
52 ASSERT(!(str
->s_status
& STREAM_OPEN
));
53 ASSERT(!(str
->s_status
& STREAM_OUTPUT
));
55 if (str
->s_status
& STREAM_NOTFILE
)
56 SF
->s_fd
= fileno(stdout
);
/*
 * NOTE(review): embedded line 57 is missing (presumably `else`).  Without
 * it, the open() below would also run for STREAM_NOTFILE streams and
 * overwrite the stdout descriptor assigned just above.
 */
58 if ((SF
->s_fd
= open(str
->s_filename
, O_CREAT
| O_TRUNC
|
59 O_WRONLY
, OUTPUT_MODE
)) < 0) {
60 if (errno
== EMFILE
|| errno
== ENFILE
)
63 die(EMSG_OPEN
, str
->s_filename
);
66 stream_set(str
, STREAM_OPEN
| STREAM_OUTPUT
);
/*
 * stream_stdio_prime() -- make the first line of an input stream (or, on
 * a re-prime, its current line) available in s_current.
 *
 * 1. A STREAM_INSTANT stream without a buffer gets one from
 *    xzmap(0, STDIO_VBUF_SIZE, PROT_READ | PROT_WRITE, MAP_PRIVATE, 0), and
 *    s_buffer_size is set to STDIO_VBUF_SIZE.
 * 2. Re-prime (stream_is_primed()):
 *    - The current line (l_data_length + 1 bytes) is copied to the start
 *      of s_buffer, and l_data.sp is re-pointed there.
 *    - If that line is incomplete, SOP_FETCH() is called to complete it.
 *      A line is incomplete when l_data_length == -1, when the shelf was
 *      occupied, or when its last byte is not '\n'.
 *    - The collation fields are reset and PRIME_SUCCEEDED is returned.
 * 3. First prime:
 *    - STREAM_PRIMED is set and one line is read into s_buffer with
 *      fgets().
 *    - l_data_length becomes the offset of the first '\n' found by
 *      memchr().  If none is found, WMSG_NEWLINE_ADDED is warned and the
 *      length is MIN(strlen(), bytes left in the buffer).
 *    - The collation fields are reset and PRIME_SUCCEEDED is returned.
 *    - A second exit sets STREAM_EOS_REACHED, clears STREAM_PRIMED and
 *      returns PRIME_FAILED_EMPTY_FILE.  Its guarding condition is on
 *      missing lines (presumably "input already at EOF" -- TODO confirm).
 *
 * NOTE(review): this copy is damaged.  Missing by embedded number, among
 * others:
 *   - 79 (return type);
 *   - 84-85 (end_of_buffer and next_nl are used below but not declared
 *     here);
 *   - 95 (the action when xzmap() returns MAP_FAILED);
 *   - 123-125 (the action when the re-prime fetch still cannot complete a
 *     shelved line; the comment at 115-116 says memory is exhausted);
 *   - 136-138 and 140-141 (fgets()'s stream argument and the branch that
 *     leads to PRIME_FAILED_EMPTY_FILE; BF is declared but not used in any
 *     visible line);
 *   - 157 (presumably `else` between the two l_data_length assignments).
 * Restore from version control.
 */
72 * In the case of an instantaneous stream, we allocate a small buffer (64k) here
73 * for the stream; otherwise, the s_buffer and s_buffer_size members should have
74 * been set by stream_set_size() prior to calling stream_prime().
76 * Repriming (priming an already primed stream) is done when we are reentering a
77 * file after having sorted a previous portion of the file.
80 stream_stdio_prime(stream_t
*str
)
82 stream_buffered_file_t
*BF
= &(str
->s_type
.BF
);
83 char *current_position
;
87 ASSERT(!(str
->s_status
& STREAM_OUTPUT
));
88 ASSERT(str
->s_status
& (STREAM_SINGLE
| STREAM_WIDE
));
89 ASSERT(str
->s_status
& STREAM_OPEN
);
91 if (str
->s_status
& STREAM_INSTANT
&& (str
->s_buffer
== NULL
)) {
92 str
->s_buffer
= xzmap(0, STDIO_VBUF_SIZE
, PROT_READ
|
93 PROT_WRITE
, MAP_PRIVATE
, 0);
94 if (str
->s_buffer
== MAP_FAILED
)
96 str
->s_buffer_size
= STDIO_VBUF_SIZE
;
99 ASSERT(str
->s_buffer
!= NULL
);
101 if (stream_is_primed(str
)) {
103 * l_data_length is only set to -1 in the case of coincidental
104 * exhaustion of the input butter. This is thus the only case
105 * which involves no copying on a re-prime.
107 int shelf_state
= shelf
;
109 ASSERT(str
->s_current
.l_data_length
>= -1);
/*
 * NOTE(review): in this file l_data.sp is only ever set to positions
 * inside s_buffer.  The source and destination of this memcpy() therefore
 * overlap whenever the current line starts fewer than l_data_length + 1
 * bytes from the start of the buffer.  memcpy() is undefined for
 * overlapping objects; memmove() is the safe call here.
 */
110 (void) memcpy(str
->s_buffer
, str
->s_current
.l_data
.sp
,
111 str
->s_current
.l_data_length
+ 1);
112 str
->s_current
.l_data
.sp
= str
->s_buffer
;
115 * If our current line is incomplete, we need to get the rest of
116 * the line--if we can't, then we've exhausted memory.
118 if ((str
->s_current
.l_data_length
== -1 ||
119 shelf_state
== SHELF_OCCUPIED
||
120 *(str
->s_current
.l_data
.sp
+
121 str
->s_current
.l_data_length
) != '\n') &&
122 SOP_FETCH(str
) == NEXT_LINE_INCOMPLETE
&&
123 shelf_state
== SHELF_OCCUPIED
)
/*
 * NOTE(review): the body of the if above (embedded lines 124-125) is
 * missing.  As written, the assignment below would become that body, and
 * l_collate.sp would be reset only when the fetch failed.
 */
126 str
->s_current
.l_collate
.sp
= NULL
;
127 str
->s_current
.l_collate_length
= 0;
129 return (PRIME_SUCCEEDED
);
132 stream_set(str
, STREAM_PRIMED
);
134 current_position
= (char *)str
->s_buffer
;
135 end_of_buffer
= (char *)str
->s_buffer
+ str
->s_buffer_size
;
139 (void) fgets(current_position
, end_of_buffer
- current_position
,
142 stream_set(str
, STREAM_EOS_REACHED
);
143 stream_unset(str
, STREAM_PRIMED
);
144 return (PRIME_FAILED_EMPTY_FILE
);
147 str
->s_current
.l_data
.sp
= current_position
;
149 * Because one might run sort on a binary file, strlen() is no longer
150 * trustworthy--we must explicitly search for a newline.
152 if ((next_nl
= memchr(current_position
, '\n',
153 end_of_buffer
- current_position
)) == NULL
) {
154 warn(WMSG_NEWLINE_ADDED
, str
->s_filename
);
155 str
->s_current
.l_data_length
= MIN(strlen(current_position
),
156 end_of_buffer
- current_position
);
158 str
->s_current
.l_data_length
= next_nl
- current_position
;
161 str
->s_current
.l_collate
.sp
= NULL
;
162 str
->s_current
.l_collate_length
= 0;
164 __S(stats_incr_fetches());
165 return (PRIME_SUCCEEDED
);
/*
 * stream_stdio_fetch() -- read the next line into the stream buffer,
 * directly after the current one.
 *
 * Where the new data goes:
 *   - graft_pt = l_data.sp + l_data_length + 1, the byte just past the
 *     current line's last character.
 *   - With the shelf vacant, the new line starts at graft_pt.
 *   - With a line shelved, the read is appended to the shelved fragment.
 *     The adjustment for the fragment's NUL (embedded lines 195-198) is
 *     missing from this copy.
 *
 * If one byte or less of buffer remains, l_data_length is set to -1 and
 * NEXT_LINE_INCOMPLETE is returned at once.  Otherwise fgets() reads into
 * the remaining space.  On a NULL return, EOF sets STREAM_EOS_REACHED;
 * die(EMSG_READ) is presumably the non-EOF error path (the separating
 * line 218 is missing).
 *
 * The line length comes from memchr() for '\n', or strlen() if none is
 * found.  Then:
 *   - A line without '\n' read before EOF is shelved and the function
 *     reports NEXT_LINE_INCOMPLETE.
 *   - At EOF, STREAM_EOS_REACHED is set and WMSG_NEWLINE_ADDED is warned.
 *   - A complete line empties the shelf and reports NEXT_LINE_COMPLETE.
 *
 * Reads and writes the file-scope `shelf` flag.
 *
 * NOTE(review): this copy is damaged.  Missing by embedded number, among
 * others:
 *   - 171-172 (end of the header comment, return type);
 *   - 176 (ret_val is assigned below but not declared here);
 *   - 195-198 and 218;
 *   - 238 (the right-hand operand of the `!=` at 237);
 *   - 245-249 (end of comment and the action when the shelf was already
 *     occupied);
 *   - 254 and 257-258 (branch structure);
 *   - 262 onward (the return statement).
 * Restore from version control.
 */
169 * stream_stdio_fetch() guarantees the return of a complete line, or a flag
170 * indicating that the complete line could not be read.
173 stream_stdio_fetch(stream_t
*str
)
175 ssize_t dist_to_buf_end
;
177 char *graft_pt
, *next_nl
;
179 ASSERT(str
->s_status
& STREAM_OPEN
);
180 ASSERT(str
->s_status
& (STREAM_SINGLE
| STREAM_WIDE
));
181 ASSERT((str
->s_status
& STREAM_EOS_REACHED
) == 0);
183 graft_pt
= str
->s_current
.l_data
.sp
+ str
->s_current
.l_data_length
+ 1;
185 if (shelf
== SHELF_VACANT
) {
187 * The graft point is the start of the current line.
189 str
->s_current
.l_data
.sp
= graft_pt
;
190 } else if (str
->s_current
.l_data_length
> -1) {
192 * Correct for terminating NUL on shelved line. This NUL is
193 * only present if we didn't have the coincidental case
194 * mentioned in the comment below.
199 dist_to_buf_end
= str
->s_buffer_size
- (graft_pt
-
200 (char *)str
->s_buffer
);
202 if (dist_to_buf_end
<= 1) {
204 * fgets()'s behaviour in the case of a one-character buffer is
205 * somewhat unhelpful: it fills the buffer with '\0' and
206 * returns successfully (even if EOF has been reached for the
207 * file in question). Since we may be in the middle of a
208 * grafting operation, we leave early, maintaining the shelf in
211 str
->s_current
.l_data_length
= -1;
212 return (NEXT_LINE_INCOMPLETE
);
215 if (fgets(graft_pt
, dist_to_buf_end
, str
->s_type
.BF
.s_fp
) == NULL
) {
216 if (feof(str
->s_type
.BF
.s_fp
))
217 stream_set(str
, STREAM_EOS_REACHED
);
219 die(EMSG_READ
, str
->s_filename
);
222 trip_eof(str
->s_type
.BF
.s_fp
);
224 * Because one might run sort on a binary file, strlen() is no longer
225 * trustworthy--we must explicitly search for a newline.
/*
 * NOTE(review): this memchr() window is dist_to_buf_end bytes starting at
 * l_data.sp.  With a line shelved, l_data.sp precedes graft_pt, so the
 * window ends (graft_pt - l_data.sp) bytes before the end of the buffer.
 * A '\n' that fgets() stored in those last bytes would be missed.
 *
 * NOTE(review): the window can also extend past the NUL that fgets()
 * wrote.  After a re-prime, that region holds bytes from the previous
 * fill, so a final line without '\n' could match a stale newline.
 * Confirm neither case can occur.
 */
227 if ((next_nl
= memchr(str
->s_current
.l_data
.sp
, '\n',
228 dist_to_buf_end
)) == NULL
) {
229 str
->s_current
.l_data_length
= strlen(str
->s_current
.l_data
.sp
);
231 str
->s_current
.l_data_length
= next_nl
-
232 str
->s_current
.l_data
.sp
;
235 str
->s_current
.l_collate_length
= 0;
237 if (*(str
->s_current
.l_data
.sp
+ str
->s_current
.l_data_length
) !=
/*
 * NOTE(review): embedded line 238 (the right-hand operand of the `!=`
 * above, presumably '\n', and the opening of its body) is missing.
 */
239 if (!feof(str
->s_type
.BF
.s_fp
)) {
241 * We were only able to read part of the line; note that
242 * we have something on the shelf for our next fetch.
243 * If the shelf was previously occupied, and we still
244 * can't get the entire line, then we need more
247 if (shelf
== SHELF_OCCUPIED
)
/*
 * NOTE(review): the body of the if above (embedded line 248) is missing.
 * As written, it would govern the shelf assignment below.
 */
250 shelf
= SHELF_OCCUPIED
;
251 ret_val
= NEXT_LINE_INCOMPLETE
;
253 __S(stats_incr_shelves());
/*
 * NOTE(review): none of the visible lines of this EOF-without-newline
 * branch assign ret_val.  Whether the return value is defined on this path
 * depends on the missing declaration at line 176 (an initializer such as
 * NEXT_LINE_COMPLETE) -- confirm.
 */
255 stream_set(str
, STREAM_EOS_REACHED
);
256 warn(WMSG_NEWLINE_ADDED
, str
->s_filename
);
259 shelf
= SHELF_VACANT
;
260 ret_val
= NEXT_LINE_COMPLETE
;
261 __S(stats_incr_fetches());
/*
 * stream_stdio_fetch_overwrite() -- read the next line into the start of
 * s_buffer, replacing its previous contents.
 *
 * fgets() reads at most s_buffer_size - 1 bytes into s_buffer.  On a NULL
 * return, EOF sets STREAM_EOS_REACHED; die(EMSG_READ) is presumably the
 * error path (the separating line 290 is missing).
 *
 * l_data_length is strlen() - 1, the index of the byte before the first
 * NUL.  When l_data_length == -1 or that byte is not '\n':
 *   - before EOF, the line did not fit in the buffer, which the comment at
 *     303-305 says is grounds to exit (its statements, 306-309, are
 *     missing);
 *   - at EOF, STREAM_EOS_REACHED is set and WMSG_NEWLINE_ADDED is warned.
 * The visible success path returns NEXT_LINE_COMPLETE.
 *
 * NOTE(review): this copy is damaged.  Missing by embedded number: 273-274
 * (comment close, return type), 276, 278, 282, 285, 290, 292-293, 297,
 * 300 (the right-hand operand of the `!=` at 299), 302, 306-309 and
 * 312-314.  Restore from version control.
 */
268 * stdio_fetch_overwrite() is used when we are performing an operation where we
269 * need the buffer contents only over a single period. (merge and check are
270 * operations of this kind.) In this case, we read the current line at the head
271 * of the stream's defined buffer. If we cannot read the entire line, we have
272 * not allocated sufficient memory.
275 stream_stdio_fetch_overwrite(stream_t
*str
)
277 ssize_t dist_to_buf_end
;
279 ASSERT(str
->s_status
& STREAM_OPEN
);
280 ASSERT(str
->s_status
& (STREAM_SINGLE
| STREAM_WIDE
));
281 ASSERT((str
->s_status
& STREAM_EOS_REACHED
) == 0);
283 str
->s_current
.l_data
.sp
= str
->s_buffer
;
284 dist_to_buf_end
= str
->s_buffer_size
;
286 if (fgets(str
->s_current
.l_data
.sp
, dist_to_buf_end
,
287 str
->s_type
.BF
.s_fp
) == NULL
) {
288 if (feof(str
->s_type
.BF
.s_fp
))
289 stream_set(str
, STREAM_EOS_REACHED
);
291 die(EMSG_READ
, str
->s_filename
);
294 trip_eof(str
->s_type
.BF
.s_fp
);
/*
 * NOTE(review): stream_stdio_prime() and stream_stdio_fetch() locate the
 * line end with memchr(), because on binary input "strlen() is no longer
 * trustworthy".  This function uses strlen() instead.  An embedded NUL
 * byte therefore makes the line look truncated and, lacking a '\n', it is
 * then presumably treated as a line that did not fit.
 *
 * NOTE(review): if fgets() returned NULL at EOF and the missing lines
 * 292-293 do not return, strlen() runs on a buffer that fgets() left
 * unchanged.
 */
295 str
->s_current
.l_data_length
= strlen(str
->s_current
.l_data
.sp
) - 1;
296 str
->s_current
.l_collate_length
= 0;
298 if (str
->s_current
.l_data_length
== -1 ||
299 *(str
->s_current
.l_data
.sp
+ str
->s_current
.l_data_length
) !=
301 if (!feof(str
->s_type
.BF
.s_fp
)) {
303 * In the overwrite case, failure to read the entire
304 * line means our buffer size was insufficient (as we
305 * are using all of it). Exit, requesting more
310 stream_set(str
, STREAM_EOS_REACHED
);
311 warn(WMSG_NEWLINE_ADDED
, str
->s_filename
);
315 __S(stats_incr_fetches());
316 return (NEXT_LINE_COMPLETE
);
/*
 * stream_stdio_is_closable() -- the visible condition is true when str is
 * STREAM_OPEN and not STREAM_NOTFILE.  Judging from
 * stream_stdio_open_for_write() and stream_stdio_close(), STREAM_NOTFILE
 * marks a stream attached to stdin/stdout -- presumably; confirm.
 *
 * NOTE(review): the return type (before embedded line 320), the opening
 * brace (321) and everything after the condition (323-326, including the
 * if's body and every return statement) are missing from this copy.
 */
320 stream_stdio_is_closable(stream_t
*str
)
322 if (str
->s_status
& STREAM_OPEN
&& !(str
->s_status
& STREAM_NOTFILE
))
/*
 * stream_stdio_close() -- close an open stream and clear STREAM_OPEN,
 * STREAM_PRIMED and STREAM_OUTPUT.
 *
 * Input streams: fclose() the FILE unless STREAM_NOTFILE, then free any
 * s_type.BF.s_vbuf and reset it to NULL.
 *
 * Output streams: call cxwrite(fd, NULL, 0), the same call
 * stream_stdio_flush() makes (presumably flushing buffered output).  On a
 * 0 result close() the descriptor; die(EMSG_WRITE) is presumably the
 * failure path.
 *
 * NOTE(review): success is tested here as `== 0`, whereas
 * stream_stdio_flush(), stream_stdio_send_eol() and
 * stream_stdio_put_line() treat only `< 0` from cxwrite() as failure.
 * Confirm cxwrite() never returns a positive value.  Also, the result of
 * close() is discarded, so a write error reported only at close() time
 * would go unnoticed.
 *
 * NOTE(review): also missing by embedded number: 327 (return type), 329,
 * 331, 335, 345-346 and 348-350 (presumably the return statement).
 */
328 stream_stdio_close(stream_t
*str
)
330 ASSERT(str
->s_status
& STREAM_OPEN
);
332 if (!(str
->s_status
& STREAM_OUTPUT
)) {
333 if (!(str
->s_status
& STREAM_NOTFILE
))
334 (void) fclose(str
->s_type
.BF
.s_fp
);
336 if (str
->s_type
.BF
.s_vbuf
!= NULL
) {
337 free(str
->s_type
.BF
.s_vbuf
);
338 str
->s_type
.BF
.s_vbuf
= NULL
;
/*
 * NOTE(review): embedded lines 339-340 are missing.  They are presumably
 * the end of the input branch and the start of the output branch; as
 * written, the output code below would also run for input streams.
 */
341 if (cxwrite(str
->s_type
.SF
.s_fd
, NULL
, 0) == 0)
342 (void) close(str
->s_type
.SF
.s_fd
);
/*
 * NOTE(review): embedded line 343 (presumably `else`) is missing.  As
 * written, die() would run even after a successful flush.
 */
344 die(EMSG_WRITE
, str
->s_filename
);
347 stream_unset(str
, STREAM_OPEN
| STREAM_PRIMED
| STREAM_OUTPUT
);
/*
 * stream_stdio_send_eol() -- write a single "\n" to the output descriptor
 * s_type.SF.s_fd via cxwrite().  A negative result is fatal (EMSG_WRITE).
 * The stream must be open for output.
 *
 * NOTE(review): the return-type line (351), opening brace (353) and
 * closing brace (359) are missing from this copy.
 */
352 stream_stdio_send_eol(stream_t
*str
)
354 ASSERT(str
->s_status
& STREAM_OPEN
);
355 ASSERT(str
->s_status
& STREAM_OUTPUT
);
357 if (cxwrite(str
->s_type
.SF
.s_fd
, "\n", 1) < 0)
358 die(EMSG_WRITE
, str
->s_filename
);
/*
 * stream_stdio_flush() -- call cxwrite(fd, NULL, 0) on the output
 * descriptor.  This presumably flushes data that cxwrite() has buffered;
 * confirm against cxwrite().  A negative result is fatal (EMSG_WRITE).
 * The stream must be open for output.
 *
 * NOTE(review): the return-type line (361), opening brace (363) and
 * closing brace (369) are missing from this copy.
 */
362 stream_stdio_flush(stream_t
*str
)
364 ASSERT(str
->s_status
& STREAM_OPEN
);
365 ASSERT(str
->s_status
& STREAM_OUTPUT
);
367 if (cxwrite(str
->s_type
.SF
.s_fd
, NULL
, 0) < 0)
368 die(EMSG_WRITE
, str
->s_filename
);
/*
 * stream_stdio_put_line() -- write one line to the output stream.
 *
 * When line->l_data_length >= 0, the line's l_data_length bytes are
 * written with cxwrite() (a negative result is fatal, EMSG_WRITE),
 * followed by stream_stdio_send_eol().  A line with a negative length
 * writes no data.  Afterwards line->l_raw_collate.sp is released with
 * safe_free() and set to NULL.
 *
 * NOTE(review): embedded lines 371 (return type), 373, 376, 381, 384 and
 * 387-388 are missing.  One of 381/384 presumably closes the if above, so
 * whether the send_eol and statistics calls sit inside it cannot be told
 * from here.
 */
372 stream_stdio_put_line(stream_t
*str
, line_rec_t
*line
)
374 ASSERT(str
->s_status
& STREAM_OPEN
);
375 ASSERT(str
->s_status
& STREAM_OUTPUT
);
377 if (line
->l_data_length
>= 0) {
378 if (cxwrite(str
->s_type
.SF
.s_fd
, line
->l_data
.sp
,
379 line
->l_data_length
) < 0)
380 die(EMSG_WRITE
, str
->s_filename
);
382 stream_stdio_send_eol(str
);
383 __S(stats_incr_puts());
385 safe_free(line
->l_raw_collate
.sp
);
386 line
->l_raw_collate
.sp
= NULL
;
/*
 * stream_stdio_put_line_unique() -- write line unless it collates equal
 * (collated(&pvs, line, 0, COLL_UNIQUE) == 0) to the previously written
 * line.
 *
 * The previous line's collation key is kept in the function-static pvs.
 * Its buffer is grown with safe_realloc() to l_collate_length + 1 bytes as
 * needed (capacity tracked in collate_buf_len).  After the line is
 * written, its key is copied into pvs and NUL-terminated.  Because pvs is
 * static, one "previous line" is shared by every call, whatever stream is
 * passed.
 *
 * NOTE(review): embedded lines 389 (return type), 391, 394 and 397 are
 * missing from this copy.
 */
390 stream_stdio_put_line_unique(stream_t
*str
, line_rec_t
*line
)
392 static line_rec_t pvs
;
393 static size_t collate_buf_len
;
395 ASSERT(str
->s_status
& STREAM_OPEN
);
396 ASSERT(str
->s_status
& STREAM_OUTPUT
);
398 if (pvs
.l_collate
.sp
!= NULL
&&
399 collated(&pvs
, line
, 0, COLL_UNIQUE
) == 0) {
400 __S(stats_incr_not_unique());
/*
 * NOTE(review): embedded lines 401-403 are missing.  They presumably
 * return after counting a duplicate; as written, duplicates would be
 * written too.
 */
404 __S(stats_incr_put_unique());
405 stream_stdio_put_line(str
, line
);
407 if (line
->l_collate_length
+ 1 > collate_buf_len
) {
408 pvs
.l_collate
.sp
= safe_realloc(pvs
.l_collate
.sp
,
409 line
->l_collate_length
+ 1);
410 collate_buf_len
= line
->l_collate_length
+ 1;
/*
 * NOTE(review): stream_stdio_put_line() has just safe_free()d
 * line->l_raw_collate.sp.  If line->l_collate.sp can point into that
 * allocation, this memcpy() reads freed memory -- confirm the two never
 * alias.
 */
413 (void) memcpy(pvs
.l_collate
.sp
, line
->l_collate
.sp
,
414 line
->l_collate_length
);
415 *(pvs
.l_collate
.sp
+ line
->l_collate_length
) = '\0';
416 pvs
.l_collate_length
= line
->l_collate_length
;
/*
 * stream_stdio_unlink() -- unless the stream is STREAM_NOTFILE, remove
 * s_filename with unlink() and return unlink()'s result.
 *
 * NOTE(review): the return type (419), opening brace (421) and the rest of
 * the function after the return (424-427, presumably the STREAM_NOTFILE
 * result) are missing from this copy.
 */
420 stream_stdio_unlink(stream_t
*str
)
422 if (!(str
->s_status
& STREAM_NOTFILE
))
423 return (unlink(str
->s_filename
));
/*
 * stream_stdio_free() -- release the stream's input buffer when that is
 * valid.
 *
 * The guard below singles out two cases: a stream that is not
 * STREAM_OPEN, and one whose s_consumer is marked STREAM_NOT_FREEABLE.
 * Its action (embedded lines 437-438) is missing; presumably the function
 * returns without freeing in those cases -- TODO confirm.
 *
 * Otherwise a non-NULL s_buffer is munmap()ed (failure is fatal,
 * EMSG_MUNMAP), s_buffer and s_buffer_size are reset, and STREAM_PRIMED
 * and STREAM_INSTANT are cleared.
 *
 * NOTE(review): also missing by embedded number: 428 (return type),
 * 430-431, 433, 442, 445-447 and 449-452 (presumably including the return
 * statement).
 */
429 stream_stdio_free(stream_t
*str
)
432 * Unmap the memory we allocated for input, if it's valid to do so.
434 if (!(str
->s_status
& STREAM_OPEN
) ||
435 (str
->s_consumer
!= NULL
&&
436 str
->s_consumer
->s_status
& STREAM_NOT_FREEABLE
))
/*
 * NOTE(review): the body of the guard above is missing.  As written, the
 * buffer release below would become that body.
 */
439 if (str
->s_buffer
!= NULL
) {
440 if (munmap(str
->s_buffer
, str
->s_buffer_size
) < 0)
441 die(EMSG_MUNMAP
, "/dev/zero");
443 str
->s_buffer
= NULL
;
444 str
->s_buffer_size
= 0;
448 stream_unset(str
, STREAM_PRIMED
| STREAM_INSTANT
);
/*
 * stream_stdio_eos() -- determine whether an input stream has reached its
 * end, setting STREAM_EOS_REACHED when it has.
 *
 * A NULL str, or one already marked STREAM_EOS_REACHED, takes an early
 * exit; its statements (embedded lines 462-463) are missing.  Otherwise
 * trip_eof() (defined elsewhere) is called on the FILE, and
 * STREAM_EOS_REACHED is set once all three of these hold:
 *   - feof() is true;
 *   - no line is shelved;
 *   - s_current.l_collate_length != -1.
 *
 * NOTE(review): also missing by embedded number: 453 (return type),
 * 455-457 (opening brace and any locals), 460, 468 and 470 onward
 * (including the return statement).
 */
454 stream_stdio_eos(stream_t
*str
)
/*
 * NOTE(review): both ASSERT()s dereference str before the `str == NULL`
 * test below.  In builds where ASSERT() evaluates its argument, a NULL str
 * faults before reaching that test; the test should come first.
 */
458 ASSERT(!(str
->s_status
& STREAM_OUTPUT
));
459 ASSERT(str
->s_status
& (STREAM_SINGLE
| STREAM_WIDE
));
461 if (str
== NULL
|| str
->s_status
& STREAM_EOS_REACHED
)
464 trip_eof(str
->s_type
.BF
.s_fp
);
465 if (feof(str
->s_type
.BF
.s_fp
) &&
466 shelf
== SHELF_VACANT
&&
/*
 * NOTE(review): this file only ever assigns 0 to l_collate_length.  The
 * field it sets to -1 is l_data_length (stream_stdio_fetch(), on
 * coincidental buffer exhaustion).  Confirm that l_collate_length is the
 * intended field here.
 */
467 str
->s_current
.l_collate_length
!= -1) {
469 stream_set(str
, STREAM_EOS_REACHED
);
/*
 * stream_stdio_release_line() -- its return type (embedded line 476) and
 * entire body (478-480) are missing from this copy, so what it does cannot
 * be told from here.  Restore from version control.
 */
477 stream_stdio_release_line(stream_t
*str
)
/*
 * stream_stdio_ops -- the operations vector for stdio-backed streams: a
 * const stream_ops_t initialized positionally with the stream_stdio_*
 * routines.  Because no designators are used, the entries must stay in the
 * order in which stream_ops_t declares its members.
 *
 * NOTE(review): the initializer is incomplete in this copy.  Entries at
 * embedded lines 483-487 and 489 are missing, and the initializer
 * continues past the end of this view.
 */
481 const stream_ops_t stream_stdio_ops
= {
482 stream_stdio_is_closable
,
488 stream_stdio_open_for_write
,
490 stream_stdio_put_line
,
491 stream_stdio_release_line
,
492 stream_stdio_send_eol
,