4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
23 * Copyright 1998-2003 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
27 #pragma ident "%Z%%M% %I% %E% SMI"
34 * The following code implements the merge phase of sort(1) using a heap-based
35 * priority queue. Fast paths for merging two files as well as outputting a
36 * single file are provided.
38 * Memory footprint management
40 * The N-way fan-out of the merge phase can lead to compromising memory
41 * consumption if not constrained, so two mechanisms are used to regulate
42 * the memory footprint during the merge phase:
44 * 1. Single use memory advice. Since we proceed through each merge file in
45 * order, any line we have output is never required again--at least, not
46 * from that input file. Accordingly, we use the SOP_RELEASE_LINE()
47 * operation to advise that the memory backing the raw data for the stream
48 * up to that line is no longer of interest. (For certain classes of
49 * streams, this leads to an madvise(3C) call with the MADV_DONTNEED flag.)
52 * 2. Number of merge files. The number of merge files is constrained based
53 * on the amount of physical memory specified via the -S option (or deemed
54 * available based on an inquiry of sysconf(3C) for _SC_AVPHYS_PAGES).
55 * The number of merge files is calculated based on the average resident
56 * size of a stream that supports the SOP_RELEASE_LINE() operation; this
57 * number is conservative for streams that do not support this operation.
58 * A minimum of four subfiles will always be used, resource limits permitting.
61 * Temporary filespace footprint management
63 * Once the merge sort has utilized a temporary file, it may be deleted at
64 * close, as it's not used again and preserving the files until exit may
65 * compromise sort completion when limited temporary space is available.
/*
 * Merge-phase module state shared by the priority-queue routines and the
 * merge drivers below.
 *
 * pq_queue is the heap of input streams.  pqueue_init() allocates
 * max_size + 1 slots for it and the heap is used 1-based: pqueue_insert()
 * stores at ++pq_N and pqueue_head() takes the root from slot 1, so slot 0
 * is never filled.
 *
 * NOTE(review): the heap element count pq_N is used by the queue routines
 * but its declaration is not present in this copy of the file -- confirm
 * it is declared at file scope.
 */
69 static stream_t
**pq_queue
;
/*
 * Comparison function installed by pqueue_init(); heap_up() and
 * heap_down() use it to order streams by their current line record.
 */
70 static int (*pq_coll_fcn
)(line_rec_t
*, line_rec_t
*, ssize_t
, flag_t
);
/*
 * Collation-key builder for a line record, assigned from S->m_coll_convert
 * by the merge entry point.  Every call in this file passes FCV_REALLOC
 * (presumably allowing it to grow the line's l_collate buffer -- confirm)
 * and discards the return value.
 */
72 static ssize_t (*mg_coll_convert
)(field_t
*, line_rec_t
*, flag_t
, vchar_t
);
/*
 * prepare_output_stream() -- configure ostrp as the final sort output and
 * open it for writing.
 *
 * Clears STREAM_OPEN on ostrp, marks it STREAM_SINGLE or STREAM_WIDE
 * according to S->m_single_byte_locale (adding STREAM_UNIQUE when
 * S->m_unique_lines is set), names it after standard output or
 * S->m_output_filename, and returns the result of SOP_OPEN_FOR_WRITE().
 * The N-way and two-file merge paths treat a return of -1 as failure.
 *
 * NOTE(review): this copy has gaps in its embedded numbering (e.g. 74,
 * 76-77, 80, 87, 89): the return type, braces, the call that consumes the
 * flag expression below, and whatever separates the two s_filename
 * assignments (presumably an else) are missing -- verify upstream.
 */
75 prepare_output_stream(stream_t
*ostrp
, sort_t
*S
)
78 stream_unset(ostrp
, STREAM_OPEN
);
81 (S
->m_single_byte_locale
? STREAM_SINGLE
: STREAM_WIDE
) |
82 (S
->m_unique_lines
? STREAM_UNIQUE
: 0));
/*
 * When writing to standard output, flag the stream STREAM_NOTFILE and use
 * filename_stdout as its name; otherwise use S->m_output_filename.
 */
