Parallel.java [src/csip/utils] Revision: Date:
/*
* $Id: 2.7+51 Parallel.java b6a9acb8879a 2023-08-09 od $
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2023, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip.utils;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
/**
 * Static helpers that execute a batch of tasks on a temporary fixed-size
 * thread pool and block until all tasks have finished or the first failure
 * has been detected.
 * <p>
 * Every call creates its own pool, sized {@code min(threads, taskCount)}, and
 * shuts that pool down before returning, regardless of whether the call
 * succeeds or fails. The pool is shut down with
 * {@link ExecutorService#shutdown()}, so tasks that are still running when a
 * failure is reported are not cancelled.
 *
 * @author od
 */
public class Parallel {

  private Parallel() {
  }

  /**
   * A unit of work that may throw a checked exception.
   */
  @FunctionalInterface
  public interface Run {

    /**
     * Perform the run.
     *
     * @throws Exception if the run fails
     */
    void run() throws Exception;
  }

  /**
   * Execute all runs in parallel, one thread per run.
   *
   * @param runs the runs to do in parallel, must not be empty
   * @throws IllegalArgumentException if {@code runs} is empty
   * @throws Exception the exception of the first failing run (in iteration
   * order of {@code runs})
   */
  public static void run(Collection<Run> runs) throws Exception {
    run(runs.toArray(new Run[0]));
  }

  /**
   * Execute all runs in parallel, one thread per run.
   *
   * @param runs the runs to execute, must not be empty
   * @throws IllegalArgumentException if {@code runs} is empty
   * @throws Exception the exception of the first failing run (in argument
   * order)
   */
  public static void run(Run... runs) throws Exception {
    run(runs.length, runs);
  }

  /**
   * Execute all runs either in parallel or in serial.
   *
   * @param serial true if all should run in serial (a single worker thread),
   * false if all should run in parallel (one thread per run)
   * @param runs the runs to execute, must not be empty
   * @throws IllegalArgumentException if {@code runs} is empty
   * @throws Exception the exception of the first failing run (in argument
   * order)
   */
  public static void run(boolean serial, Run... runs) throws Exception {
    run(serial ? 1 : runs.length, runs);
  }

  /**
   * Execute all runs either in parallel or in serial.
   *
   * @param serial true if all should run in serial (a single worker thread),
   * false if all should run in parallel (one thread per run)
   * @param runs the runs to execute, must not be empty
   * @throws IllegalArgumentException if {@code runs} is empty
   * @throws Exception the exception of the first failing run (in iteration
   * order of {@code runs})
   */
  public static void run(boolean serial, Collection<Run> runs) throws Exception {
    run(serial, runs.toArray(new Run[0]));
  }

  /**
   * Execute all runs using at most the given number of threads.
   *
   * @param threads number of threads to use, must be at least 1; values
   * larger than {@code runs.length} are capped to {@code runs.length}
   * @param runs the runs to execute, must not be empty
   * @throws IllegalArgumentException if {@code runs} is empty or
   * {@code threads} is less than 1
   * @throws Exception the exception of the first failing run (in argument
   * order) is propagated here
   */
  public static void run(int threads, Run... runs) throws Exception {
    if (runs.length == 0) {
      throw new IllegalArgumentException("Nothing to run.");
    }
    if (threads < 1) {
      throw new IllegalArgumentException("invalid # of threads: " + threads);
    }
    ExecutorService es = getES(threads, runs.length);
    try {
      List<Future<Exception>> pending = new ArrayList<>(runs.length);
      for (Run r : runs) {
        pending.add(es.submit(capture(r)));
      }
      awaitAll(pending);
    } finally {
      // always release the pool threads, even if submission or waiting failed
      es.shutdown();
    }
  }

