Parallel.java [src/csip/utils] Revision: default Date:
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2022, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip.utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Static helpers that execute {@link Run} tasks on a fixed-size thread pool
 * and report the first failure to the caller.
 *
 * <p>Every call creates its own pool and shuts it down before returning.
 *
 * @author od
 */
public class Parallel {

    /** Utility class, not meant to be instantiated. */
    private Parallel() {
    }

    /**
     * A unit of work that may fail with a checked exception.
     */
    @FunctionalInterface
    public interface Run {

        /**
         * Perform the run.
         *
         * @throws Exception if the run fails
         */
        void run() throws Exception;
    }

    /**
     * Factory interface for creating runs.
     */
    public interface RunFactory {

        /**
         * The number of runs to create.
         *
         * @return the max # of runs
         */
        int count();

        /**
         * Create a single run for a given index i.
         *
         * @param i the i th run to create
         * @return the run instance. If this method returns null, the submission of
         * new runs stops here.
         */
        Run create(int i);
    }

    /**
     * Execute all runs in parallel, using one thread per run.
     *
     * @param runs the runs to do in parallel
     * @throws IllegalArgumentException if the collection is empty
     * @throws Exception the exception of the first failing run (in iteration
     * order of the collection)
     */
    public static void run(Collection<Run> runs) throws Exception {
        run(runs.size(), runs.toArray(new Run[0]));
    }

    /**
     * Execute all runs in parallel, using one thread per run.
     *
     * @param runs the runs to execute
     * @throws IllegalArgumentException if no run is given
     * @throws Exception the exception of the first failing run (in argument
     * order) is propagated.
     */
    public static void run(Run... runs) throws Exception {
        run(runs.length, runs);
    }

    /**
     * Execute all runs either one after another or all in parallel.
     *
     * @param serial true if the runs should execute in sequence on a single
     * thread, false if they should all execute in parallel
     * @param runs the runs to execute
     * @throws IllegalArgumentException if no run is given
     * @throws Exception the exception of the first failing run (in argument
     * order) is propagated.
     */
    public static void run(boolean serial, Run... runs) throws Exception {
        run(serial ? 1 : runs.length, runs);
    }

    /**
     * Execute all runs on a pool of {@code threads} threads and wait until all
     * of them have finished.
     *
     * @param threads # of threads to use (1..runs.length)
     * @param runs the runs to execute
     * @throws IllegalArgumentException if no run is given or {@code threads}
     * is outside of 1..runs.length
     * @throws Exception the exception of the first failing run (in argument
     * order) is propagated here. If the calling thread is interrupted while
     * waiting, its interrupt flag is restored and the
     * {@link InterruptedException} is wrapped into a plain {@code Exception}.
     */
    public static void run(int threads, Run... runs) throws Exception {
        if (runs.length == 0) {
            throw new IllegalArgumentException("Nothing to run.");
        }
        if (threads < 1 || threads > runs.length) {
            throw new IllegalArgumentException("invalid # of threads: " + threads);
        }

        List<Callable<Exception>> tasks = new ArrayList<>(runs.length);
        for (Run r : runs) {
            tasks.add(capture(r));
        }

        ExecutorService es = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Exception>> results;
            try {
                results = es.invokeAll(tasks);
            } catch (InterruptedException ex) {
                // Keep the interrupt request visible to the caller.
                Thread.currentThread().interrupt();
                throw new Exception(ex);
            }
            rethrowFirstFailure(results);
        } finally {
            es.shutdown();
        }
    }

    /**
     * Execute the runs produced by a factory on a pool of {@code nthreads}
     * threads.
     *
     * <p>Runs are created and submitted one by one on the calling thread;
     * submission stops at {@code factory.count()} or as soon as the factory
     * returns null. The pool is always shut down before this method returns,
     * also when the factory itself throws. Runs that were already submitted
     * are not cancelled by that shutdown.
     *
     * @param nthreads number of threads to use
     * @param factory the factory that creates the runs
     * @throws Exception the exception of the first failing run (in submission
     * order). If the calling thread is interrupted while waiting, its
     * interrupt flag is restored and the {@link InterruptedException} is
     * wrapped into a plain {@code Exception}.
     */
    public static void run(int nthreads, RunFactory factory) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(nthreads);
        try {
            List<Future<Exception>> pending = new ArrayList<>();
            // first phase, submission of new runs.
            for (int i = 0; i < factory.count(); i++) {
                Run r = factory.create(i);
                if (r == null) {
                    break; // the creation of the run failed, done here
                }
                pending.add(es.submit(capture(r)));
            }
            // second phase, pull the Futures, check for exceptions
            rethrowFirstFailure(pending);
        } finally {
            // Covers failures of the factory as well, so no worker thread
            // is left behind.
            es.shutdown();
        }
    }

    /**
     * Wrap a run into a task that returns the exception thrown by the run, or
     * null if the run completed normally.
     */
    private static Callable<Exception> capture(Run r) {
        return () -> {
            try {
                r.run();
            } catch (Exception e) {
                return e;
            }
            return null;
        };
    }

    /**
     * Wait for the given results in order and throw the first captured
     * exception.
     */
    private static void rethrowFirstFailure(List<Future<Exception>> results) throws Exception {
        for (Future<Exception> result : results) {
            Exception failure;
            try {
                failure = result.get();
            } catch (InterruptedException ex) {
                // The calling thread was interrupted while waiting.
                Thread.currentThread().interrupt();
                throw new Exception(ex);
            }
            if (failure instanceof InterruptedException) {
                // The run itself was interrupted on its worker thread: keep
                // wrapping it as before, the caller's flag is not touched.
                throw new Exception(failure);
            }
            if (failure != null) {
                throw failure;
            }
        }
    }
}