@@ -11,6 +11,7 @@ |
*/ |
package csip.utils; |
|
+import java.lang.reflect.Array; |
import java.util.ArrayList; |
import java.util.Collection; |
import java.util.List; |
@@ -173,4 +174,63 @@ |
es.shutdown(); |
} |
} |
+ |
+ /** |
+ * Execute runs in parallel. |
+ * |
+ * @param <V> The result type of the operations |
+ * @param nthreads number of threads to use |
+ * @param ret the base type of the return array |
+ * @param count total number of factory calls |
+ * @param factory the factory that creates the runs (index (0..count-1) -> |
+ * Run) |
+ * @return the result array of type 'ret' |
+ * @throws Exception the first exception that happens in a run.. |
+ */ |
+ public static <V> V[] run(int nthreads, Class<V> ret, int count, Function<Integer, Callable<V>> factory) throws Exception { |
+ if (count < 1) |
+ throw new IllegalArgumentException("Nothing to run."); |
+ |
+ if (nthreads < 1) |
+ throw new IllegalArgumentException("invalid # of threads: " + nthreads); |
+ |
+ if (ret == null) |
+ throw new NullPointerException("return type 'ret' is null"); |
+ |
+ int nt = Math.min(nthreads, count); |
+ ExecutorService es = Executors.newFixedThreadPool(nt); |
+ |
+ List<Future<Exception>> l = new ArrayList<>(); |
+ @SuppressWarnings("unchecked") |
+ V[] r = (V[]) Array.newInstance(ret, count); |
+ |
+ // submission of new runs. |
+ for (int i = 0; i < count; i++) { |
+ final Callable<V> c = factory.apply(i); |
+ if (c == null) |
+ break; // the creation of the run failed, done here |
+ |
+ final int fi = i; |
+ l.add(es.submit((Callable<Exception>) () -> { |
+ try { |
+ r[fi] = c.call(); |
+ } catch (Exception E) { |
+ return E; |
+ } |
+ return null; |
+ })); |
+ } |
+ // pull the Futures, check for exceptions, throw the first one |
+ try { |
+ for (Future<Exception> result : l) { |
+ if (result.get() != null) |
+ throw result.get(); |
+ } |
+ } catch (InterruptedException ex) { |
+ throw new Exception(ex); |
+ } finally { |
+ es.shutdown(); |
+ } |
+ return r; |
+ } |
} |