// Execution.java [src/csip/utils] Revision: Date:
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2022, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip.utils;
import csip.api.server.Executable.StdHandler;
import static csip.api.server.Executable.TIMED_OUT;
import csip.SessionLogger;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
/**
 * Helper class to execute an external program as a child process.
 *
 * The command line is assembled from the executable, the options and the
 * arguments (in that order). While the process runs, its stdout and stderr
 * are copied by two background threads to the configured writers and,
 * optionally, passed to {@link StdHandler} callbacks.
 *
 * @author od
 */
class Execution {

  final ProcessBuilder pb = new ProcessBuilder();
  final File executable;
  Object[] args = {};
  Object[] opts = {};
  SessionLogger logger;
  // Default timeout: 3600 * 24 seconds.
  long time = 3600 * 24;
  TimeUnit unit = TimeUnit.SECONDS;

  // Markers used by exec() to detect a writer that was already consumed:
  //   0            : nothing recorded yet
  //  -1            : the writer does not need tracking (default console
  //                  writers after close(), or the NOP writers)
  //  anything else : hashCode() of the writer used by the previous exec()
  // They are written by the output handler threads (through close() of the
  // default writers) and read by the thread calling exec(), hence volatile.
  // NOTE(review): redirectOutput()/redirectError() do not reset these
  // markers; confirm whether that is intended.
  volatile int stderr_closed = 0;
  volatile int stdout_closed = 0;

  // Default stderr target: forwards to System.err, close() only flushes so
  // that the JVM wide stream stays usable.
  Writer stderr = new OutputStreamWriter(System.err) {
    @Override
    public void close() throws IOException {
      System.err.flush();
      stderr_closed = -1;
    }
  };

  // Default stdout target: forwards to System.out, close() only flushes so
  // that the JVM wide stream stays usable.
  Writer stdout = new OutputStreamWriter(System.out) {
    @Override
    public void close() throws IOException {
      System.out.flush();
      stdout_closed = -1;
    }
  };

  StdHandler stdout_handler;
  StdHandler stderr_handler;

  /**
   * Writer that discards everything written to it.
   */
  private static class NOPWriter extends Writer {

    @Override
    public void write(char[] cbuf, int off, int len) throws IOException {
      // intentionally empty
    }

    @Override
    public void flush() throws IOException {
      // intentionally empty
    }

    @Override
    public void close() throws IOException {
      // intentionally empty
    }
  }

  /**
   * Create a new Execution.
   *
   * @param executable the executable file.
   */
  Execution(File executable) {
    this.executable = executable;
  }

  /**
   * Get the executable.
   *
   * @return the executable file passed to the constructor.
   */
  File getFile() {
    return executable;
  }

  /**
   * Set the execution arguments.
   *
   * @param args the command line arguments
   */
  void setArguments(Object... args) {
    this.args = args;
  }

  /**
   * Set the execution options. Options are put on the command line before
   * the arguments.
   *
   * @param opts the command line options
   */
  void setOptions(Object... opts) {
    this.opts = opts;
  }

  /**
   * Set the timeout.
   *
   * @param time the amount of the time unit, must be at least 1.
   * @param unit the TimeUnit, must not be null.
   * @throws IllegalArgumentException if time is less than 1 or unit is null.
   */
  void setTimeout(long time, TimeUnit unit) {
    if (time < 1) {
      throw new IllegalArgumentException("time: " + time);
    }
    if (unit == null) {
      throw new IllegalArgumentException("time unit is null");
    }
    this.time = time;
    this.unit = unit;
  }

  /**
   * Set the execution arguments.
   *
   * @param args the command line arguments
   */
  void setArguments(String... args) {
    this.args = args;
  }

  /**
   * Set the execution options. Options are put on the command line before
   * the arguments.
   *
   * @param opts the command line options
   */
  void setOptions(String... opts) {
    this.opts = opts;
  }

  /**
   * Set the execution arguments.
   *
   * @param args the program arguments.
   */
  void setArguments(List<Object> args) {
    this.args = args.toArray(new Object[0]);
  }

  /**
   * Set the execution options. Options are put on the command line before
   * the arguments.
   *
   * @param opt the program options.
   */
  void setOptions(List<Object> opt) {
    this.opts = opt.toArray(new Object[0]);
  }

