// Parallel.java [src/csip/utils] Revision: default  Date:
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 *
 * 2012-2022, Olaf David and others, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package csip.utils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Static helpers for executing a set of {@link Run}s on a fixed-size thread
 * pool and propagating the first failure to the caller.
 *
 * @author od
 */
public class Parallel {

  private Parallel() {
  }

  /**
   * A unit of work that is allowed to throw a checked exception.
   */
  @FunctionalInterface
  public interface Run {

    /**
     * Perform the run.
     *
     * @throws Exception if the run fails
     */
    void run() throws Exception;
  }

  /**
   * Factory interface for creating runs.
   */
  public interface RunFactory {

    /**
     * The number of runs to create.
     *
     * @return the max # of runs
     */
    int count();


    /**
     * Create a single run for a given index i.
     *
     * @param i the i th run to create
     * @return the run instance. If this method returns null, the submission of
     * new runs stops here.
     */
    Run create(int i);
  }


  /**
   * Execute all runs in parallel, using one thread per run.
   *
   * @param runs the runs to do in parallel
   * @throws IllegalArgumentException if {@code runs} is empty
   * @throws Exception the first (in iteration order) exception raised by a run
   */
  public static void run(Collection<Run> runs) throws Exception {
    run(runs.size(), runs.toArray(new Run[0]));
  }


  /**
   * Execute all runs in parallel, using one thread per run.
   *
   * @param runs the runs to execute
   * @throws IllegalArgumentException if {@code runs} is empty
   * @throws Exception the first exception that happens in a run is propagated.
   */
  public static void run(Run... runs) throws Exception {
    run(runs.length, runs);
  }


  /**
   * Execute all runs either in sequence or all in parallel.
   *
   * @param serial true if all runs should execute in sequence on a single
   * thread, false if they should all run in parallel (one thread per run)
   * @param runs the runs to execute
   * @throws IllegalArgumentException if {@code runs} is empty
   * @throws Exception the first exception that happens in a run is propagated
   */
  public static void run(boolean serial, Run... runs) throws Exception {
    run(serial ? 1 : runs.length, runs);
  }


  /**
   * Execute all runs on a pool of {@code threads} threads and wait until every
   * run has finished.
   *
   * @param threads # of threads to use (1..runs.length)
   * @param runs the runs to execute
   * @throws IllegalArgumentException if {@code runs} is empty or
   * {@code threads} is outside of 1..runs.length
   * @throws Exception the first exception (in the order of {@code runs}) that
   * happens in a run is propagated here. If the calling thread is interrupted
   * while waiting, its interrupt status is restored and the
   * {@link InterruptedException} is rethrown wrapped in an {@code Exception}.
   */
  public static void run(int threads, Run... runs) throws Exception {
    if (runs.length == 0) {
      throw new IllegalArgumentException("Nothing to run.");
    }
    if (threads < 1 || threads > runs.length) {
      throw new IllegalArgumentException("invalid # of threads: " + threads);
    }

    List<Callable<Exception>> taskList = new ArrayList<>(runs.length);
    for (Run r : runs) {
      taskList.add(toCallable(r));
    }

    ExecutorService es = Executors.newFixedThreadPool(threads);
    List<Future<Exception>> results;
    try {
      // invokeAll returns only after every task has completed.
      results = es.invokeAll(taskList);
    } catch (InterruptedException ex) {
      // Keep the interruption visible to the caller's own interrupt checks.
      Thread.currentThread().interrupt();
      throw new Exception(ex);
    } finally {
      es.shutdown();
    }
    rethrowFirstFailure(results);
  }


  /**
   * Execute the runs produced by a factory on a pool of {@code nthreads}
   * threads. Runs are created and submitted from the calling thread, which
   * afterwards waits for the results in submission order.
   * <p>
   * When a failure is propagated, runs that were already submitted are not
   * cancelled; the pool is shut down in an orderly way and lets them finish.
   *
   * @param nthreads number of threads to use
   * @param factory the factory that creates the runs
   * @throws Exception the first exception (in submission order) that happens
   * in a run. If the calling thread is interrupted while waiting, its
   * interrupt status is restored and the {@link InterruptedException} is
   * rethrown wrapped in an {@code Exception}.
   */
  public static void run(int nthreads, RunFactory factory) throws Exception {
    ExecutorService es = Executors.newFixedThreadPool(nthreads);
    try {
      List<Future<Exception>> results = new ArrayList<>();

      // Phase 1: create and submit the runs. This happens inside the
      // try/finally so a throwing factory cannot leak the pool's threads.
      for (int i = 0; i < factory.count(); i++) {
        Run r = factory.create(i);
        if (r == null) {
          break; // the creation of the run failed, done here
        }
        results.add(es.submit(toCallable(r)));
      }

      // Phase 2: pull the Futures, check for exceptions.
      rethrowFirstFailure(results);
    } finally {
      es.shutdown();
    }
  }


  /**
   * Wrap a run into a task that reports a failure as its result instead of
   * throwing it.
   */
  private static Callable<Exception> toCallable(Run r) {
    return () -> {
      try {
        r.run();
        return null;
      } catch (Exception e) {
        return e;
      }
    };
  }


  /**
   * Wait for the results in list order and throw the first reported failure.
   */
  private static void rethrowFirstFailure(List<Future<Exception>> results)
      throws Exception {
    Exception failure = null;
    try {
      for (Future<Exception> result : results) {
        failure = result.get();
        if (failure != null) {
          break;
        }
      }
    } catch (InterruptedException ex) {
      // The calling thread was interrupted while waiting for a result.
      Thread.currentThread().interrupt();
      throw new Exception(ex);
    }
    if (failure instanceof InterruptedException) {
      // A run (not the caller) was interrupted: keep the wrapping callers
      // have always seen, but leave the caller's interrupt status alone.
      throw new Exception(failure);
    }
    if (failure != null) {
      throw failure;
    }
  }
}