ModelDataService.java [src/csip] Revision: default Date:
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2022, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip;
import csip.api.server.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.logging.Level;
import java.text.DateFormat;
import java.time.Duration;
import java.time.format.DateTimeParseException;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.*;
import javax.ws.rs.core.*;
import org.apache.commons.io.*;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.codehaus.jettison.json.*;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import com.fasterxml.uuid.Generators;
import csip.annotations.*;
import csip.utils.*;
import csip.utils.Services.FormDataParameter;
import csip.api.server.COSU;
import csip.api.server.Velocity;
/**
* Base class for all modeling and data services. Any CSIP service
* implementation will subclass ModelDataService.
* <br>
* <br>
* Example:<br>
* <pre>
* import csip.ModelDataService;
*
* @Path("m/myservice/1.0")
* public class MyService extends ModelDataService {
*
* public void doProcess() {
* // service implementation.
* }
* }
* </pre>
*
* @author Olaf David
*/
public abstract class ModelDataService implements ModelDataServiceConstants {
// Simulation unique id (time-based UUID); may be replaced by an external SUID header (see initOnce()).
private String suid = Generators.timeBasedGenerator().generate().toString();
// Timezone for response timestamps; overridable per request via the metainfo KEY_TZ entry.
private static String tz = Config.getString(Config.CSIP_TIMEZONE);
// Configured HTTP status used for failed service calls (see getRespStatus()).
private static int SERVICE_ERROR = Config.getInt(Config.CSIP_RESPONSE_ERROR_HTTPSTATUS);
// Event publisher; switched to Publisher.NONE unless an external SUID is provided (initOnce()).
private Publisher publisher = Config.getPublisher();
// Store for cached service results, keyed by a digest of the request (see execute()).
private ResultStore resultStore = Config.getResultStore();
// HTTP status of the response; set to SERVICE_ERROR on failure.
private int statusCode = 200;
// Guard so initOnce() runs only once per service instance.
private boolean needInit = true;
// Lazily created session workspace; volatile for double-checked locking in getWorkspaceDir0().
private volatile File workspace;
// Lazily created results folder (request/response/report/log files).
private File results;
// The 'metainfo' section of the request payload.
private JSONObject metainfo;
// The full parsed JSON request.
private JSONObject request;
// The raw 'parameter' section of the request.
private Object param;
// Parameter name -> JSONObject, preprocessed from 'param'.
private Map<String, JSONObject> paramMap;
//TODO: make this private.
public Task mt;
// Wall-clock time when request processing started (initOnce()).
private Date start = null;
private String[] inputs = ModelSession.NO_ATTACHMENTS; // no attachments
// Request origin info captured from the servlet request in execute().
private String reqHost;
private String reqUrl = "";
private String reqRemoteIp = "";
private String reqContext;
private String reqScheme;
private String reqAuth;
// Report entries, name -> JSONObject (lazily created by reportData()).
private Map<String, JSONObject> rep;
// The progress (message) during execution.
private String progress = null;
// file stuff
private List<File> fres; // files published as results
private Map<File, String> fdesc; // file -> description
private Map<String, Map<String, Object>> rmeta; // per-result meta info
private Map<String, Map<String, Object>> repmeta; // per-report-entry meta info
private Map<String, JSONObject> rdata; // result entries, name -> JSONObject
private List<Object> deprResults; // legacy result objects (deprecated path)
private List<File> frep; // files published with the report
// options Default comes from @Options
private boolean unpack;
// Timeout of execution in seconds. Default comes from @Options
private long timeout;
protected SessionLogger LOG;
// Workflow phase ids for doProcessWrapper().
private static final int PRE = 0;
private static final int PROC = 1;
private static final int POST = 2;
// Digest of (service path + request string), the result store key.
private String digest;
// Cached result fetched from the result store; when non-null, processing is skipped.
private JSONArray cachedResult;
// payload management
private Map<String, FormDataParameter> formData;
// API management: lazily created API facade instances, keyed by name.
private SimpleCache<String, Object> api = new SimpleCache<>();
// Report entries, created on first use.
private Map<String, JSONObject> reportData() {
  if (rep == null)
    rep = new LinkedHashMap<>();
  return rep;
}
// Legacy result objects, created on first use.
private List<Object> deprecatedResults() {
  if (deprResults == null)
    deprResults = new ArrayList<>();
  return deprResults;
}
// Result entries, created on first use (insertion order preserved).
private Map<String, JSONObject> rdata() {
  if (rdata == null)
    rdata = new LinkedHashMap<>();
  return rdata;
}
// Per-result meta info, created on first use.
private Map<String, Map<String, Object>> rmeta() {
  if (rmeta == null)
    rmeta = new HashMap<>();
  return rmeta;
}
// Per-report-entry meta info, created on first use.
private Map<String, Map<String, Object>> repmeta() {
  if (repmeta == null)
    repmeta = new HashMap<>();
  return repmeta;
}
// Files published as results, created on first use.
private List<File> fresults() {
  if (fres == null)
    fres = new ArrayList<>();
  return fres;
}
// Files published with the report, created on first use.
private List<File> freports() {
  if (frep == null)
    frep = new ArrayList<>();
  return frep;
}
// File descriptions, created on first use.
private Map<File, String> fdesc() {
  if (fdesc == null)
    fdesc = new HashMap<>();
  return fdesc;
}
/// API
/**
 * Access the form-data parameters of the payload.
 *
 * @return the formdata API
 */
protected final PayloadFormData formdata() {
  Object impl = api.get("formdata", key -> new PayloadFormDataImpl(formData, this));
  return (PayloadFormData) impl;
}
/**
 * Access the metainfo section of the payload.
 *
 * @return metainfo section API
 */
protected final PayloadMetaInfo metainfo() {
  Object impl = api.get("metainfo", key -> new PayloadMetaInfoImpl(metainfo));
  return (PayloadMetaInfo) impl;
}
/**
 * Access the parameter section of the payload.
 *
 * @return parameter section API
 */
protected final PayloadParameter parameter() {
  Object impl = api.get("parameter", key -> new PayloadParameterImpl(paramMap));
  return (PayloadParameter) impl;
}
/**
 * Access all payload attachments.
 *
 * @return the attachment API
 */
protected final PayloadAttachments attachments() {
  Object impl = api.get("attachments", key -> new PayloadAttachmentsImpl(inputs, this));
  return (PayloadAttachments) impl;
}
/**
 * Populate the service results.
 *
 * @return the result API.
 */
protected final PayloadResults results() {
  Object impl = api.get("results",
      key -> new PayloadResultsImpl(this::rdata, this::rmeta, this::fresults, this::fdesc));
  return (PayloadResults) impl;
}
/**
 * Populate the payload report data.
 *
 * @return report API
 */
protected final PayloadResults report() {
  Object impl = api.get("report",
      key -> new PayloadResultsImpl(this::reportData, this::repmeta, this::freports, this::fdesc));
  return (PayloadResults) impl;
}
/**
 * Access the service resources (@Resources annotations).
 *
 * @return the resource API.
 */
protected final ServiceResources resources() {
  Object impl = api.get("resources", key -> new ServiceResourcesImpl(this, LOG));
  return (ServiceResources) impl;
}
/**
 * Access the service annotations other than @Resources.
 *
 * @return annotations API
 */
protected final ServiceAnnotations service() {
  Object impl = api.get("annotations", key -> new ServiceAnnotationsImpl(this));
  return (ServiceAnnotations) impl;
}
/**
 * Access the service configuration settings.
 *
 * @return config API
 */
protected final ServiceConfiguration config() {
  Object impl = api.get("config", key -> new ServiceConfigurationImpl(this));
  return (ServiceConfiguration) impl;
}
/**
 * Access the workspace for this service session.
 *
 * @return workspace API.
 */
protected final SessionWorkspace workspace() {
  Object impl = api.get("workspace", key -> new SessionWorkspaceImpl(this));
  return (SessionWorkspace) impl;
}
/**
 * Access request and context information.
 *
 * @return request and context API
 */
protected final PayloadRequest request() {
  Object impl = api.get("request", key -> new PayloadRequestImpl(
      reqRemoteIp, reqUrl, reqHost, reqContext, reqScheme, reqAuth, request, this));
  return (PayloadRequest) impl;
}
/**
 * Access COSU information from the metainfo.
 *
 * @return the COSU API
 */
protected final COSU cosu() {
  Object impl = api.get("cosu", key -> new COSUImpl(metainfo));
  return (COSU) impl;
}
/**
 * Access a Velocity template instance.
 *
 * @return the velocity template instance
 */
protected final Velocity velocity() {
  Object impl = api.get("velocity", key -> new VelocityImpl());
  return (Velocity) impl;
}
/**
 * Merge data entries with their meta info into a JSON array.
 * Each data value is augmented with the matching meta key/value pairs.
 *
 * @param meta per-entry meta info, keyed by entry name
 * @param data the entries, keyed by entry name
 * @return the merged JSON array
 * @throws JSONException on JSON assembly problems
 */
private JSONArray create_res(Map<String, Map<String, Object>> meta,
    Map<String, JSONObject> data) throws JSONException {
  JSONArray out = new JSONArray();
  for (Map.Entry<String, JSONObject> entry : data.entrySet()) {
    JSONObject value = entry.getValue();
    if (meta.containsKey(entry.getKey())) {
      // fold the meta info into the entry itself
      for (Map.Entry<String, Object> m : meta.get(entry.getKey()).entrySet())
        value.put(m.getKey(), m.getValue());
    }
    out.put(value);
  }
  return out;
}
// Assemble the report array from report entries and their meta info.
private JSONArray create_report() throws JSONException {
  JSONArray report = create_res(repmeta(), reportData());
  return report;
}
// Assemble the results array; a cached result short-circuits assembly.
private JSONArray create_results() throws JSONException {
  if (cachedResult != null)
    return cachedResult;
  JSONArray out = create_res(rmeta(), rdata());
  // legacy result objects; this path will go away in the future
  if (deprResults != null)
    for (Object legacy : deprResults)
      out.put(legacy);
  return out;
}
/**
 * Indicate if a separate results folder has been created.
 *
 * @return true if the results folder exists, false otherwise.
 */
private boolean hasResultsDir() {
  return !(results == null);
}
////////////////////////////////////////////////////////////////////// Lifecycle
/**
 * Get the service capabilities. Overwrite when the parameter set should be
 * computed dynamically, e.g. from database table columns. The default of
 * {@code null} makes {@link #describeJSON} fall back to a bundled
 * {@code *.json} template or an empty request.
 *
 * @return the parameter as JSONArray, or null if not computed.
 */
protected JSONArray getCapabilities() {
  return null;
}
/**
 * Workflow step 1: process the request data. Default is a no-op;
 * subclasses override as needed.
 *
 * @throws Exception if pre-processing fails.
 */
protected void preProcess() throws Exception {
}
/**
 * Workflow step 2: the main processing method. Default is a no-op;
 * subclasses override with the service implementation.
 *
 * @throws Exception if processing fails.
 */
protected void doProcess() throws Exception {
}
/**
 * Workflow step 3: create the response data. Default is a no-op;
 * subclasses override as needed.
 *
 * @throws Exception if post-processing fails.
 */
protected void postProcess() throws Exception {
}
/**
 * Call back method when a service is externally canceled, to allow for
 * cleanup, etc. To be safe, synchronize access to data shared with
 * doProcess().
 *
 */
protected void onCancel() {
}
/**
 * Create a report. Default is a no-op; subclasses override and populate
 * entries via {@link #report()}.
 *
 * @throws Exception if report creation fails.
 */
protected void doReport() throws Exception {
}
////////////////////////////////////////////////////////////////////
/**
 * Run one workflow phase (PRE, PROC, or POST) and capture any failure.
 * A cached result skips execution entirely.
 *
 * @param execPhase one of PRE, PROC, POST
 * @return the thrown error, or null on success / cache hit
 */
private Throwable doProcessWrapper(int execPhase) {
  if (cachedResult != null)
    return null;
  try {
    if (execPhase == PRE)
      preProcess();
    else if (execPhase == PROC)
      doProcess();
    else if (execPhase == POST)
      postProcess();
    else
      throw new IllegalArgumentException("do process argument: " + execPhase);
  } catch (Throwable t) {
    return t;
  }
  return null;
}
/**
 * Resolve the @Options settings (unpack, timeout). The defaults come from
 * the annotation's declared default values; a class-level @Options
 * annotation overrides them.
 */
private void processOptions() {
  // start with the annotation defaults
  try {
    unpack = (boolean) Options.class.getMethod("unpackinput").getDefaultValue();
    String def = (String) Options.class.getMethod("timeout").getDefaultValue();
    timeout = Duration.parse(def).getSeconds();
  } catch (Exception ex) {
    // this should not happen.
    LOG.log(Level.SEVERE, null, ex);
  }
  Options opt = getClass().getAnnotation(Options.class);
  if (opt == null)
    return;
  // class-level overrides
  unpack = opt.unpackinput();
  LOG.info(getClass() + ": unpack: " + unpack);
  try {
    timeout = Duration.parse(opt.timeout()).getSeconds();
    LOG.info(getClass() + ": timeout: " + opt.timeout() + " (" + timeout + " sec)");
  } catch (DateTimeParseException e) {
    LOG.warning("illegal timeout: " + opt.timeout());
  }
}
/**
 * Main processing entry: run installs, then either the subclass doProcess()
 * or an 'auto' executable resource (python/exe/java class).
 *
 * @return the error on failure, null on success
 * @throws Exception on setup problems
 */
private Throwable process0() throws Exception {
  // Do all installs before anything else.
  for (Resource r : Binaries.getResourcesByType(getClass(), ResourceType.INSTALL)) {
    Binaries.doInstall(r, LOG);
  }
  Resource resource = Binaries.getAutoExecResource(getClass());
  if (resource == null)
    // Nothing can be executed automatically, move on with doProcess() ...
    return doProcessWrapper(PROC);
  Executable e;
  switch (resource.type()) {
    //      case OMS_COMP:
    //        Class<?> cl = getClass();
    //        if (!resource.file().isEmpty()) {
    //          String classname = resource.file();
    //          cl = Thread.currentThread().getContextClassLoader().loadClass(classname);
    //        }
    //        try {
    //          OMSComponentMapper cm = new OMSComponentMapper(cl.newInstance());
    //          cm.setInputs(this);
    //          cm.process();
    //          cm.getOutputs(this);
    //        } catch (Exception t) {
    //          return t;
    //        }
    //        return null;
    case PYTHON3:
    case PYTHON2:
      e = Binaries.getResourcePython(resource, getWorkspaceDir0(), LOG, getClass());
      break;
    case EXECUTABLE:
      e = Binaries.getResourceExe0(resource, getWorkspaceDir0(), LOG, getClass());
      break;
    case CLASSNAME:
      //      case OMS_DSL:
      // collect the JAR resources onto the classpath first
      List<File> jars = new ArrayList<>();
      File csipHome = new File(Config.getString(Config.CSIP_DIR));
      for (Resource j1 : Binaries.getResourcesByType(getClass(), ResourceType.JAR)) {
        // be aware of the underscore in a jar file.
        jars.add(Binaries.unpackResource0(j1, csipHome, LOG));
        //        Binaries.unpackResource("/oms-all.jar_", new File(oms_home, "oms-all.jar"));
      }
      //        if (resource.type() == ResourceType.OMS_DSL)
      //          e = Binaries.getResourceOMSDSL(resource, getWorkspaceDir0(), jars, LOG);
      //        else
      e = Binaries.getResourceJava(resource, getWorkspaceDir0(), jars, LOG);
      break;
    default:
      return new ServiceException("Invalid resource type for id='auto' " + resource.type());
  }
  LOG.info("running : " + e.getName());
  int ret = e.exec();
  LOG.info("done with exit value: " + ret);
  // Successful: optionally surface the captured stdout in the log
  if (ret == 0) {
    if (LOG.isLoggable(Level.INFO)) {
      FilenameFilter ff = new WildcardFileFilter("*" + Binaries.STDOUT, IOCase.INSENSITIVE);
      File[] f = getWorkspaceDir0().listFiles(ff);
      if (f != null && f.length > 0)
        LOG.info(f[0] + ":\n" + FileUtils.readFileToString(f[0], "UTF-8"));
    }
    return null;
  }
  // Failed: prefer the captured stderr as the error message
  FilenameFilter ff = new WildcardFileFilter("*" + Binaries.STDERR, IOCase.INSENSITIVE);
  File[] f = getWorkspaceDir0().listFiles(ff);
  if (f != null && f.length > 0)
    return new ServiceException(FileUtils.readFileToString(f[0], "UTF-8"));
  return new ServiceException("Error: return code " + ret);
}
/**
 * Post-processing: collect declared OUTPUT resources as results, optionally
 * archive the workspace, then run the subclass postProcess().
 *
 * @return the error on failure, null on success
 * @throws Exception on file handling problems
 */
private Throwable postProcess0() throws Exception {
  for (Resource r : Binaries.getResourcesByType(getClass(), ResourceType.OUTPUT)) {
    String[] files = r.file().split("\\s+");
    for (String file : files) {
      File[] f = Utils.expandFiles(getWorkspaceDir0(), file);
      if (f != null && f.length > 0)
        results().put(f);
    }
  }
  // this is for debugging only! be careful
  if (metainfo.optBoolean("archive", false)) {
    File archiveZip = zipWorkspace();
    File f = new File(getWorkspaceDir0(), "archive-" + getSUID() + ".zip");
    // File.renameTo may fail silently (e.g. across filesystems);
    // fall back to a copy-based move in that case.
    if (!archiveZip.renameTo(f))
      FileUtils.moveFile(archiveZip, f);
    results().put(f);
  }
  return doProcessWrapper(POST);
}
// Lazily create the session workspace directory.
// Uses double-checked locking on the volatile 'workspace' field so
// concurrent callers create the directory at most once.
final File getWorkspaceDir0() {
  File ws = workspace;
  if (ws == null) {
    synchronized (this) {
      ws = workspace;
      if (ws == null) {
        workspace = ws = Services.getWorkDir(getSUID());
        workspace.mkdirs();
      }
    }
  }
  return ws;
}
/**
 * Get the results folder for this model run, creating it on first use.
 * Note: unlike the workspace, creation is not synchronized.
 *
 * @return the results folder.
 */
private File getResultsDir() {
  if (results == null) {
    File dir = Services.getResultsDir(getSUID());
    results = dir;
    results.mkdirs();
  }
  return results;
}
/**
 * Get the simulation unique identifier (128-bit, time-based UUID).
 *
 * @return the suid string
 */
protected final String getSUID() {
  return suid;
}
// True if the request's metainfo selects asynchronous execution mode.
final boolean isAsync() {
  String mode = metainfo.optString(KEY_MODE);
  return metainfo.has(KEY_MODE) && mode.equals(ASYNC);
}
/**
 * Get the Parameter as map "name" to "JSONObject".
 *
 * @return the parameter map
 * @deprecated use {@link #parameter()} instead.
 */
@Deprecated
protected final Map<String, JSONObject> getParamMap() {
  return paramMap;
}
/**
 * Get the raw request parameter section.
 *
 * @return request parameter
 * @deprecated use {@link #parameter()} instead.
 */
@Deprecated
protected final Object getParam() {
  return param;
}
///////////////////////////////////////////////////////////////////////private
/**
 * Create the report file, if reporting is enabled and the subclass
 * produced report entries via doReport().
 *
 * @param meta the response metainfo; gets a "report" flag when a report exists
 * @throws Exception on report creation problems
 */
private void doCreateReport(JSONObject meta) throws Exception {
  // if the request wants to skip reporting
  if (meta.has("report") && !meta.getBoolean("report"))
    return;
  // allow the subclass to populate report entries
  doReport();
  // report call has been made?
  if (rep != null) {
    JSONArray report = create_report();
    if (frep != null)
      annotateResults(report, frep);
    // put a flag into the metainfo to indicate report
    // availability
    meta.put("report", true);
    JSONObject r = new JSONObject();
    r.put(KEY_METAINFO, meta)
        .put(KEY_REPORT, report);
    String resp = JSONUtils.toString(r);
    FileUtils.writeStringToFile(new File(getResultsDir(), REPORT_FILE), resp, "UTF-8");
    if (LOG.isLoggable(Level.INFO))
      LOG.info("created report : " + new File(getResultsDir(), REPORT_FILE));
  }
}
/**
 * Copy the given result files into the results folder. Relative file names
 * are resolved against the workspace.
 *
 * @param resultfiles files to publish
 * @throws IOException if a file is missing or unreadable
 */
private void processResults(List<File> resultfiles) throws IOException {
  File wsDir = getWorkspaceDir0();
  File resultDir = getResultsDir();
  for (File sfile : resultfiles) {
    if (!sfile.exists()) {
      // retry relative to the workspace
      sfile = new File(wsDir, sfile.toString());
      if (!sfile.exists())
        throw new IOException("Result file not found: " + sfile);
    }
    if (!sfile.canRead())
      throw new IOException("Cannot read file: " + sfile);
    if (LOG.isLoggable(Level.INFO))
      // fixed: missing space after "result" garbled the log message
      LOG.info("Copy result " + sfile.toString() + " to " + resultDir);
    FileUtils.copyFileToDirectory(sfile, resultDir);
  }
}
/**
 * Append download descriptors (name, URL, description) for the given files
 * to the results array.
 *
 * @param results the results array to append to
 * @param files the files to describe; may be null or empty (no-op)
 * @throws Exception on JSON problems
 */
private void annotateResults(JSONArray results, List<File> files) throws Exception {
  if (files == null || files.isEmpty())
    return;
  // No need to convert toPublicURL as getCodebase() is drawing from a
  // previously translated URI anyway.
  String url = request().getCodebase();
  for (File f : files) {
    String fn = f.getName();
    String fn_enc = UriBuilder.fromPath(fn).build().toString(); // proper url encoding.
    String descr = fdesc().get(f);
    results.put(JSONUtils.dataDesc(fn, url + "q/" + getSUID() + "/" + fn_enc, descr));
  }
}
/**
 * Build the metainfo for a failed run: a clone of the request metainfo
 * stamped with FAILED status, suid, timestamp and origin info.
 *
 * @return the metainfo, or null if JSON assembly failed
 */
private JSONObject getFailedMetaInfo() {
  try {
    JSONObject mi = (metainfo == null) ? new JSONObject() : JSONUtils.clone(metainfo);
    mi.put(KEY_STATUS, FAILED);
    mi.put(KEY_SUUID, getSUID());
    mi.put(KEY_TSTAMP, Dates.nowISO(tz));
    mi.put(KEY_SERVICE_URL, request().getURL());
    mi.put(KEY_CLOUD_NODE, Services.LOCAL_IP_ADDR);
    mi.put(KEY_REQ_IP, request().getRemoteAddr());
    return mi;
  } catch (JSONException ex) {
    LOG.log(Level.SEVERE, null, ex);
    return null;
  }
}
/**
 * Build the metainfo for a denied request: a clone of the request metainfo
 * stamped with DENIED status, timestamp and origin info.
 *
 * @return the metainfo, or null if JSON assembly failed
 */
private JSONObject getDeniedMetaInfo() {
  try {
    JSONObject mi = (metainfo == null) ? new JSONObject() : JSONUtils.clone(metainfo);
    mi.put(KEY_STATUS, DENIED);
    mi.put(KEY_TSTAMP, Dates.nowISO(tz));
    mi.put(KEY_SERVICE_URL, request().getURL());
    mi.put(KEY_REQ_IP, request().getRemoteAddr());
    return mi;
  } catch (JSONException ex) {
    LOG.log(Level.SEVERE, null, ex);
    return null;
  }
}
/**
 * Validate the Authorization header against the configured token
 * authentication, if authentication is required.
 *
 * @param httpReq the servlet request
 * @throws SecurityException if no usable token is present or validation fails
 */
private void validateAuthentication(HttpServletRequest httpReq)
    throws SecurityException {
  TokenAuthentication ta = Config.getTokenAuthentication();
  if (!ta.requiresAuthentication())
    return;
  String auth = httpReq.getHeader(HttpHeaders.AUTHORIZATION);
  if (!ta.isTokenBasedAuthentication(auth))
    throw new SecurityException("No authorization token provided.");
  ta.validate(ta.getToken(auth));
}
/**
 * One-time per-instance initialization: adopt an external SUID if provided,
 * select the publisher, create the session logger, and resolve @Options.
 *
 * @param httpReq the servlet request
 */
private void initOnce(HttpServletRequest httpReq) {
  if (needInit) {
    String extSUID = httpReq.getHeader(KEY_SUUID);
    if (extSUID != null)
      suid = extSUID;
    else
      // only use a (kafka) publisher when needed,
      // i.e. when an external suid is set.
      publisher = Publisher.NONE;
    if (LOG == null)
      LOG = new SessionLogger(getResultsDir(), service().getPath(),
          getSUID(), Config.getString(Config.CSIP_LOGGING_LEVEL));
    start = new Date();
    processOptions();
    needInit = false;
  }
}
/**
 * Get the status type if error statuses are allowed; otherwise always OK.
 * (Configured error statuses below 500 suppress error reporting via HTTP.)
 *
 * @param s the requested status type
 * @return the actual status type.
 */
private Response.StatusType getRespStatus(Response.StatusType s) {
  return (SERVICE_ERROR >= 500) ? s : Response.Status.OK;
}
//////////////////////////////////////////////////////////////////// HTTP
/**
 * Describe the service as JSON. (Service endpoint use only)
 * Resolution order: getCapabilities(), then a bundled '&lt;Class&gt;Req.json'
 * template, then '&lt;Class&gt;.json', then an empty request.
 *
 * @param uriInfo the URI Info (injected)
 * @param httpReq the servlet request (injected)
 * @return The service signature as JSON
 */
@GET
@Produces(MediaType.APPLICATION_JSON)
public final String describeJSON(@Context UriInfo uriInfo,
    @Context HttpServletRequest httpReq) {
  initOnce(httpReq);
  LOG.log(Level.INFO, "HTTP/GET {0}", uriInfo.getRequestUri().toString());
  LOG.log(Level.INFO, " Request Parameter template for " + getClass());
  try {
    JSONObject r;
    JSONArray params = getCapabilities();
    // 1) check if service is implementing getCapabilities()
    if (params != null) {
      r = JSONUtils.newRequest();
      r.put(ModelDataService.KEY_PARAMETER, params);
    } else {
      // 2) look for an external .json template.
      String classname = '/' + getClass().getName().replace('.', '/');
      LOG.info("Search " + classname + "Req.json");
      InputStream is = getClass().getResourceAsStream(classname + "Req.json");
      if (is == null) {
        LOG.info("Search " + classname + ".json");
        is = getClass().getResourceAsStream(classname + ".json");
      }
      if (is == null) {
        LOG.info("Return empty request.");
        r = JSONUtils.newRequest();
      } else {
        // fixed: the resource stream was never closed (leak);
        // try-with-resources guarantees closure.
        try (InputStream in = is) {
          r = new JSONObject(IOUtils.toString(in, "UTF-8"));
        }
      }
    }
    // merge the service info into the template's metainfo
    JSONObject m = r.getJSONObject(KEY_METAINFO);
    JSONUtils.mergeInto(Utils.getServiceInfo(getClass()), m);
    String host = Utils.getPublicRequestURL(httpReq);
    m.put(KEY_SERVICE_URL, host);
    return JSONUtils.toString(r).replace("\\/", "/");
  } catch (Exception ex) {
    LOG.log(Level.WARNING, null, ex);
    LOG.info("Return empty request.");
    return JSONUtils.newRequest().toString();
  }
}
/**
 * Service Handler for non-multipart requests. There are no form parameters,
 * everything is in the body. (Service endpoint only)
 *
 * @param uriInfo The UriInfo context
 * @param httpReq the servlet request
 * @param requestStr the request string
 * @return the JSON response of the service.
 */
@POST
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public final Response execute(@Context UriInfo uriInfo,
    @Context HttpServletRequest httpReq, String requestStr) {
  initOnce(httpReq);
  LOG.log(Level.INFO, "HTTP/POST {0}", uriInfo.getRequestUri().toString());
  // capture request origin (honor proxy forwarding)
  reqRemoteIp = httpReq.getHeader("X-Forwarded-For");
  if (reqRemoteIp == null)
    reqRemoteIp = httpReq.getRemoteAddr();
  reqHost = httpReq.getRemoteHost();
  reqUrl = Utils.getPublicRequestURL(httpReq);
  reqContext = httpReq.getContextPath();
  reqScheme = httpReq.getScheme();
  reqAuth = httpReq.getHeader(HttpHeaders.AUTHORIZATION);
  if (requestStr == null)
    return Response.status(getRespStatus(Response.Status.BAD_REQUEST))
        .entity(JSONUtils.error("Null JSON Request.").toString()).build();
  if (LOG.isLoggable(Level.INFO)) {
    LOG.info("path: " + service().getPath());
    LOG.info("from: " + httpReq.getRemoteAddr() + ","
        + httpReq.getRemoteHost() + "," + httpReq.getRemoteUser());
    LOG.info("request url: " + httpReq.getRequestURL().toString());
    LOG.info("context: " + httpReq.getContextPath());
    LOG.info("x-forwarded-for:" + httpReq.getHeader("X-Forwarded-For"));
  }
  if (LOG.isLoggable(Level.FINE))
    LOG.fine("request: " + requestStr);
  JSONObject response = null;
  String responseStr = null;
  try {
    try {
      request = new JSONObject(requestStr);
    } catch (JSONException E) {
      return Response.status(getRespStatus(Response.Status.BAD_REQUEST))
          .entity(JSONUtils.error("Malformed JSON Request.").toString()).build();
    }
    // a request can be empty: '{}', means no metainfo, no parameter
    if (request.length() == 0)
      request = JSONUtils.newRequest();
    // fetch parameter and meta info.
    param = request.get(KEY_PARAMETER);
    // get the metainfo, if not, create one.
    if (request.has(KEY_METAINFO))
      metainfo = request.getJSONObject(KEY_METAINFO);
    else
      metainfo = new JSONObject();
    // optional passing of webhook as HTTP Header.
    String webhook = httpReq.getHeader(HEADER_WEBHOOK);
    if (webhook != null)
      metainfo.put(KEY_WEBHOOK, webhook);
    String rf = httpReq.getHeader(HEADER_REQUEST_FILE);
    if (rf != null)
      metainfo.put(KEY_REQUEST_FILE, rf);
    // persist the raw request for later inspection/archiving
    FileUtils.writeStringToFile(new File(getResultsDir(), REQUEST_FILE), requestStr, "UTF-8");
    validateAuthentication(httpReq);
    // result store: look up a cached result by request digest
    if (resultStore != ResultStore.NONE) {
      digest = resultStore.getDigest(service().getPath() + "-" + requestStr);
      if (digest == null)
        throw new ServiceException("Internal error: No digest created.");
      if (!metainfo.optBoolean("rs_skip", false))
        cachedResult = resultStore.getResult(digest);
    }
    // extract all Resources
    Binaries.extractAllResources(getClass(), LOG);
    paramMap = JSONUtils.preprocess(param);
    // add the attachmed inputs to it
    if (inputs != null && inputs.length > 0) {
      String a = Arrays.toString(inputs);
      metainfo.put("attachments", a.substring(1, a.length() - 1));
    }
    //      handleCache(metainfo);
    tz = JSONUtils.getJSONString(metainfo, KEY_TZ, Config.getString(Config.CSIP_TIMEZONE));
    if (LOG.isLoggable(Level.INFO))
      LOG.info("timezone: " + tz);
    if (metainfo.has(KEY_PARAMETERSETS)) {
      // ensemble run
      response = new Callable<JSONObject>() {
        String path = service().getPath();
        @Override
        public JSONObject call() throws Exception {
          // map the request to callables
          List<Callable<JSONObject>> ens = Services.mapEnsemble(request, path);
          if (ens != null) {
            // execute it
            List<Future<JSONObject>> results = Services.runEnsemble(ens);
            // reduce results
            return Services.reduceEnsemble(results, request);
          } else {
            return JSONUtils.newError(getFailedMetaInfo(),
                param, new ServiceException("Not an ensemble."));
          }
        }
      }.call();
    } else {
      // single run:
      publisher.publish(STARTED, getSUID());
      mt = new Task();
      if (metainfo.has(KEY_MODE) && metainfo.getString(KEY_MODE).equals(ASYNC)) {
        // async call
        JSONObject metainfoClone = JSONUtils.clone(metainfo);
        // Put polling info in obj before starting!
        ServiceAnnotations sa = service();
        if (sa.getNextPoll() != -1) {
          metainfoClone.put(KEY_NEXT_POLL, sa.getNextPoll());
        }
        if (sa.getFirstPoll() != -1) {
          metainfoClone.put(KEY_FIRST_POLL, sa.getFirstPoll());
        }
        // generate response.
        metainfoClone.put(KEY_STATUS, SUBMITTED)
            .put(KEY_SUUID, getSUID())
            .put(KEY_TSTAMP, Dates.nowISO(tz))
            .put(KEY_SERVICE_URL, request().getURL());
        // done, there is the response
        response = JSONUtils.newResponse(request, null, metainfoClone);
        responseStr = JSONUtils.toString(response);
        // start the async call as the very last thing before returning
        mt.start();
      } else {
        // sync call: run the task in-line and read the generated response
        mt.run();
        responseStr = FileUtils.readFileToString(
            new File(getResultsDir(), RESPONSE_FILE), "UTF-8");
      }
    }
  } catch (SecurityException E) {
    response = JSONUtils.newDeniedError(getDeniedMetaInfo(), param, E);
    responseStr = JSONUtils.toString(response).replace("\\/", "/");
    LOG.log(Level.SEVERE, E.getMessage());
    statusCode = SERVICE_ERROR;
  } catch (Throwable t) {
    response = JSONUtils.newError(getFailedMetaInfo(), param, t);
    responseStr = JSONUtils.toString(response).replace("\\/", "/");
    LOG.log(Level.SEVERE, response.toString(), t);
    statusCode = SERVICE_ERROR;
  }
  if (LOG.isLoggable(Level.FINE)) {
    LOG.fine("response :" + responseStr);
  }
  //    LOG.close();
  return Response.status(getRespStatus(new ServiceErrorStatusType(statusCode)))
      .entity(responseStr)
      .build();
}
/**
 * Handler for model services. Multi-part handling. (Service endpoint only)
 * Copies all attached files into the workspace, extracts the JSON request
 * from the 'param' form part (inline or as file), then proxies to the
 * regular JSON execute().
 *
 * @param uriInfo the context info
 * @param httpReq the servlet request
 * @param multipart multi part input.
 *
 * @return the JSON response as String.
 */
@POST
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.MULTIPART_FORM_DATA)
public final Response execute(@Context UriInfo uriInfo,
    @Context HttpServletRequest httpReq, FormDataMultiPart multipart) {
  initOnce(httpReq);
  LOG.log(Level.INFO, "HTTP/POST {0}", uriInfo.getRequestUri().toString());
  Response response = null;
  try {
    if (multipart == null || multipart.getBodyParts() == null
        || multipart.getBodyParts().isEmpty())
      throw new ServiceException("Missing JSON request and/or files");
    formData = Services.getFormParameter(multipart.getBodyParts());
    File ws = getWorkspaceDir0();
    // copy all file - file9 to workspace.
    inputs = Utils.copyAndExtract(LOG, ws, formData, unpack);
    String requestStr = null;
    // fetch param JSON
    FormDataParameter fdParam = formData.get(FORM_PARAM);
    if (fdParam == null) {
      // fixed: previously 'fdParam == null && !formData.isEmpty()', which
      // NPE'd in the next branch when formData was empty. No 'param' part
      // always means: synthesize an empty request.
      requestStr = JSONUtils.newRequest().toString();
      if (LOG.isLoggable(Level.INFO))
        LOG.info("creating empty request string");
    } else if (fdParam.getFilename() == null) {
      // direct passing
      requestStr = fdParam.getValue();
    } else {
      if (ws == null)
        throw new ServiceException("Illegal service configuration. missing simulation dir.");
      File paramFile = new File(ws, fdParam.getFilename());
      if (!paramFile.exists())
        throw new ServiceException("Missing JSON request file.");
      requestStr = FileUtils.readFileToString(paramFile, "UTF-8");
    }
    // proxy to the regular call.
    response = execute(uriInfo, httpReq, requestStr);
  } catch (Throwable t) {
    response = Response.status(getRespStatus(Response.Status.fromStatusCode(SERVICE_ERROR)))
        .entity(JSONUtils.newError(getFailedMetaInfo(), param, t).toString()).build();
    LOG.log(Level.SEVERE, response.toString(), t);
  }
  // Let's not catch every exception so we have hope of finding the bug!
  //    LOG.close();
  return response;
}
/**
 * Set the progress as a string message. Call this method during process() to
 * indicate progress for long running models. If the service is called
 * asynchronously the message will be reported in the metainfo part of the
 * rest call as the 'progress' entry.
 *
 * @param progress a meaningful message
 * @throws ServiceException if there are problems.
 */