  /**
   * Get the arguments.
   *
   * @return a copy of the current argument array.
   */
  Object[] getArguments() {
    return args.clone();
  }

  /**
   * Get the options.
   *
   * @return a copy of the current option array.
   */
  Object[] getOptions() {
    return opts.clone();
  }

  /**
   * Set the working directory in which the executable gets started.
   *
   * @param dir the directory in which the executable will be started
   * @throws IllegalArgumentException if dir does not exist or is a file.
   */
  void setWorkingDirectory(File dir) {
    if (!dir.exists() || dir.isFile()) {
      throw new IllegalArgumentException("directory " + dir + " doesn't exist.");
    }
    pb.directory(dir);
  }

  /**
   * Get the execution environment. Use the returned map to customize the
   * environment variables.
   *
   * @return the environment map of the underlying ProcessBuilder.
   */
  Map<String, String> environment() {
    return pb.environment();
  }

  /**
   * Set the logger. This is optional.
   *
   * @param log the logger
   */
  void setLogger(SessionLogger log) {
    this.logger = log;
  }

  /**
   * Output handler for stdout/stderr. Copies everything readable from the
   * process stream to a writer and optionally forwards it to a StdHandler.
   * Counts down the latch once when it is done.
   */
  private static class OutputHandler extends Thread {

    Writer w;
    Reader r;
    CountDownLatch l;
    StdHandler handler;
    // Text after the last newline of the previous chunk, not yet forwarded.
    String remainder = "";

    OutputHandler(CountDownLatch l, InputStream is, Writer w, StdHandler handler) {
      r = new InputStreamReader(is);
      this.w = w;
      this.l = l;
      this.handler = handler;
      setPriority(Thread.MIN_PRIORITY);
    }

    @Override
    public void run() {
      try {
        char[] b = new char[2048];
        int n;
        while ((n = r.read(b)) != -1) {
          w.write(b, 0, n);
          w.flush();
          if (handler != null) {
            handle(new String(b, 0, n));
          }
        }
        if (handler != null) {
          handleRemainder();
        }
      } catch (IOException ex) {
        // A closed stream is expected when the process gets destroyed while
        // this thread is still reading; everything else is reported.
        if (!isStreamClosed(ex)) {
          ex.printStackTrace(System.err);
        }
      } finally {
        // Each resource is released independently so that a failure on the
        // writer cannot leak the reader, and the latch is always released.
        try {
          try {
            w.flush();
          } finally {
            w.close();
          }
        } catch (IOException ex) {
          ex.printStackTrace(System.err);
        }
        try {
          r.close();
        } catch (IOException ex) {
          ex.printStackTrace(System.err);
        }
        l.countDown();
      }
    }

    /**
     * Check if the exception signals a closed stream. The message may be
     * null and is matched by prefix, with or without a trailing period.
     */
    private static boolean isStreamClosed(IOException ex) {
      String msg = ex.getMessage();
      return msg != null && msg.startsWith("Stream closed");
    }

    /**
     * Forward a chunk of output to the handler. If the chunk contains a
     * newline, everything up to and including the last newline is forwarded
     * (prefixed with the pending remainder) and the rest is kept for the
     * next call. A chunk without any newline is forwarded right away.
     * NOTE(review): chunks without a newline are not buffered, so a handler
     * may receive partial lines - confirm that this is intended.
     */
    void handle(String out) {
      int last = out.lastIndexOf('\n') + 1;
      if (last > 0) {
        String chunk = out.substring(0, last);
        handler.handle(remainder + chunk);
        remainder = out.substring(last);
      } else {
        handler.handle(remainder + out);
        remainder = "";
      }
    }

    /**
     * Forward the pending remainder, if any, at the end of the stream.
     */
    void handleRemainder() {
      if (!remainder.isEmpty()) {
        handler.handle(remainder);
      }
    }
  }

  /**
   * Append elements to the command line. Null elements and empty strings
   * are skipped, String[] elements are expanded, every other element is
   * added using its toString() value.
   */
  private void addToCMD(List<String> cmd, Object[] ao) {
    for (Object o : ao) {
      if (o == null) {
        continue;
      }
      if (o.getClass() == String[].class) {
        for (String s : (String[]) o) {
          if (s != null && !s.isEmpty()) {
            cmd.add(s);
          }
        }
      } else {
        String a = o.toString();
        if (!a.isEmpty()) {
          cmd.add(a);
        }
      }
    }
  }

