1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2009, 2011, 2012 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "math/sort.h"
23 #include "data/case.h"
24 #include "data/casereader.h"
25 #include "data/casewriter-provider.h"
26 #include "data/casewriter.h"
27 #include "data/settings.h"
28 #include "data/subcase.h"
29 #include "libpspp/array.h"
30 #include "libpspp/assertion.h"
31 #include "math/merge.h"
33 #include "gl/xalloc.h"
36 #define _(msgid) gettext (msgid)
38 /* These should only be changed for testing purposes. */
40 int max_buffers
= INT_MAX
;
44 struct caseproto
*proto
;
45 struct subcase ordering
;
47 struct pqueue
*pqueue
;
49 struct casewriter
*run
;
51 struct ccase
*run_end
;
54 static struct casewriter_class sort_casewriter_class
;
56 static struct pqueue
*pqueue_create (const struct subcase
*,
57 const struct caseproto
*);
58 static void pqueue_destroy (struct pqueue
*);
59 static bool pqueue_is_full (const struct pqueue
*);
60 static bool pqueue_is_empty (const struct pqueue
*);
61 static void pqueue_push (struct pqueue
*, struct ccase
*, casenumber
);
62 static struct ccase
*pqueue_pop (struct pqueue
*, casenumber
*);
64 static void output_record (struct sort_writer
*);
67 sort_create_writer (const struct subcase
*ordering
,
68 const struct caseproto
*proto
)
70 struct sort_writer
*sort
;
72 sort
= xmalloc (sizeof *sort
);
73 sort
->proto
= caseproto_ref (proto
);
74 subcase_clone (&sort
->ordering
, ordering
);
75 sort
->merge
= merge_create (ordering
, proto
);
76 sort
->pqueue
= pqueue_create (ordering
, proto
);
81 return casewriter_create (proto
, &sort_casewriter_class
, sort
);
85 sort_casewriter_write (struct casewriter
*writer UNUSED
, void *sort_
,
88 struct sort_writer
*sort
= sort_
;
91 if (pqueue_is_full (sort
->pqueue
))
94 next_run
= (sort
->run_end
== NULL
95 || subcase_compare_3way (&sort
->ordering
, c
,
96 &sort
->ordering
, sort
->run_end
) < 0);
97 pqueue_push (sort
->pqueue
, c
, sort
->run_id
+ (next_run
? 1 : 0));
101 sort_casewriter_destroy (struct casewriter
*writer UNUSED
, void *sort_
)
103 struct sort_writer
*sort
= sort_
;
105 subcase_destroy (&sort
->ordering
);
106 merge_destroy (sort
->merge
);
107 pqueue_destroy (sort
->pqueue
);
108 casewriter_destroy (sort
->run
);
109 case_unref (sort
->run_end
);
110 caseproto_unref (sort
->proto
);
114 static struct casereader
*
115 sort_casewriter_convert_to_reader (struct casewriter
*writer
, void *sort_
)
117 struct sort_writer
*sort
= sort_
;
118 struct casereader
*output
;
120 if (sort
->run
== NULL
&& sort
->run_id
== 0)
123 sort
->run
= mem_writer_create (sort
->proto
);
126 while (!pqueue_is_empty (sort
->pqueue
))
127 output_record (sort
);
129 merge_append (sort
->merge
, casewriter_make_reader (sort
->run
));
132 output
= merge_make_reader (sort
->merge
);
133 sort_casewriter_destroy (writer
, sort
);
138 output_record (struct sort_writer
*sort
)
140 struct ccase
*min_case
;
141 casenumber min_run_id
;
143 min_case
= pqueue_pop (sort
->pqueue
, &min_run_id
);
145 printf ("\toutput: %f to run %d\n", case_num_idx (min_case
, 0), min_run_id
);
148 if (sort
->run_id
!= min_run_id
&& sort
->run
!= NULL
)
150 merge_append (sort
->merge
, casewriter_make_reader (sort
->run
));
153 if (sort
->run
== NULL
)
155 sort
->run
= tmpfile_writer_create (sort
->proto
);
156 sort
->run_id
= min_run_id
;
159 case_unref (sort
->run_end
);
160 sort
->run_end
= case_ref (min_case
);
161 casewriter_write (sort
->run
, min_case
);
164 static struct casewriter_class sort_casewriter_class
=
166 sort_casewriter_write
,
167 sort_casewriter_destroy
,
168 sort_casewriter_convert_to_reader
,
171 /* Reads all the cases from INPUT. Sorts the cases according to
172 ORDERING. Returns the sorted cases in a new casereader.
173 INPUT is destroyed by this function.
176 sort_execute (struct casereader
*input
, const struct subcase
*ordering
)
178 struct casewriter
*output
=
179 sort_create_writer (ordering
, casereader_get_proto (input
));
180 casereader_transfer (input
, output
);
181 return casewriter_make_reader (output
);
184 /* Reads all the cases from INPUT. Sorts the cases in ascending
185 order according to VARIABLE. Returns the sorted cases in a
186 new casereader. INPUT is destroyed by this function. */
188 sort_execute_1var (struct casereader
*input
, const struct variable
*var
)
191 struct casereader
*reader
;
193 subcase_init_var (&sc
, var
, SC_ASCEND
);
194 reader
= sort_execute (input
, &sc
);
195 subcase_destroy (&sc
);
201 struct subcase ordering
;
202 struct pqueue_record
*records
;
203 size_t record_cnt
; /* Current number of records. */
204 size_t record_cap
; /* Space currently allocated for records. */
205 size_t record_max
; /* Max space we are willing to allocate. */
216 static int compare_pqueue_records_minheap (const void *a
, const void *b
,
219 static struct pqueue
*
220 pqueue_create (const struct subcase
*ordering
, const struct caseproto
*proto
)
224 pq
= xmalloc (sizeof *pq
);
225 subcase_clone (&pq
->ordering
, ordering
);
226 pq
->record_max
= settings_get_workspace_cases (proto
);
227 if (pq
->record_max
> max_buffers
)
228 pq
->record_max
= max_buffers
;
229 else if (pq
->record_max
< min_buffers
)
230 pq
->record_max
= min_buffers
;
240 pqueue_destroy (struct pqueue
*pq
)
244 while (!pqueue_is_empty (pq
))
247 struct ccase
*c
= pqueue_pop (pq
, &id
);
250 subcase_destroy (&pq
->ordering
);
257 pqueue_is_full (const struct pqueue
*pq
)
259 return pq
->record_cnt
>= pq
->record_max
;
263 pqueue_is_empty (const struct pqueue
*pq
)
265 return pq
->record_cnt
== 0;
269 pqueue_push (struct pqueue
*pq
, struct ccase
*c
, casenumber id
)
271 struct pqueue_record
*r
;
273 assert (!pqueue_is_full (pq
));
275 if (pq
->record_cnt
>= pq
->record_cap
)
277 pq
->record_cap
= pq
->record_cap
* 2;
278 if (pq
->record_cap
< 16)
280 else if (pq
->record_cap
> pq
->record_max
)
281 pq
->record_cap
= pq
->record_max
;
282 pq
->records
= xrealloc (pq
->records
,
283 pq
->record_cap
* sizeof *pq
->records
);
286 r
= &pq
->records
[pq
->record_cnt
++];
291 push_heap (pq
->records
, pq
->record_cnt
, sizeof *pq
->records
,
292 compare_pqueue_records_minheap
, pq
);
295 static struct ccase
*
296 pqueue_pop (struct pqueue
*pq
, casenumber
*id
)
298 struct pqueue_record
*r
;
300 assert (!pqueue_is_empty (pq
));
302 pop_heap (pq
->records
, pq
->record_cnt
--, sizeof *pq
->records
,
303 compare_pqueue_records_minheap
, pq
);
305 r
= &pq
->records
[pq
->record_cnt
];
310 /* Compares record-run tuples A and B on id, then on case data,
311 then on insertion order, in descending order. */
313 compare_pqueue_records_minheap (const void *a_
, const void *b_
,
316 const struct pqueue_record
*a
= a_
;
317 const struct pqueue_record
*b
= b_
;
318 const struct pqueue
*pq
= pq_
;
319 int result
= a
->id
< b
->id
? -1 : a
->id
> b
->id
;
321 result
= subcase_compare_3way (&pq
->ordering
, a
->c
, &pq
->ordering
, b
->c
);
323 result
= a
->idx
< b
->idx
? -1 : a
->idx
> b
->idx
;