  /**
   * Execute factory-created runs using at most the given number of threads.
   * <p>
   * The factory is called on the calling thread, once per index, and each
   * created run is submitted immediately. If the factory returns
   * {@code null}, no further runs are created and only the runs submitted so
   * far are awaited.
   *
   * @param nthreads number of threads to use, must be at least 1; values
   * larger than {@code count} are capped to {@code count}
   * @param count total number of factory calls, must be at least 1
   * @param factory the factory that creates the runs (index (0..count-1) ->
   * Run)
   * @throws IllegalArgumentException if {@code count} or {@code nthreads} is
   * less than 1
   * @throws Exception the exception of the first failing run (in index
   * order); an exception thrown by the factory itself is propagated as is
   */
  public static void run(int nthreads, int count, Function<Integer, Run> factory) throws Exception {
    if (count < 1) {
      throw new IllegalArgumentException("Nothing to run.");
    }
    if (nthreads < 1) {
      throw new IllegalArgumentException("invalid # of threads: " + nthreads);
    }
    ExecutorService es = getES(nthreads, count);
    try {
      List<Future<Exception>> pending = new ArrayList<>();
      for (int i = 0; i < count; i++) {
        Run r = factory.apply(i);
        if (r == null) {
          break; // the creation of the run failed, done here
        }
        pending.add(es.submit(capture(r)));
      }
      awaitAll(pending);
    } finally {
      // a throwing factory must not leave the worker threads behind
      es.shutdown();
    }
  }

  /**
   * Execute factory-created callables using at most the given number of
   * threads and collect their results.
   * <p>
   * The factory is called on the calling thread, once per index, and each
   * created callable is submitted immediately. If the factory returns
   * {@code null}, no further callables are created; the result slots of the
   * indices that were never submitted stay {@code null}.
   *
   * @param <V> The result type of the operations
   * @param nthreads number of threads to use, must be at least 1; values
   * larger than {@code count} are capped to {@code count}
   * @param ret the base type of the return array
   * @param count total number of factory calls, must be at least 1
   * @param factory the factory that creates the callables (index
   * (0..count-1) -> Callable)
   * @return the result array of type 'ret' and length {@code count}; element
   * {@code i} holds the result of the callable created for index {@code i}
   * @throws IllegalArgumentException if {@code count} or {@code nthreads} is
   * less than 1
   * @throws NullPointerException if {@code ret} is null
   * @throws Exception the exception of the first failing callable (in index
   * order); an exception thrown by the factory itself is propagated as is
   */
  public static <V> V[] run(int nthreads, Class<V> ret, int count, Function<Integer, Callable<V>> factory) throws Exception {
    if (count < 1) {
      throw new IllegalArgumentException("Nothing to run.");
    }
    if (nthreads < 1) {
      throw new IllegalArgumentException("invalid # of threads: " + nthreads);
    }
    if (ret == null) {
      throw new NullPointerException("return type 'ret' is null");
    }
    // allocate before the pool exists so an allocation failure cannot leak it
    @SuppressWarnings("unchecked")
    V[] results = (V[]) Array.newInstance(ret, count);
    ExecutorService es = getES(nthreads, count);
    try {
      List<Future<Exception>> pending = new ArrayList<>();
      for (int i = 0; i < count; i++) {
        final Callable<V> c = factory.apply(i);
        if (c == null) {
          break; // the creation of the callable failed, done here
        }
        final int slot = i;
        Callable<Exception> task = () -> {
          try {
            // each task writes only its own slot
            results[slot] = c.call();
          } catch (Exception e) {
            return e;
          }
          return null;
        };
        pending.add(es.submit(task));
      }
      awaitAll(pending);
    } finally {
      // a throwing factory must not leave the worker threads behind
      es.shutdown();
    }
    return results;
  }

  /**
   * Wrap a run into a callable that returns the run's exception instead of
   * throwing it, or null if the run completed normally.
   */
  private static Callable<Exception> capture(Run r) {
    return () -> {
      try {
        r.run();
      } catch (Exception e) {
        return e;
      }
      return null;
    };
  }

  /**
   * Wait for the futures in list order and rethrow the first captured
   * exception.
   *
   * @param pending the futures of the submitted tasks
   * @throws Exception the first captured task exception; an
   * InterruptedException (from waiting or from a task) is wrapped into a
   * plain Exception
   */
  private static void awaitAll(List<Future<Exception>> pending) throws Exception {
    for (Future<Exception> f : pending) {
      Exception failure;
      try {
        failure = f.get();
      } catch (InterruptedException ex) {
        // the caller itself was interrupted: keep the flag visible upstream
        Thread.currentThread().interrupt();
        throw new Exception(ex);
      }
      if (failure instanceof InterruptedException) {
        // interruption reported by a task: wrapped as before, but the
        // calling thread's interrupt flag is left untouched
        throw new Exception(failure);
      }
      if (failure != null) {
        throw failure;
      }
    }
  }

  /**
   * Create a fixed thread pool that never has more threads than tasks.
   */
  private static ExecutorService getES(int threads, int count) {
    return Executors.newFixedThreadPool(Math.min(threads, count));
  }
}