protected void setProgress(String progress) throws ServiceException {
  this.progress = progress;
  // also refresh the task's running status so pollers see activity
  mt.setRunningStatus();
}
/**
 * Set the progress as a numerical value (0..100).
 *
 * @param progress a value between 0 and 100;
 * @throws ServiceException if progress arguments are out of range
 */
protected void setProgress(int progress) throws ServiceException {
  if (0 <= progress && progress <= 100)
    setProgress(String.valueOf(progress));
  else
    throw new ServiceException("invalid progress: " + progress);
}
// How long (seconds) to keep results; an archived run may override the
// configured session ttl via its metainfo.
private long getKeepResults() {
  long ttl = Dates.getDurationSec("csip.session.ttl");
  if (metainfo.optBoolean("archive", false))
    return metainfo.optLong("csip.session.ttl", ttl);
  return ttl;
}
// Zip the workspace; session artifacts are staged into it first so the
// archive contains request, response, result and log.
File zipWorkspace() throws IOException {
  String[] artifacts = {REQUEST_FILE, RESPONSE_FILE, RESULT_FILE, LOG_FILE};
  for (String name : artifacts) {
    File src = new File(getResultsDir(), name);
    if (src.exists())
      FileUtils.copyFileToDirectory(src, getWorkspaceDir0());
  }
  return ZipFiles.zip(getWorkspaceDir0());
}
/**
* Model execution Task.
*
*/
final class Task extends Thread {
// 1 day in milli seconds, max time for a running job to be accessible.
static final long ONE_DAY = 60 * 60 * 24 * 1000;
//
FutureTask<Throwable> task = new FutureTask<>(() -> process0());
String status = UNKNOWN;
Timer snapshot;
Throwable serviceError;
// empty request.
JSONObject emptyReq = new JSONObject();
@Override
public void run() {
    ExecutorService es = Config.getExecutorService();
    try {
        // step 1: preprocess the input and publish the RUNNING state.
        setRunningStatus();
        publisher.publish(RUNNING, getSUID());
        serviceError = doProcessWrapper(PRE);
        if (serviceError == null) {
            Config.getModelTasks().add(this);
            es.submit(task);
            // running from here on.
            long sec = Dates.getDurationSec(Config.CSIP_SNAPSHOT, -1);
            if (sec > 0 && Config.isArchiveEnabled()) {
                // take a workspace snapshot every 'sec' seconds while the model runs.
                snapshot = new Timer("timer-" + getSUID());
                snapshot.scheduleAtFixedRate(new TimerTask() {
                    // seconds the previous snapshot took; -1 before the first run.
                    long dur = -1;

                    @Override
                    public void run() {
                        try {
                            // skip a cycle if the last snapshot took longer
                            // than the snapshot interval itself.
                            if (sec > dur) {
                                LOG.info("taking workspace snapshot ...");
                                ArchiveStore as = Config.getArchiveStore();
                                if (!as.isAvailable())
                                    return;
                                long start = System.currentTimeMillis();
                                ModelSession ms = Config.getSessionStore().getSession(getSUID());
                                File snapshotZip = zipWorkspace();
                                DateFormat df = Dates.newISOFormat(tz);
                                Date now = new Date();
                                Date expDate = Dates.futureDate(now, 31536000); // 1 year expiration
                                ModelArchive ma = new ModelArchive(df.format(now),
                                        df.format(expDate), request().getURL(), ms.getStatus(), ms.getReqIP());
                                // replace any previous snapshot of this session.
                                if (as.hasArchive(getSUID()))
                                    as.removeArchive(getSUID());
                                as.archiveSession(getSUID(), ma, snapshotZip);
                                FileUtils.deleteQuietly(snapshotZip);
                                long end = System.currentTimeMillis();
                                dur = (end - start) / 1000;
                                LOG.info("snapshot done in " + dur + " seconds.");
                            } else {
                                LOG.info("skipped snapshot: " + dur + " - " + sec);
                            }
                        } catch (Exception ex) {
                            LOG.log(Level.SEVERE, null, ex);
                        }
                    }
                }, sec * 1000, sec * 1000);
            }
            // step 2: wait here for the model task to finish (may time out).
            serviceError = task.get(timeout, TimeUnit.SECONDS);
            if (serviceError == null) {
                // step 3: postprocess.
                serviceError = postProcess0();
                if (serviceError == null) {
                    // step 4: collect the results.
                    JSONArray results = create_results();
                    if (fres != null)
                        // automatically annotate the output if there are files being returned.
                        annotateResults(results, fres);
                    if (results.length() == 0) {
                        // check if there is a .results.json file from python or R
                        File f = new File(getResultsDir(), RESULT_FILE);
                        if (f.exists() && f.canRead())
                            results = new JSONObject(FileUtils.readFileToString(f, "UTF-8"))
                                    .getJSONArray(KEY_RESULT);
                    }
                    JSONObject meta = setFinishedStatus(results);
                    // step 5: create a report (if supported).
                    doCreateReport(meta);
                }
            }
        }
        if (serviceError != null) {
            // doProcess threw an exception as serviceError
            setFailedStatus(serviceError);
            statusCode = SERVICE_ERROR;
            LOG.log(Level.SEVERE, null, serviceError);
        }
    } catch (TimeoutException e) {
        LOG.info("service: " + service().getPath()
                + " timed out after " + timeout + " seconds. service cancelled.");
        setTimedoutStatus();
    } catch (CancellationException e) {
        LOG.log(Level.INFO, "cancelled.");
        setCancelledStatus();
    } catch (Exception e) {
        LOG.log(Level.SEVERE, null, e);
        setFailedStatus(e);
    } finally {
        // FIX: cancel the snapshot timer on ALL exit paths. Previously it was
        // cancelled only on success, leaking the timer thread (and further
        // snapshot attempts) on failure, timeout, or cancellation.
        if (snapshot != null)
            snapshot.cancel();
        // manage expiration action
        try {
            final ModelSession ms = Config.getSessionStore().getSession(getSUID());
            if (fres != null)
                processResults(fres);
            if (frep != null)
                processResults(frep);
            // archive management, runs asynchronously.
            new Thread(() -> {
                if (Config.isArchiveEnabled() && metainfo.optBoolean("csip.archive.enabled", true)) {
                    try {
                        if ((Config.getBoolean(Config.CSIP_ARCHIVE_FAILEDONLY) && serviceError != null) // only archive if failed
                                || !Config.getBoolean(Config.CSIP_ARCHIVE_FAILEDONLY) // always archive
                                || (Config.getBoolean(Config.CSIP_ARCHIVE_ONREQUEST) && metainfo().getBoolean(Config.CSIP_ARCHIVE_ONREQUEST))) { // only if requested.
                            JSONObject mi = metainfo;
                            if (mi != null && "map".equals(mi.optString("phase")))
                                // do not archive in the map phase.
                                return;
                            ArchiveStore as = Config.getArchiveStore();
                            if (as.isAvailable()) {
                                // close log before archiving.
                                LOG.close();
                                long st = System.currentTimeMillis();
                                File archiveZip = zipWorkspace();
                                // move it from session store to archive store:
                                // turn the session into an archive.
                                DateFormat df = Dates.newISOFormat(tz);
                                Date now = new Date();
                                Date expDate = Dates.futureDate(now, Dates.getDurationSec("csip.archive.ttl"));
                                ModelArchive ma = new ModelArchive(df.format(now),
                                        df.format(expDate), request().getURL(),
                                        ms.getStatus(), ms.getReqIP());
                                if (as.hasArchive(getSUID()))
                                    as.removeArchive(getSUID());
                                as.archiveSession(getSUID(), ma, archiveZip);
                                publisher.publish(ARCHIVED, getSUID());
                                FileUtils.deleteQuietly(archiveZip);
                                long en = System.currentTimeMillis();
                                LOG.info("Archived " + getSUID() + " in " + (en - st) + " ms.");
                            }
                        }
                    } catch (Exception ex) {
                        LOG.log(Level.SEVERE, null, ex);
                    }
                }
                // remove the workspace unless requested via config
                boolean delete_ws = true;
                if (Config.getBoolean(Config.CSIP_KEEPWORKSPACE, false)) {
                    delete_ws = false;
                } else if (ms.getStatus().equals(FAILED)
                        && Config.getBoolean("csip.keepworkspace.failed", false)) {
                    delete_ws = false;
                }
                if (delete_ws)
                    FileUtils.deleteQuietly(getWorkspaceDir0());
                // keep the results, let the Sweeper remove them after the ttl
                // (separate thread).
                Sweeper r = new Sweeper(hasResultsDir() ? getResultsDir() : null, getSUID());
                long keepResults = getKeepResults();
                if (ms.getStatus().equals(FAILED)) {
                    keepResults = Dates.getDurationSec("csip.session.ttl.failed");
                } else if (ms.getStatus().equals(CANCELED)) {
                    keepResults = Dates.getDurationSec("csip.session.ttl.cancelled");
                }
                if (keepResults > 0) {
                    try {
                        Config.getTimer().schedule(r, keepResults * 1000);
                    } catch (IllegalStateException E) {
                        // shared timer was cancelled; fall back to a fresh one.
                        Config.getNewTimer().schedule(r, keepResults * 1000);
                    }
                } else {
                    r.run();
                }
            }).start();
        } catch (Exception ex) {
            LOG.log(Level.SEVERE, null, ex);
        }
        LOG.close(); // might already be closed if archived.
        api.clear();
        // remove the running session.
        Config.getModelTasks().remove(this);
    }
}
boolean cancel() {
    // give the service its cancellation hook before interrupting the task.
    onCancel();
    final boolean wasCancelled = task.cancel(true);
    return wasCancelled;
}
ModelDataService getService() {
    // hand out the enclosing service instance.
    final ModelDataService outer = ModelDataService.this;
    return outer;
}
@Override
public String toString() {
    // renders as "[<suid> - <status> - <start>]" on its own line.
    final StringBuilder sb = new StringBuilder("\n[");
    sb.append(getSUID()).append(" - ").append(status).append(" - ").append(start.toString());
    return sb.append(']').toString();
}
// status management.
// Persist the full response JSON to the results directory and record the
// session state in the session store. Errors are logged, never propagated.
private void store(JSONObject request, JSONObject metainfo, JSONArray results) {
try {
Object results_ = results;
// v2 payloads use a different result structure; convert if requested.
if (JSONUtils.getPayloadVersion(param) == 2)
results_ = JSONUtils.toV2(results);
JSONObject response = JSONUtils.newResponse(request, results_, metainfo);
File responseFile = new File(getResultsDir(), RESPONSE_FILE);
// per-file lock so concurrent status updates do not interleave writes.
Lock l = Config.wsFileLocks.get(responseFile, (File t) -> new ReentrantLock());
l.lock();
try {
// un-escape forward slashes that the JSON serializer escapes by default.
FileUtils.writeStringToFile(responseFile,
JSONUtils.toString(response).replace("\\/", "/"), "UTF-8");
} finally {
l.unlock();
}
// mirror the metainfo fields into the session store record;
// expiration date and cpu time are optional at this point.
String stat = metainfo.getString(KEY_STATUS);
String tstamp = metainfo.getString(KEY_TSTAMP);
String expDate = "";
if (metainfo.has(KEY_EXPIRATION_DATE))
expDate = metainfo.getString(KEY_EXPIRATION_DATE);
String cputime = "";
if (metainfo.has(KEY_CPU_TIME))
cputime = metainfo.getString(KEY_CPU_TIME);
boolean hasReport = metainfo.has("report") && metainfo.getBoolean("report");
ModelSession ms = new ModelSession(tstamp, expDate, service().getPath(), stat,
Services.LOCAL_IP_ADDR, cputime, request().getRemoteAddr(), inputs, hasReport, progress);
Config.getSessionStore().setSession(getSUID(), ms);
} catch (Exception ex) {
LOG.log(Level.SEVERE, null, ex);
}
}
// Update this run's status and stamp the shared metainfo object with
// identity/routing details; returns the updated metainfo.
private JSONObject updateMetadata(String status) {
    this.status = status;
    JSONObject mi = metainfo;
    try {
        mi.put(KEY_STATUS, status);
        mi.put(KEY_SUUID, getSUID());
        mi.put(KEY_CLOUD_NODE, Services.LOCAL_IP_ADDR);
        mi.put(KEY_REQ_IP, request().getRemoteAddr());
        mi.put(KEY_SERVICE_URL, request().getURL());
        if (progress != null) {
            mi.put(KEY_PROGRESS, progress);
        }
        if (mi.has(KEY_LOG)) {
            // refresh the log URL only if a log entry already exists.
            mi.put(KEY_LOG, request().getCodebase() + "q/log/" + getSUID());
        }
        mi.put(KEY_TSTAMP, Dates.newISOFormat(tz).format(start));
    } catch (JSONException ex) {
        LOG.log(Level.SEVERE, null, ex);
    }
    return mi;
}
// Persist the RUNNING state; no request payload is stored yet at this point.
private void setRunningStatus() {
    try {
        final JSONObject mi = updateMetadata(RUNNING);
        store(emptyReq, mi, null);
    } catch (Exception ex) {
        LOG.log(Level.SEVERE, null, ex);
    }
}
// Mark the run as FINISHED (or QUEUED), compute cpu time and expiration,
// optionally cache the result, then persist and publish the final state.
// Returns the updated metainfo, or null if building it failed.
private JSONObject setFinishedStatus(JSONArray results) {
try {
// only if progress is being touched, set it to 100%
JSONObject metainfo = updateMetadata(isQueued() ? QUEUED : FINISHED);
metainfo.put(KEY_CPU_TIME, Dates.diffInMillis(start, new Date()));
Date expDate = Dates.futureDate(getKeepResults());
metainfo.put(KEY_EXPIRATION_DATE, Dates.newISOFormat(tz).format(expDate));
if (resultStore != ResultStore.NONE) {
// only store the result if there are no input attachments
// and no output file is being produced.
if (cachedResult == null && (fres == null || fresults().isEmpty()) && attachments().getFilesCount() == 0)
resultStore.putResult(digest, results);
metainfo.put("rs_digest", digest);
}
// COSU
COSUImpl cosu = (COSUImpl) api.get("cosu");
if (cosu != null && cosu.isSupported()) {
metainfo.put("cosu", cosu.getCOSUMetainfo());
}
store(request, metainfo, results);
publisher.publish(FINISHED, getSUID() + " " + request().getURL());
return metainfo;
} catch (JSONException ex) {
LOG.log(Level.SEVERE, null, ex);
// NOTE(review): null propagates to doCreateReport(meta) in run() — confirm it tolerates null.
return null;
}
}
// Persist and publish the FAILED state, recording the error message and
// (if configured) the stack trace in the response metainfo.
private void setFailedStatus(Throwable err) {
    String message = JSONUtils.getErrorMessage(err);
    try {
        JSONObject mi = updateMetadata(FAILED);
        mi.put(KEY_CPU_TIME, Dates.diffInMillis(start, new Date()));
        Date expDate = Dates.futureDate(getKeepResults());
        mi.put(KEY_EXPIRATION_DATE, Dates.newISOFormat(tz).format(expDate));
        mi.put(ERROR, message);
        if (Config.getBoolean(Config.CSIP_RESPONSE_STACKTRACE)) {
            // optionally attach the stack trace to the response.
            JSONArray trace = JSONUtils.getJSONStackTrace(err);
            if (trace != null) {
                mi.put("stacktrace", trace);
            }
        }
        store(request, mi, null);
        publisher.publish(FAILED, getSUID() + " " + request().getURL());
    } catch (JSONException ex) {
        LOG.log(Level.SEVERE, null, ex);
    }
}
// Persist and publish the CANCELED state with the elapsed cpu time.
private void setCancelledStatus() {
    try {
        JSONObject mi = updateMetadata(CANCELED);
        mi.put(KEY_CPU_TIME, Dates.diffInMillis(start, new Date()));
        store(request, mi, null);
        publisher.publish(CANCELED, getSUID() + " " + request().getURL());
    } catch (JSONException ex) {
        LOG.log(Level.SEVERE, null, ex);
    }
}
// Persist and publish the TIMEDOUT state with the elapsed cpu time.
private void setTimedoutStatus() {
    try {
        JSONObject mi = updateMetadata(TIMEDOUT);
        mi.put(KEY_CPU_TIME, Dates.diffInMillis(start, new Date()));
        store(request, mi, null);
        publisher.publish(TIMEDOUT, getSUID() + " " + request().getURL());
    } catch (JSONException ex) {
        LOG.log(Level.SEVERE, null, ex);
    }
}
}
/**
* Allow the subclass to queue the request.
*
* @return true if this request should be treated as queued; the default
* implementation always returns false.
*/
boolean isQueued() {
return false;
}
}