Use internal SNAPSHOT couplings again
[trakem2.git] / TrakEM2_ / src / main / java / ini / trakem2 / parallel / ParallelMapping.java
blobf8b95a116b42ca2c28c3786707b3cc69663ccb2d
1 package ini.trakem2.parallel;
3 import ini.trakem2.utils.Utils;
5 import java.util.Iterator;
6 import java.util.LinkedList;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.Future;
11 /**
12 * Like clojure's pmap, given a sequence of inputs obtained from an {@link Iterable},
13 * this {@link Iterable} will apply a function to each input ahead of consumption
14 * in a pool of threads managed by an {@link ExecutorService}.
16 * Does not hold onto the head of the input sequence.
17 * The sequence can be consumed only once, i.e. only a single call to {@link #iterator()} is possible.
19 * This class ought to be an {@link Iterator} rather than an {@link Iterable},
20 * but it is an {@link Iterable} for convenience, so that the for loop construct works.
22 public final class ParallelMapping<I,O> implements Iterable<O>
24 final private Iterator<I> in;
25 final private int n_proc;
26 final private TaskFactory<I, O> generator;
28 public ParallelMapping(final Iterable<I> inputs, final TaskFactory<I,O> generator) {
29 this(Runtime.getRuntime().availableProcessors(), inputs.iterator(), generator);
32 public ParallelMapping(final Iterator<I> inputs, final TaskFactory<I,O> generator) {
33 this(Runtime.getRuntime().availableProcessors(), inputs, generator);
36 public ParallelMapping(final int n_proc, final Iterable<I> inputs, final TaskFactory<I,O> generator) {
37 this(n_proc, inputs.iterator(), generator);
40 /**
42 * @param n_proc Number of threads.
43 * @param inputs The sequence of inputs.
44 * @param generator The generator of {@link Callable} functions, one per input, each returning one output.
46 public ParallelMapping(final int n_proc, final Iterator<I> inputs, final TaskFactory<I,O> generator) {
47 this.in = inputs;
48 this.n_proc = Process.sensible(n_proc);
49 this.generator = generator;
52 @Override
53 public Iterator<O> iterator() {
54 // Check whether the inputs where already consumed
55 if (!in.hasNext()) return null;
57 final ExecutorService exec = Utils.newFixedThreadPool(n_proc, ParallelMapping.class.getSimpleName());
58 final LinkedList<Future<O>> futures = new LinkedList<Future<O>>();
60 return new Iterator<O>() {
62 @Override
63 public boolean hasNext() {
64 final boolean b = !futures.isEmpty() || in.hasNext();
65 if (!b) {
66 exec.shutdown();
68 return b;
71 @Override
72 public O next() {
73 if (futures.size() < n_proc / 2) {
74 for (int i=0; i<n_proc; ++i) {
75 if (!in.hasNext()) {
76 exec.shutdown();
77 break;
79 futures.add(exec.submit(generator.create(in.next())));
82 try {
83 return futures.removeFirst().get();
84 } catch (Exception e) {
85 throw new RuntimeException(e);
89 @Override
90 public void remove() {
91 throw new UnsupportedOperationException();