1 package ini
.trakem2
.parallel
;
3 import ini
.trakem2
.utils
.Utils
;
5 import java
.util
.Iterator
;
6 import java
.util
.LinkedList
;
7 import java
.util
.concurrent
.Callable
;
8 import java
.util
.concurrent
.ExecutorService
;
9 import java
.util
.concurrent
.Future
;
12 * Like clojure's pmap, given a sequence of inputs obtained from an {@link Iterable},
13 * this {@link Iterable} will apply a function to each input ahead of consumption
14 * in a pool of threads managed by an {@link ExecutorService}.
16 * Does not hold onto the head of the input sequence.
17 * The sequence can be consumed only once, i.e. only a single call to {@link #iterator()} is possible.
19 * This class ought to be an {@link Iterator} rather than an {@link Iterable},
20 * but it is an {@link Iterable} for convenience, so that the for loop construct works.
22 public final class ParallelMapping
<I
,O
> implements Iterable
<O
>
24 final private Iterator
<I
> in
;
25 final private int n_proc
;
26 final private TaskFactory
<I
, O
> generator
;
28 public ParallelMapping(final Iterable
<I
> inputs
, final TaskFactory
<I
,O
> generator
) {
29 this(Runtime
.getRuntime().availableProcessors(), inputs
.iterator(), generator
);
32 public ParallelMapping(final Iterator
<I
> inputs
, final TaskFactory
<I
,O
> generator
) {
33 this(Runtime
.getRuntime().availableProcessors(), inputs
, generator
);
36 public ParallelMapping(final int n_proc
, final Iterable
<I
> inputs
, final TaskFactory
<I
,O
> generator
) {
37 this(n_proc
, inputs
.iterator(), generator
);
42 * @param n_proc Number of threads.
43 * @param inputs The sequence of inputs.
44 * @param generator The generator of {@link Callable} functions, one per input, each returning one output.
46 public ParallelMapping(final int n_proc
, final Iterator
<I
> inputs
, final TaskFactory
<I
,O
> generator
) {
48 this.n_proc
= Process
.sensible(n_proc
);
49 this.generator
= generator
;
53 public Iterator
<O
> iterator() {
54 // Check whether the inputs where already consumed
55 if (!in
.hasNext()) return null;
57 final ExecutorService exec
= Utils
.newFixedThreadPool(n_proc
, ParallelMapping
.class.getSimpleName());
58 final LinkedList
<Future
<O
>> futures
= new LinkedList
<Future
<O
>>();
60 return new Iterator
<O
>() {
63 public boolean hasNext() {
64 final boolean b
= !futures
.isEmpty() || in
.hasNext();
73 if (futures
.size() < n_proc
/ 2) {
74 for (int i
=0; i
<n_proc
; ++i
) {
79 futures
.add(exec
.submit(generator
.create(in
.next())));
83 return futures
.removeFirst().get();
84 } catch (Exception e
) {
85 throw new RuntimeException(e
);
90 public void remove() {
91 throw new UnsupportedOperationException();