  /**
   * Process execution. This call blocks until the process is done or the
   * timeout expired. On timeout the process gets terminated forcibly.
   *
   * @return the exit status of the process (0 = all good), or TIMED_OUT if
   * the process did not finish within the timeout. NOTE(review): if the
   * calling thread gets interrupted while waiting, 0 is returned and the
   * interrupt flag of the thread is set - callers may want to check it.
   * @throws java.io.IOException if stdout/stderr were already consumed by a
   * previous execution or if the process cannot be started.
   */
  int exec() throws IOException {
    if (stdout_closed == stdout.hashCode()) {
      throw new IOException("stdout for " + getFile() + " already closed. invoke redirectDefaults() or create new instance.");
    }
    if (stderr_closed == stderr.hashCode()) {
      throw new IOException("stderr for " + getFile() + " already closed. invoke redirectDefaults() or create new instance.");
    }

    // executable first, then options, then arguments
    List<String> cmd = new ArrayList<>();
    cmd.add(executable.toString());
    addToCMD(cmd, opts);
    addToCMD(cmd, args);
    pb.command(cmd);

    if (logger != null && logger.isLoggable(Level.INFO)) {
      // directory() is null if no working directory was set, the process
      // then inherits the working directory of this JVM.
      File cwd = pb.directory();
      logger.info("Cmd: " + pb.command().toString());
      logger.info(" Env: " + pb.environment().toString());
      logger.info(" Cwd: " + (cwd != null ? cwd.toString() : System.getProperty("user.dir")));
    }

    CountDownLatch l = new CountDownLatch(2);
    Process p = pb.start();
    new OutputHandler(l, p.getInputStream(), stdout, stdout_handler).start();
    new OutputHandler(l, p.getErrorStream(), stderr, stderr_handler).start();

    int exitValue = 0;
    try {
      boolean finished = p.waitFor(time, unit);
      if (!finished) {
        // The output handlers only finish once the process streams end, so
        // the process has to be terminated before waiting for them.
        // Otherwise the timeout would not have any effect.
        p.destroyForcibly();
      }
      l.await();
      exitValue = finished ? p.exitValue() : TIMED_OUT;
    } catch (InterruptedException ex) {
      // keep the interrupt visible to the caller
      Thread.currentThread().interrupt();
    } finally {
      if (stdout_closed != -1) {
        stdout_closed = stdout.hashCode();
      }
      if (stderr_closed != -1) {
        stderr_closed = stderr.hashCode();
      }
      try {
        p.getOutputStream().close();
      } finally {
        // always release the process, even if closing its stdin failed.
        p.destroy();
      }
    }
    return exitValue;
  }

  /**
   * Redirect the output stream.
   *
   * @param w the writer receiving stdout of the process
   * @throws NullPointerException if w is null.
   */
  void redirectOutput(Writer w) {
    if (w == null) {
      throw new NullPointerException("w");
    }
    stdout = w;
  }

  /**
   * Redirect the error stream.
   *
   * @param w the writer receiving stderr of the process
   * @throws NullPointerException if w is null.
   */
  void redirectError(Writer w) {
    if (w == null) {
      throw new NullPointerException("w");
    }
    stderr = w;
  }

  /**
   * Redirect stdout to null, the output gets discarded.
   */
  void redirectOutputNull() {
    stdout = new NOPWriter();
    stdout_closed = -1;
  }

  /**
   * Redirect stderr to null, the output gets discarded.
   */
  void redirectErrorNull() {
    stderr = new NOPWriter();
    stderr_closed = -1;
  }

  /**
   * Receive the current output from stdout.
   *
   * @param handler the handler called with the stdout text, null for none.
   */
  void setStdoutHandler(StdHandler handler) {
    stdout_handler = handler;
  }

  /**
   * Receive the current output from stderr.
   *
   * @param handler the handler called with the stderr text, null for none.
   */
  void setStderrHandler(StdHandler handler) {
    stderr_handler = handler;
  }

  @Override
  public String toString() {
    return "Executable: " + executable.toString();
  }
}