Parallel.java [src/csip/utils] Revision:   Date:
/*
 * $Id: 2.7+51 Parallel.java b6a9acb8879a 2023-08-09 od $
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 *
 * 2012-2023, Olaf David and others, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package csip.utils;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/**
 *
 * @author od
 */
public class Parallel {

  private Parallel() {
  }

  @FunctionalInterface
  public interface Run {

    /**
     * Perform the run.
     *
     * @throws Exception
     */
    void run() throws Exception;
  }

  /**
   * Execute all runs in parallel.
   *
   * @param runs the runs to do in parallel
   * @throws Exception
   */
  public static void run(Collection<Run> runs) throws Exception {
    run(runs.toArray(new Run[0]));
  }

  /**
   * Execute all runs in parallel.
   *
   * @param runs the runs to execute
   * @throws Exception the first exception that happens in a run is propagated.
   */
  public static void run(Run... runs) throws Exception {
    run(runs.length, runs);
  }

  /**
   * Execute all runs in parallel or serial
   *
   * @param serial true if all should run in serial, false if parallel
   * @param runs the runs to execute
   * @throws Exception
   */
  public static void run(boolean serial, Run... runs) throws Exception {
    run(serial ? 1 : runs.length, runs);
  }

  /**
   * Execute all runs in parallel or all in series
   *
   * @param serial true if all should run in parallel, false if in sequence
   * @param runs the runs to execute
   * @throws Exception
   */
  public static void run(boolean serial, Collection<Run> runs) throws Exception {
    run(serial, runs.toArray(new Run[0]));
  }

  /**
   * Execute all runs in parallel.
   *
   * @param threads # of threads to use (1..runs.length)
   * @param runs the runs to execute
   * @throws Exception the first exception that happens in a run is propagated
   * here
   */
  public static void run(int threads, Run... runs) throws Exception {
    if (runs.length == 0)
      throw new IllegalArgumentException("Nothing to run.");

    if (threads < 1)
      throw new IllegalArgumentException("invalid # of threads: " + threads);

    ExecutorService es = getES(threads, runs.length);
    List<Future<Exception>> l = new ArrayList<>();
    for (Run r : runs) {
      l.add(es.submit((Callable<Exception>) () -> {
        try {
          r.run();
        } catch (Exception E) {
          return E;
        }
        return null;
      }));
    }

    // get the Futures, check for exceptions, shutdown ES
    waitAndShutdown(es, l);
  }

  /**
   * Execute runs in parallel..
   *
   * @param nthreads number of threads to use
   * @param count total number of factory calls
   * @param factory the factory that creates the runs (index (0..count-1) ->
   * Run)
   * @throws Exception the first exception that happens in a run..
   */
  public static void run(int nthreads, int count, Function<Integer, Run> factory) throws Exception {
    if (count < 1)
      throw new IllegalArgumentException("Nothing to run.");

    if (nthreads < 1)
      throw new IllegalArgumentException("invalid # of threads: " + nthreads);

    ExecutorService es = getES(nthreads, count);
    List<Future<Exception>> l = new ArrayList<>();

    // first thread, submission of new runs.
    for (int i = 0; i < count; i++) {
      final Run r = factory.apply(i);
      if (r == null)
        break; // the creation of the run failed, done here

      l.add(es.submit((Callable<Exception>) () -> {
        try {
          r.run();
        } catch (Exception E) {
          return E;
        }
        return null;
      }));
    }

    // get the Futures, check for exceptions, shutdown ES
    waitAndShutdown(es, l);
  }

  /**
   * Execute runs in parallel.
   *
   * @param <V> The result type of the operations
   * @param nthreads number of threads to use
   * @param ret the base type of the return array
   * @param count total number of factory calls
   * @param factory the factory that creates the runs (index (0..count-1) ->
   * Run)
   * @return the result array of type 'ret'
   * @throws Exception the first exception that happens in a run..
   */
  public static <V> V[] run(int nthreads, Class<V> ret, int count, Function<Integer, Callable<V>> factory) throws Exception {
    if (count < 1)
      throw new IllegalArgumentException("Nothing to run.");

    if (nthreads < 1)
      throw new IllegalArgumentException("invalid # of threads: " + nthreads);

    if (ret == null)
      throw new NullPointerException("return type 'ret' is null");

    ExecutorService es = getES(nthreads, count);

    List<Future<Exception>> l = new ArrayList<>();
    @SuppressWarnings("unchecked")
    V[] r = (V[]) Array.newInstance(ret, count);

    // submission of new runs.
    for (int i = 0; i < count; i++) {
      final Callable<V> c = factory.apply(i);
      if (c == null)
        break; // the creation of the run failed, done here

      final int fi = i;
      l.add(es.submit((Callable<Exception>) () -> {
        try {
          r[fi] = c.call();
        } catch (Exception E) {
          return E;
        }
        return null;
      }));
    }

    // get the Futures, check for exceptions, shutdown ES
    waitAndShutdown(es, l);
    return r;
  }

  private static void waitAndShutdown(ExecutorService es, List<Future<Exception>> l) throws Exception {
    try {
      for (Future<Exception> result : l) {
        if (result.get() != null)
          throw result.get();
      }
    } catch (InterruptedException ex) {
      throw new Exception(ex);
    } finally {
      es.shutdown();
    }
  }

  private static ExecutorService getES(int threads, int count) {
    return Executors.newFixedThreadPool(Math.min(threads, count));
  }
}