84 if (S
->m_output_to_stdout
) {
85 stream_set(ostrp
, STREAM_NOTFILE
);
86 ostrp
->s_filename
= (char *)filename_stdout
;
88 ostrp
->s_filename
= S
->m_output_filename
;
90 return (SOP_OPEN_FOR_WRITE(ostrp
));
/*
 * merge_one_stream() -- copy every line of strp to outstrp.
 *
 * Serves as the single-input fast path and as merge_two_streams()'s
 * fallback when one of its two inputs fails to prime.  Single-byte and
 * wide streams are switched to STREAM_INSTANT before priming.  If
 * SOP_PRIME() succeeds, the current line gets a collation buffer of
 * INITIAL_COLLATION_SIZE * s_element_size bytes and a collation key; each
 * line is then written with SOP_PUT_LINE() and released with
 * SOP_RELEASE_LINE(), so the input memory behind it can be advised away
 * (see "Single use memory advice" at the top of the file).  Inside the
 * loop a key is built only for lines whose l_collate_length is 0.  strp is
 * closed with SOP_CLOSE().
 *
 * NOTE(review): the embedded numbering skips 113, 117 and 120-121.  As
 * visible, the loop never advances strp (merge_n_streams() calls
 * SOP_FETCH() at the corresponding point), the last argument of the
 * in-loop mg_coll_convert() call is missing, and it is unclear whether
 * SOP_CLOSE() sits inside or after the primed branch -- verify upstream.
 * No free of l_collate.sp is visible in this function.
 */
94 merge_one_stream(field_t
*fields_chain
, stream_t
*strp
, stream_t
*outstrp
,
95 vchar_t field_separator
)
97 size_t element_size
= strp
->s_element_size
;
98 size_t initial_size
= INITIAL_COLLATION_SIZE
* element_size
;
100 if (strp
->s_status
& STREAM_SINGLE
|| strp
->s_status
& STREAM_WIDE
)
101 stream_set(strp
, STREAM_INSTANT
);
103 if (SOP_PRIME(strp
) == PRIME_SUCCEEDED
) {
/* Primed: give the first line an initial collation buffer and key. */
104 strp
->s_current
.l_collate_bufsize
= initial_size
;
105 strp
->s_current
.l_collate
.sp
= safe_realloc(NULL
, initial_size
);
107 (void) mg_coll_convert(fields_chain
, &strp
->s_current
,
108 FCV_REALLOC
, field_separator
);
109 SOP_PUT_LINE(outstrp
, &strp
->s_current
);
110 SOP_RELEASE_LINE(strp
);
112 while (!SOP_EOS(strp
)) {
114 if (strp
->s_current
.l_collate_length
== 0)
115 (void) mg_coll_convert(fields_chain
,
116 &strp
->s_current
, FCV_REALLOC
,
118 SOP_PUT_LINE(outstrp
, &strp
->s_current
);
119 SOP_RELEASE_LINE(strp
);
122 (void) SOP_CLOSE(strp
);
/*
 * merge_two_streams() -- two-input fast path that merges str_a and str_b
 * into outstrp without using the priority queue.
 *
 * The comparator is chosen by element size: collated() for sizeof (char)
 * elements, collated_wide() otherwise; both inputs must share an element
 * size (ASSERTed).  Single-byte and wide inputs are switched to
 * STREAM_INSTANT.  If either input fails to prime, the other is handed to
 * merge_one_stream().  Otherwise each stream gets an
 * INITIAL_COLLATION_SIZE * s_element_size byte collation buffer and a key
 * for its first line.  The main loop then writes and releases whichever
 * current line collate_fcn() orders first.  Once an input reaches
 * end-of-stream, the lines remaining in str_a are copied out and str_a is
 * closed.
 *
 * NOTE(review): many lines are absent from this copy (gaps in the embedded
 * numbering), notably 139 (presumably the else between the two
 * collate_fcn assignments), 149-155 and 158-161 (the ends of the
 * prime-failure branches), 169 and 171 (last arguments of the initial
 * conversions), 172-173 (loop header), 175 (coll_flags and the comparison
 * operator), 180-183 and 192-195 (end-of-stream handover), 185, 197,
 * 200-202 and 207 (continue/else/fetch).  Verify upstream before relying
 * on the control flow as it appears here.
 */
128 merge_two_streams(field_t
*fields_chain
, stream_t
*str_a
, stream_t
*str_b
,
129 stream_t
*outstrp
, vchar_t field_separator
, flag_t coll_flags
)
131 int (*collate_fcn
)(line_rec_t
*, line_rec_t
*, ssize_t
, flag_t
);
132 size_t element_size
= str_a
->s_element_size
;
133 size_t initial_size
= INITIAL_COLLATION_SIZE
* element_size
;
135 ASSERT(str_a
->s_element_size
== str_b
->s_element_size
);
/* Pick the comparator matching the element width. */
137 if (str_a
->s_element_size
== sizeof (char))
138 collate_fcn
= collated
;
140 collate_fcn
= collated_wide
;
142 if (str_a
->s_status
& STREAM_SINGLE
|| str_a
->s_status
& STREAM_WIDE
)
143 stream_set(str_a
, STREAM_INSTANT
);
144 if (str_b
->s_status
& STREAM_SINGLE
|| str_b
->s_status
& STREAM_WIDE
)
145 stream_set(str_b
, STREAM_INSTANT
);
147 if (SOP_PRIME(str_a
) != PRIME_SUCCEEDED
) {
148 if (SOP_PRIME(str_b
) != PRIME_SUCCEEDED
)
151 merge_one_stream(fields_chain
, str_b
, outstrp
,
156 if (SOP_PRIME(str_b
) != PRIME_SUCCEEDED
) {
157 merge_one_stream(fields_chain
, str_a
, outstrp
,
162 str_a
->s_current
.l_collate_bufsize
=
163 str_b
->s_current
.l_collate_bufsize
= initial_size
;
165 str_a
->s_current
.l_collate
.sp
= safe_realloc(NULL
, initial_size
);
166 str_b
->s_current
.l_collate
.sp
= safe_realloc(NULL
, initial_size
);
168 (void) mg_coll_convert(fields_chain
, &str_a
->s_current
, FCV_REALLOC
,
170 (void) mg_coll_convert(fields_chain
, &str_b
->s_current
, FCV_REALLOC
,
174 if (collate_fcn(&str_a
->s_current
, &str_b
->s_current
, 0,
176 SOP_PUT_LINE(outstrp
, &str_a
->s_current
);
177 SOP_RELEASE_LINE(str_a
);
178 if (SOP_EOS(str_a
)) {
179 (void) SOP_CLOSE(str_a
);
/*
 * NOTE(review): as visible, this converts only when the line already
 * has a key, the inverse of the "== 0" tests used elsewhere.  Line 185
 * is elided (presumably a continue that skips the conversion); the same
 * pattern recurs for str_b with line 197 elided.  Verify upstream.
 */
184 if (str_a
->s_current
.l_collate_length
!= 0)
186 (void) mg_coll_convert(fields_chain
, &str_a
->s_current
,
187 FCV_REALLOC
, field_separator
);
189 SOP_PUT_LINE(outstrp
, &str_b
->s_current
);
190 SOP_RELEASE_LINE(str_b
);
191 if (SOP_EOS(str_b
)) {
196 if (str_b
->s_current
.l_collate_length
!= 0)
198 (void) mg_coll_convert(fields_chain
, &str_b
->s_current
,
199 FCV_REALLOC
, field_separator
);
/*
 * Drain the surviving input.  Presumably it was reassigned to str_a in
 * the elided end-of-stream lines (180-183, 192-195); the fetch on line
 * 207 is also elided.
 */
203 SOP_PUT_LINE(outstrp
, &str_a
->s_current
);
204 SOP_RELEASE_LINE(str_a
);
206 while (!SOP_EOS(str_a
)) {
208 if (str_a
->s_current
.l_collate_length
== 0)
209 (void) mg_coll_convert(fields_chain
, &str_a
->s_current
,
210 FCV_REALLOC
, field_separator
);
211 SOP_PUT_LINE(outstrp
, &str_a
->s_current
);
212 SOP_RELEASE_LINE(str_a
);
215 (void) SOP_CLOSE(str_a
);
220 * priority queue routines
221 * used for merges involving more than two sources
/*
 * heap_up() -- sift the stream at heap index k toward the root, swapping
 * it with its parent at k / 2 based on pq_coll_fcn().  Presumably the swap
 * happens while the parent compares greater than the child; the tail of
 * the comparison is not in this copy.
 *
 * NOTE(review): comparisons read A[] while swaps write the global
 * pq_queue[].  This only works because every caller in this file passes
 * pq_queue as A.  The embedded numbering skips 225-226, 228 and 230-233,
 * so the loop header, the coll_flags argument and comparison operator,
 * and the index update are absent from this copy.
 */
224 heap_up(stream_t
**A
, int k
, flag_t coll_flags
)
227 pq_coll_fcn(&A
[k
/ 2]->s_current
, &A
[k
]->s_current
, 0,
229 swap((void **)&pq_queue
[k
], (void **)&pq_queue
[k
/ 2]);
/*
 * heap_down() -- sift the stream at heap index k down through A[1..N].
 * When j < N and pq_coll_fcn() orders A[j] after A[j + 1], the elided
 * line 243 presumably advances j to that lesser child.  A[k] is then
 * compared with A[j], and the two are swapped (presumably only when A[k]
 * orders after A[j]).
 *
 * NOTE(review): as in heap_up(), comparisons use A[] but swaps use the
 * global pq_queue[]; callers pass pq_queue.  The embedded numbering skips
 * 236-240, 243, 245-246 and 248-258.  As a result, the declaration and
 * computation of j, the loop header, the j++ step, the comparison
 * tail/exit and the k update are absent from this copy.
 */
235 heap_down(stream_t
**A
, int k
, int N
, flag_t coll_flags
)
241 if (j
< N
&& pq_coll_fcn(&A
[j
]->s_current
,
242 &A
[j
+ 1]->s_current
, 0, coll_flags
) > 0)
244 if (pq_coll_fcn(&A
[k
]->s_current
, &A
[j
]->s_current
, 0,
247 swap((void **)&pq_queue
[k
], (void **)&pq_queue
[j
]);
/*
 * pqueue_init() -- allocate the heap array for up to max_size streams and
 * install coll_fcn as the comparison used by heap_up()/heap_down().
 *
 * max_size + 1 slots are allocated because the heap is 1-based.  Each
 * call makes a fresh allocation via safe_realloc(NULL, ...); a previous
 * pq_queue is not freed here.
 *
 * NOTE(review): pq_N is not reset in the visible lines (line 263 is
 * absent from this copy) -- confirm the count starts at zero.
 */
259 pqueue_init(size_t max_size
,
260 int (*coll_fcn
)(line_rec_t
*, line_rec_t
*, ssize_t
, flag_t
))
262 pq_queue
= safe_realloc(NULL
, sizeof (stream_t
*) * (max_size
+ 1));
264 pq_coll_fcn
= coll_fcn
;
/*
 * pqueue_insert() -- append source at slot ++pq_N and sift it up with
 * heap_up().
 *
 * There is no capacity check: callers must not exceed the max_size passed
 * to pqueue_init().  merge_n_streams() sizes the queue with n_streams and
 * re-inserts a stream only after pqueue_head() has removed it.
 */
268 pqueue_insert(stream_t
*source
, flag_t coll_flags
)
270 pq_queue
[++pq_N
] = source
;
271 heap_up(pq_queue
, pq_N
, coll_flags
);
/*
 * pqueue_head() -- remove and return the stream at the root of the heap.
 * Given heap_down()'s choice of the lesser child, this is the stream whose
 * current line pq_coll_fcn() orders first.
 *
 * The root is swapped with the last element, the heap is re-sifted over
 * the remaining pq_N - 1 elements, and the old root is returned from slot
 * pq_N as the count is decremented.
 *
 * Precondition: the queue is non-empty (merge_n_streams() checks
 * pqueue_empty() before calling).
 */
275 pqueue_head(flag_t coll_flags
)
277 swap((void **)&pq_queue
[1], (void **)&pq_queue
[pq_N
]);
278 heap_down(pq_queue
, 1, pq_N
- 1, coll_flags
);
279 return (pq_queue
[pq_N
--]);
/*
 * merge_n_streams() -- general N-way merge of the chain starting at
 * head_streamp into out_streamp, using the priority queue.
 *
 * threshold_opens = MAX(4, 2 * S->m_memory_available / DEFAULT_RELEASE_SIZE)
 * bounds how many inputs are opened per pass (see "Number of merge files"
 * at the top of the file).  The queue is sized for n_streams and compares
 * with collated() or collated_wide() according to the locale.  Each pass:
 *
 *   1. opens streams from bot_streamp onward, between
 *      hold_file_descriptor() and release_file_descriptor(), until n_opens
 *      exceeds threshold_opens or stream_open_for_read() fails; in that
 *      case bot_streamp backs off to its s_previous.  Single-byte and wide
 *      streams are set STREAM_INSTANT;
 *   2. writes to out_streamp (via prepare_output_stream()) if the whole
 *      chain was opened, otherwise to a temporary stream from
 *      stream_push_to_temporary().  die(EMSG_DESCRIPTORS) is called when
 *      no temporary is obtained or top_streamp == bot_streamp;
 *   3. for each stream in [top_streamp, bot_streamp): if it is a non-array
 *      stream already at end-of-stream, unlinks it with
 *      stream_unlink_temporary() (the statement that then skips it is
 *      elided); otherwise primes it, gives it an INITIAL_COLLATION_SIZE
 *      byte collation buffer and key, and inserts it into the queue;
 *   4. until the queue is empty, pops the head, writes and releases its
 *      line and, unless that stream is exhausted, fetches the next line,
 *      rebuilds its key and re-inserts it;
 *   5. frees the pass's collation buffers, then SOP_FREE()s, unlinks and
 *      closes its inputs, and flushes the output.  If more input remains,
 *      a file-backed (non-STREAM_NOTFILE) output is closed and re-stat'ed
 *      so it can be read back through the stream_mmap facilities.  The
 *      window then advances (top_streamp = bot_streamp; bot_streamp =
 *      bot_streamp->s_next).
 *
 * NOTE(review): this copy is missing many lines (gaps in the embedded
 * numbering).  They include the n_opens/threshold_opens declarations, the
 * outer pass loop, the n_opens increment and the exit after a failed
 * open, the else before "330 die(EMSG_DESCRIPTORS", and the statements
 * after stream_unlink_temporary() and after a failed SOP_PRIME().  Also
 * missing are the closing braces of the drain loop and the exit taken when
 * bot_streamp is NULL.  As visible, several of these change control flow
 * -- verify upstream.
 *
 * NOTE(review): this function sizes the initial collation buffer as
 * INITIAL_COLLATION_SIZE bytes, whereas merge_one_stream() and
 * merge_two_streams() use INITIAL_COLLATION_SIZE * s_element_size.
 */
283 merge_n_streams(sort_t
*S
, stream_t
*head_streamp
, int n_streams
,
284 stream_t
*out_streamp
, flag_t coll_flags
)
286 stream_t
*top_streamp
;
287 stream_t
*cur_streamp
;
288 stream_t
*bot_streamp
;
289 stream_t
*loop_out_streamp
;
290 flag_t is_single_byte
= S
->m_single_byte_locale
;
/* Per-pass cap on opened inputs, derived from the memory budget. */
295 threshold_opens
= MAX(4,
296 2 * S
->m_memory_available
/ DEFAULT_RELEASE_SIZE
);
298 pqueue_init(n_streams
, is_single_byte
? collated
: collated_wide
);
300 top_streamp
= bot_streamp
= head_streamp
;
303 hold_file_descriptor();
304 while (bot_streamp
!= NULL
) {
306 if (n_opens
> threshold_opens
||
307 stream_open_for_read(S
, bot_streamp
) == -1) {
309 * Available file descriptors would exceed
310 * memory target or have been exhausted; back
311 * off to the last valid, primed stream.
313 bot_streamp
= bot_streamp
->s_previous
;
317 if (bot_streamp
->s_status
& STREAM_SINGLE
||
318 bot_streamp
->s_status
& STREAM_WIDE
)
319 stream_set(bot_streamp
, STREAM_INSTANT
);
321 bot_streamp
= bot_streamp
->s_next
;
324 release_file_descriptor();
/*
 * Whole chain opened: this pass produces the final output.  Otherwise
 * merge into a fresh temporary stream.
 */
326 if (bot_streamp
== NULL
) {
327 if (prepare_output_stream(out_streamp
, S
) != -1)
328 loop_out_streamp
= out_streamp
;
330 die(EMSG_DESCRIPTORS
);
332 loop_out_streamp
= stream_push_to_temporary(
333 &head_streamp
, NULL
, ST_OPEN
| ST_NOCACHE
|
334 (is_single_byte
? 0 : ST_WIDE
));
336 if (loop_out_streamp
== NULL
||
337 top_streamp
== bot_streamp
)
339 * We need three file descriptors to make
340 * progress; if top_streamp == bot_streamp, then
343 die(EMSG_DESCRIPTORS
);
/* Prime this pass's inputs and seed the priority queue. */
346 for (cur_streamp
= top_streamp
; cur_streamp
!= bot_streamp
;
347 cur_streamp
= cur_streamp
->s_next
) {
351 if (!(cur_streamp
->s_status
& STREAM_ARRAY
) &&
352 SOP_EOS(cur_streamp
)) {
353 stream_unlink_temporary(cur_streamp
);
358 * Given that stream is not empty, any error in priming
361 if (SOP_PRIME(cur_streamp
) != PRIME_SUCCEEDED
)
364 cur_streamp
->s_current
.l_collate_bufsize
=
365 INITIAL_COLLATION_SIZE
;
366 cur_streamp
->s_current
.l_collate
.sp
=
367 safe_realloc(NULL
, INITIAL_COLLATION_SIZE
);
368 (void) mg_coll_convert(S
->m_fields_head
,
369 &cur_streamp
->s_current
, FCV_REALLOC
,
370 S
->m_field_separator
);
372 pqueue_insert(cur_streamp
, coll_flags
);
/* Emit lines in collation order until every input is exhausted. */
375 while (!pqueue_empty()) {
376 cur_streamp
= pqueue_head(coll_flags
);
378 SOP_PUT_LINE(loop_out_streamp
, &cur_streamp
->s_current
);
379 SOP_RELEASE_LINE(cur_streamp
);
381 if (!SOP_EOS(cur_streamp
)) {
382 SOP_FETCH(cur_streamp
);
383 (void) mg_coll_convert(S
->m_fields_head
,
384 &cur_streamp
->s_current
, FCV_REALLOC
,
385 S
->m_field_separator
);
386 pqueue_insert(cur_streamp
, coll_flags
);
/* Tear down this pass's inputs. */
390 cur_streamp
= top_streamp
;
391 while (cur_streamp
!= bot_streamp
) {
/*
 * NOTE(review): a collation buffer is allocated for every primed
 * stream in step 3, but it is freed here only for non-STREAM_ARRAY
 * streams, while l_collate.sp is cleared for all of them.  Confirm
 * whether array streams' buffers are owned elsewhere or are leaked.
 */
392 if (!(cur_streamp
->s_status
& STREAM_ARRAY
))
393 safe_free(cur_streamp
->s_current
.l_collate
.sp
);
394 cur_streamp
->s_current
.l_collate
.sp
= NULL
;
396 (void) SOP_FREE(cur_streamp
);
397 stream_unlink_temporary(cur_streamp
);
398 (void) SOP_CLOSE(cur_streamp
);
400 cur_streamp
= cur_streamp
->s_next
;
403 (void) SOP_FLUSH(loop_out_streamp
);
405 if (bot_streamp
== NULL
)
408 if (!(loop_out_streamp
->s_status
& STREAM_NOTFILE
)) {
409 (void) SOP_CLOSE(loop_out_streamp
);
411 * Get file size so that we may treat intermediate files
412 * with our stream_mmap facilities.
414 stream_stat_chain(loop_out_streamp
);
415 __S(stats_incr_merge_files());
/* Advance the merge window for the next pass. */
420 top_streamp
= bot_streamp
;
421 bot_streamp
= bot_streamp
->s_next
;
/*
 * Body of the merge-phase entry point.  Its declaration, the declarations
 * of coll_flags, n_merges and out_stream, and all braces are not present
 * in this copy.
 *
 * Selects the chain to merge: S->m_input_streams when S->m_merge_only is
 * set (also registered with set_cleanup_chain()), otherwise
 * S->m_temporary_streams, which is stat'ed and counted for statistics.
 * coll_flags gets COLL_REVERSE for FIELD_REVERSE_COMPARISONS and
 * COLL_UNIQUE when S->m_entire_line is set.  mg_coll_convert is taken from
 * S->m_coll_convert.
 *
 * Depending on the number of streams in the chain (presumably via an
 * elided switch on n_merges), it then does one of the following:
 *   - warns that nothing is available to merge;
 *   - uses the one-file fast path;
 *   - uses the two-file fast path;
 *   - calls merge_n_streams().
 * Finally it calls remove_output_guard().
 *
 * NOTE(review): the non-reverse initialization of coll_flags is not
 * visible (lines 449-450 are elided).  The one-file fast path discards
 * the results of stream_open_for_read() and prepare_output_stream(),
 * whereas the two-file path calls die(EMSG_DESCRIPTORS) when
 * prepare_output_stream() returns -1.  Both fast paths ignore
 * stream_open_for_read() failures.
 */
428 stream_t
*merge_chain
;
429 stream_t
*cur_streamp
;
434 if (S
->m_merge_only
) {
435 merge_chain
= S
->m_input_streams
;
436 set_cleanup_chain(&S
->m_input_streams
);
439 * Otherwise we're inheriting the temporary output files from
442 merge_chain
= S
->m_temporary_streams
;
443 stream_stat_chain(merge_chain
);
444 __S(stats_set_merge_files(stream_count_chain(merge_chain
)));
447 if (S
->m_field_options
& FIELD_REVERSE_COMPARISONS
)
448 coll_flags
= COLL_REVERSE
;
451 if (S
->m_entire_line
)
452 coll_flags
|= COLL_UNIQUE
;
454 n_merges
= stream_count_chain(merge_chain
);
456 mg_coll_convert
= S
->m_coll_convert
;
457 cur_streamp
= merge_chain
;
462 * No files for merge.
464 warn(gettext("no files available to merge\n"));
468 * Fast path: only one file for merge.
470 (void) stream_open_for_read(S
, cur_streamp
);
471 (void) prepare_output_stream(&out_stream
, S
);
472 merge_one_stream(S
->m_fields_head
, cur_streamp
,
473 &out_stream
, S
->m_field_separator
);
477 * Fast path: only two files for merge.
479 (void) stream_open_for_read(S
, cur_streamp
);
480 (void) stream_open_for_read(S
, cur_streamp
->s_next
);
481 if (prepare_output_stream(&out_stream
, S
) == -1)
482 die(EMSG_DESCRIPTORS
);
483 merge_two_streams(S
->m_fields_head
, cur_streamp
,
484 cur_streamp
->s_next
, &out_stream
,
485 S
->m_field_separator
, coll_flags
);
491 merge_n_streams(S
, cur_streamp
, n_merges
, &out_stream
,
496 remove_output_guard();