1 package ini
.trakem2
.parallel
;
3 import ini
.trakem2
.utils
.Utils
;
5 import java
.util
.Collection
;
6 import java
.util
.LinkedList
;
7 import java
.util
.concurrent
.ExecutorService
;
8 import java
.util
.concurrent
.Future
;
/** For all methods, if the number of processors given as argument is less than one or larger than the maximum available plus 2,
 *  the number of processors will be adjusted to fall within the range [1, max+2]. */
12 public class Process
{
14 static private final int MIN_AHEAD
= 4;
15 static public final int NUM_PROCESSORS
= Runtime
.getRuntime().availableProcessors();
17 static final int sensible(final int nproc
) {
18 return Math
.max(1, Math
.min(nproc
, NUM_PROCESSORS
+ 2));
21 /** Takes a Collection of inputs, applies a function to each created by the generator,
22 * and places their output in outputs in the same order as each input was retrieved from inputs. */
23 static public final <I
,O
> void progressive(final Iterable
<I
> inputs
, final TaskFactory
<I
,O
> generator
, final Collection
<O
> outputs
) throws Exception
{
24 progressive(inputs
, generator
, outputs
, NUM_PROCESSORS
);
26 /** Takes a Collection of inputs, applies a function to each created by the generator,
27 * and places their output in outputs in the same order as each input was retrieved from inputs. */
28 static public final <I
,O
> void progressive(final Iterable
<I
> inputs
, final TaskFactory
<I
,O
> generator
, final Collection
<O
> outputs
, final int n_proc
) throws Exception
{
29 process(inputs
, generator
, outputs
, n_proc
, true);
32 /** Takes a Collection of inputs, applies a function to each created by the generator,
33 * and places their output in outputs in the same order as each input was retrieved from inputs;
34 * will not wait for executing tasks to finish before creating and submitting new tasks. */
35 static public final <I
,O
> void unbound(final Iterable
<I
> inputs
, final TaskFactory
<I
,O
> generator
, final Collection
<O
> outputs
, final int n_proc
) throws Exception
{
36 process(inputs
, generator
, outputs
, n_proc
, false);
39 static private final <I
,O
> void process(final Iterable
<I
> inputs
, final TaskFactory
<I
,O
> generator
, final Collection
<O
> outputs
, final int n_proc
, final boolean bound
) throws Exception
{
40 final int nproc
= sensible(n_proc
);
41 final ExecutorService exec
= Utils
.newFixedThreadPool(nproc
, "Process." + (bound ?
"progressive" : "unbound"));
43 final LinkedList
<Future
<O
>> fus
= new LinkedList
<Future
<O
>>();
44 final int ahead
= Math
.max(nproc
+ nproc
, MIN_AHEAD
);
45 for (final I input
: inputs
) {
46 if (Thread
.currentThread().isInterrupted()) {
49 fus
.add(exec
.submit(generator
.create(input
)));
50 if (bound
) while (fus
.size() > ahead
) {
52 outputs
.add(fus
.removeFirst().get());
55 // wait for remaining, if any
56 for (final Future
<O
> fu
: fus
) {
57 if (null != fu
) outputs
.add(fu
.get());
58 else outputs
.add(null);
65 /** Takes a Collection of inputs, applies a function to each created by the generator. */
66 static public final <I
,O
> void progressive(final Iterable
<I
> inputs
, final TaskFactory
<I
,O
> generator
) throws Exception
{
67 progressive(inputs
, generator
, NUM_PROCESSORS
);
69 static public final <I
,O
> void progressive(final Iterable
<I
> inputs
, final TaskFactory
<I
,O
> generator
, final int n_proc
) throws Exception
{
70 process(inputs
, generator
, n_proc
, true);
72 static public final <I
,O
> void unbound(final Iterable
<I
> inputs
, final TaskFactory
<I
,O
> generator
) throws Exception
{
73 unbound(inputs
, generator
, NUM_PROCESSORS
);
75 static public final <I
,O
> void unbound(final Iterable
<I
> inputs
, final TaskFactory
<I
,O
> generator
, final int n_proc
) throws Exception
{
76 process(inputs
, generator
, n_proc
, false);
78 static private final <I
,O
> void process(final Iterable
<I
> inputs
, final TaskFactory
<I
,O
> generator
, final int n_proc
, final boolean bound
) throws Exception
{
79 final int nproc
= sensible(n_proc
);
80 final ExecutorService exec
= Utils
.newFixedThreadPool(nproc
, "Process." + (bound ?
"progressive" : "unbound"));
82 final LinkedList
<Future
<O
>> fus
= new LinkedList
<Future
<O
>>();
83 final int ahead
= Math
.max(nproc
+ nproc
, MIN_AHEAD
);
84 for (final I input
: inputs
) {
85 if (Thread
.currentThread().isInterrupted()) {
88 fus
.add(exec
.submit(generator
.create(input
)));
89 if (bound
) while (fus
.size() > ahead
) {
90 fus
.removeFirst().get();
93 for (final Future
<O
> fu
: fus
) if (null != fu
) fu
.get();