Use internal SNAPSHOT couplings again
[trakem2.git] / TrakEM2_ / src / main / java / ini / trakem2 / parallel / Process.java
blobe41c3b89bc0f2816eac4b2d5a7391ea30bb865e8
1 package ini.trakem2.parallel;
3 import ini.trakem2.utils.Utils;
5 import java.util.Collection;
6 import java.util.LinkedList;
7 import java.util.concurrent.ExecutorService;
8 import java.util.concurrent.Future;
10 /** For all methods, if the number of processors given as argument is zero or larger than the maximum available plus 2,
11 * the number of processors will be adjusted to fall within the range [1, max+2]. */
12 public class Process {
14 static private final int MIN_AHEAD = 4;
15 static public final int NUM_PROCESSORS = Runtime.getRuntime().availableProcessors();
17 static final int sensible(final int nproc) {
18 return Math.max(1, Math.min(nproc, NUM_PROCESSORS + 2));
21 /** Takes a Collection of inputs, applies a function to each created by the generator,
22 * and places their output in outputs in the same order as each input was retrieved from inputs. */
23 static public final <I,O> void progressive(final Iterable<I> inputs, final TaskFactory<I,O> generator, final Collection<O> outputs) throws Exception {
24 progressive(inputs, generator, outputs, NUM_PROCESSORS);
26 /** Takes a Collection of inputs, applies a function to each created by the generator,
27 * and places their output in outputs in the same order as each input was retrieved from inputs. */
28 static public final <I,O> void progressive(final Iterable<I> inputs, final TaskFactory<I,O> generator, final Collection<O> outputs, final int n_proc) throws Exception {
29 process(inputs, generator, outputs, n_proc, true);
32 /** Takes a Collection of inputs, applies a function to each created by the generator,
33 * and places their output in outputs in the same order as each input was retrieved from inputs;
34 * will not wait for executing tasks to finish before creating and submitting new tasks. */
35 static public final <I,O> void unbound(final Iterable<I> inputs, final TaskFactory<I,O> generator, final Collection<O> outputs, final int n_proc) throws Exception {
36 process(inputs, generator, outputs, n_proc, false);
39 static private final <I,O> void process(final Iterable<I> inputs, final TaskFactory<I,O> generator, final Collection<O> outputs, final int n_proc, final boolean bound) throws Exception {
40 final int nproc = sensible(n_proc);
41 final ExecutorService exec = Utils.newFixedThreadPool(nproc, "Process." + (bound ? "progressive" : "unbound"));
42 try {
43 final LinkedList<Future<O>> fus = new LinkedList<Future<O>>();
44 final int ahead = Math.max(nproc + nproc, MIN_AHEAD);
45 for (final I input : inputs) {
46 if (Thread.currentThread().isInterrupted()) {
47 return;
49 fus.add(exec.submit(generator.create(input)));
50 if (bound) while (fus.size() > ahead) {
51 // wait
52 outputs.add(fus.removeFirst().get());
55 // wait for remaining, if any
56 for (final Future<O> fu : fus) {
57 if (null != fu) outputs.add(fu.get());
58 else outputs.add(null);
60 } finally {
61 exec.shutdown();
65 /** Takes a Collection of inputs, applies a function to each created by the generator. */
66 static public final <I,O> void progressive(final Iterable<I> inputs, final TaskFactory<I,O> generator) throws Exception {
67 progressive(inputs, generator, NUM_PROCESSORS);
69 static public final <I,O> void progressive(final Iterable<I> inputs, final TaskFactory<I,O> generator, final int n_proc) throws Exception {
70 process(inputs, generator, n_proc, true);
72 static public final <I,O> void unbound(final Iterable<I> inputs, final TaskFactory<I,O> generator) throws Exception {
73 unbound(inputs, generator, NUM_PROCESSORS);
75 static public final <I,O> void unbound(final Iterable<I> inputs, final TaskFactory<I,O> generator, final int n_proc) throws Exception {
76 process(inputs, generator, n_proc, false);
78 static private final <I,O> void process(final Iterable<I> inputs, final TaskFactory<I,O> generator, final int n_proc, final boolean bound) throws Exception {
79 final int nproc = sensible(n_proc);
80 final ExecutorService exec = Utils.newFixedThreadPool(nproc, "Process." + (bound ? "progressive" : "unbound"));
81 try {
82 final LinkedList<Future<O>> fus = new LinkedList<Future<O>>();
83 final int ahead = Math.max(nproc + nproc, MIN_AHEAD);
84 for (final I input : inputs) {
85 if (Thread.currentThread().isInterrupted()) {
86 return;
88 fus.add(exec.submit(generator.create(input)));
89 if (bound) while (fus.size() > ahead) {
90 fus.removeFirst().get();
93 for (final Future<O> fu : fus) if (null != fu) fu.get();
94 } finally {
95 exec.shutdown();