ModelDataService.java [src/csip] Revision:   Date:
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 *
 * 2012-2022, Olaf David and others, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package csip;

import csip.api.server.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.logging.Level;
import java.text.DateFormat;
import java.time.Duration;
import java.time.format.DateTimeParseException;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.*;
import javax.ws.rs.core.*;
import org.apache.commons.io.*;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.codehaus.jettison.json.*;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import com.fasterxml.uuid.Generators;
import csip.annotations.*;
import csip.utils.*;
import csip.utils.Services.FormDataParameter;
import csip.api.server.COSU;
import csip.api.server.Velocity;

/**
 * Base class for all modeling and data services. Any CSIP service
 * implementation will subclass ModelDataService.
 * <br>
 * <br>
 * Example:<br>
 * <pre>
 *  import csip.ModelDataService;
 *
 *  &#64;Path("m/myservice/1.0")
 *  public class MyService extends ModelDataService {
 *
 *  public void doProcess() {
 *    // service implementation.
 *  }
 * }
 * </pre>
 *
 * @author Olaf David
 */
public abstract class ModelDataService implements ModelDataServiceConstants {

  // Session identifier; starts as a time-based UUID and may be replaced by
  // an externally supplied id in initOnce().
  private String suid = Generators.timeBasedGenerator().generate().toString();

  // NOTE(review): 'tz' is static but is reassigned per request in execute();
  // concurrent requests with different time zones can overwrite each other.
  private static String tz = Config.getString(Config.CSIP_TIMEZONE);
  // HTTP status code used for failed service calls (from configuration).
  private static int SERVICE_ERROR = Config.getInt(Config.CSIP_RESPONSE_ERROR_HTTPSTATUS);

  private Publisher publisher = Config.getPublisher();
  private ResultStore resultStore = Config.getResultStore();

  // HTTP status code of the response; switched to SERVICE_ERROR on failure.
  private int statusCode = 200;

  // true until initOnce() has completed for this instance.
  private boolean needInit = true;

  // Lazily created in getWorkspaceDir0().
  private volatile File workspace;
  // Lazily created in getResultsDir().
  private File results;

  // The 'metainfo' section of the request.
  private JSONObject metainfo;
  // The complete, parsed request.
  private JSONObject request;
  // The raw 'parameter' entry of the request.
  private Object param;
  // The preprocessed parameter, see JSONUtils.preprocess().
  private Map<String, JSONObject> paramMap;

  //TODO: make this private.
  public Task mt;

  // Set in initOnce().
  private Date start = null;
  private String[] inputs = ModelSession.NO_ATTACHMENTS;  // no attachments

  // Request details captured in execute().
  private String reqHost;
  private String reqUrl = "";
  private String reqRemoteIp = "";
  private String reqContext;
  private String reqScheme;
  private String reqAuth;

  // Report entries; lazily created in reportData().
  private Map<String, JSONObject> rep;

  // The progress (message) during execution.
  private String progress = null;

  // file stuff (all lazily created by their accessor methods)
  private List<File> fres;
  private Map<File, String> fdesc;
  private Map<String, Map<String, Object>> rmeta;
  private Map<String, Map<String, Object>> repmeta;

  private Map<String, JSONObject> rdata;
  private List<Object> deprResults;
  private List<File> frep;

  // options Default comes from @Options
  private boolean unpack;
  // Timeout of execution in seconds. Default comes from @Options
  private long timeout;

  // Session logger; created in initOnce() if not already set.
  protected SessionLogger LOG;

  // Execution phases understood by doProcessWrapper().
  private static final int PRE = 0;
  private static final int PROC = 1;
  private static final int POST = 2;

  // Result store digest of the request and the cached result found for it (if any).
  private String digest;
  private JSONArray cachedResult;

  // payload management
  private Map<String, FormDataParameter> formData;
  // API management
  private SimpleCache<String, Object> api = new SimpleCache<>();


  // Report entries, keyed by name; the map is created on first access.
  private Map<String, JSONObject> reportData() {
    if (rep == null) {
      rep = new LinkedHashMap<>();
    }
    return rep;
  }


  // Legacy result objects; the list is created on first access.
  private List<Object> deprecatedResults() {
    if (deprResults == null) {
      deprResults = new ArrayList<>();
    }
    return deprResults;
  }


  // Result entries, keyed by name; the map is created on first access.
  private Map<String, JSONObject> rdata() {
    if (rdata == null) {
      rdata = new LinkedHashMap<>();
    }
    return rdata;
  }


  // Additional attributes per result entry; the map is created on first access.
  private Map<String, Map<String, Object>> rmeta() {
    if (rmeta == null) {
      rmeta = new HashMap<>();
    }
    return rmeta;
  }


  // Additional attributes per report entry; the map is created on first access.
  private Map<String, Map<String, Object>> repmeta() {
    if (repmeta == null) {
      repmeta = new HashMap<>();
    }
    return repmeta;
  }


  // Files registered as results; the list is created on first access.
  private List<File> fresults() {
    if (fres == null) {
      fres = new ArrayList<>();
    }
    return fres;
  }


  // Files registered as reports; the list is created on first access.
  private List<File> freports() {
    if (frep == null) {
      frep = new ArrayList<>();
    }
    return frep;
  }


  // Description text per file (shared by results and reports); created on first access.
  private Map<File, String> fdesc() {
    if (fdesc == null) {
      fdesc = new HashMap<>();
    }
    return fdesc;
  }


/// API 
  /**
   * Access the form data parameters of a multipart payload. The API object
   * is created on first use and kept in the API cache.
   *
   * @return the formdata API
   */
  protected final PayloadFormData formdata() {
    Object cached = api.get("formdata",
        key -> new PayloadFormDataImpl(formData, this));
    return (PayloadFormData) cached;
  }


  /**
   * Access the metainfo section of the payload. The API object is created
   * on first use and kept in the API cache.
   *
   * @return metainfo section API
   */
  protected final PayloadMetaInfo metainfo() {
    Object cached = api.get("metainfo",
        key -> new PayloadMetaInfoImpl(metainfo));
    return (PayloadMetaInfo) cached;
  }


  /**
   * Access the parameter section of the payload. The API object is created
   * on first use and kept in the API cache.
   *
   * @return parameter section API
   */
  protected final PayloadParameter parameter() {
    Object cached = api.get("parameter",
        key -> new PayloadParameterImpl(paramMap));
    return (PayloadParameter) cached;
  }


  /**
   * Access all attachments of the payload. The API object is created on
   * first use and kept in the API cache.
   *
   * @return the attachment API
   */
  protected final PayloadAttachments attachments() {
    Object cached = api.get("attachments",
        key -> new PayloadAttachmentsImpl(inputs, this));
    return (PayloadAttachments) cached;
  }


  /**
   * API for populating the service results (data, meta data, files and file
   * descriptions). The API object is created on first use and kept in the
   * API cache.
   *
   * @return the result API.
   */
  protected final PayloadResults results() {
    Object cached = api.get("results",
        key -> new PayloadResultsImpl(this::rdata, this::rmeta, this::fresults, this::fdesc));
    return (PayloadResults) cached;
  }


  /**
   * API for populating the report (data, meta data, files and file
   * descriptions). The API object is created on first use and kept in the
   * API cache.
   *
   * @return report API
   */
  protected final PayloadResults report() {
    Object cached = api.get("report",
        key -> new PayloadResultsImpl(this::reportData, this::repmeta, this::freports, this::fdesc));
    return (PayloadResults) cached;
  }


  /**
   * Access the service resources (@Resources annotations). The API object
   * is created on first use and kept in the API cache.
   *
   * @return the resource API.
   */
  protected final ServiceResources resources() {
    Object cached = api.get("resources",
        key -> new ServiceResourcesImpl(this, LOG));
    return (ServiceResources) cached;
  }


  /**
   * Access the service annotations other than @Resources. The API object is
   * created on first use and kept in the API cache.
   *
   * @return annotations API
   */
  protected final ServiceAnnotations service() {
    Object cached = api.get("annotations",
        key -> new ServiceAnnotationsImpl(this));
    return (ServiceAnnotations) cached;
  }


  /**
   * Access the service configuration settings. The API object is created on
   * first use and kept in the API cache.
   *
   * @return config API
   */
  protected final ServiceConfiguration config() {
    Object cached = api.get("config",
        key -> new ServiceConfigurationImpl(this));
    return (ServiceConfiguration) cached;
  }


  /**
   * Access the workspace of this service session. The API object is created
   * on first use and kept in the API cache.
   *
   * @return workspace API.
   */
  protected final SessionWorkspace workspace() {
    Object cached = api.get("workspace",
        key -> new SessionWorkspaceImpl(this));
    return (SessionWorkspace) cached;
  }


  /**
   * Access request information (remote address, URL, host, context, scheme,
   * authorization and the request itself). The API object is created on
   * first use, from the values known at that time, and kept in the API cache.
   *
   * @return request and context API
   */
  protected final PayloadRequest request() {
    Object cached = api.get("request",
        key -> new PayloadRequestImpl(reqRemoteIp, reqUrl, reqHost, reqContext,
            reqScheme, reqAuth, request, this));
    return (PayloadRequest) cached;
  }


  /**
   * Access the COSU information, which is backed by the metainfo section of
   * the payload. The API object is created on first use and kept in the API
   * cache.
   *
   * @return the COSU API
   */
  protected final COSU cosu() {
    Object cached = api.get("cosu",
        key -> new COSUImpl(metainfo));
    return (COSU) cached;
  }


  /**
   * Access a Velocity instance. The instance is created on first use and
   * kept in the API cache.
   *
   * @return the velocity template instance
   */
  protected final Velocity velocity() {
    Object cached = api.get("velocity",
        key -> new VelocityImpl());
    return (Velocity) cached;
  }


  /**
   * Build a JSON array from the data entries. If meta data exists for an
   * entry name, its attributes are put into the entry's JSON object before
   * it is added to the array.
   *
   * @param meta additional attributes, keyed by entry name
   * @param data the entries, keyed by entry name
   * @return the array of (augmented) entries, in iteration order of 'data'
   * @throws JSONException if an attribute cannot be put into an entry
   */
  private JSONArray create_res(Map<String, Map<String, Object>> meta,
      Map<String, JSONObject> data) throws JSONException {
    JSONArray out = new JSONArray();
    for (Map.Entry<String, JSONObject> item : data.entrySet()) {
      String name = item.getKey();
      JSONObject entry = item.getValue();
      if (!meta.containsKey(name)) {
        out.put(entry);
        continue;
      }
      for (Map.Entry<String, Object> attr : meta.get(name).entrySet()) {
        entry.put(attr.getKey(), attr.getValue());
      }
      out.put(entry);
    }
    return out;
  }


  // Assemble the report array from the report entries and their meta data.
  private JSONArray create_report() throws JSONException {
    Map<String, Map<String, Object>> meta = repmeta();
    Map<String, JSONObject> entries = reportData();
    return create_res(meta, entries);
  }


  /**
   * Assemble the result array. A cached result, if present, is returned
   * as is; otherwise the array is built from the result entries, followed
   * by any legacy result objects.
   *
   * @return the result array
   * @throws JSONException if the array cannot be assembled
   */
  private JSONArray create_results() throws JSONException {
    if (cachedResult != null) {
      return cachedResult;
    }

    JSONArray merged = create_res(rmeta(), rdata());
    // this will go away in the future
    if (deprResults == null) {
      return merged;
    }
    for (Object legacy : deprResults) {
      merged.put(legacy);
    }
    return merged;
  }


  /**
   * Indicate if a results folder has been set for this session, that is,
   * if the 'results' field is non-null (it is assigned in getResultsDir()).
   *
   * @return true if the results folder is set, false otherwise.
   */
  private boolean hasResultsDir() {
    return results != null;
  }

  ////////////////////////////////////////////////////////////////////// Lifecycle    

  /**
   * Get the service capabilities. Override when the parameter should be
   * computed based on other information, such as database table columns,
   * etc. The default implementation returns null, in which case
   * describeJSON() falls back to a JSON template resource.
   *
   * @return the parameter as JSONArray, or null if not provided.
   */
  protected JSONArray getCapabilities() {
    return null;
  }


  /**
   * Workflow step 1: process the request data. The default implementation
   * does nothing.
   *
   * @throws Exception if pre-processing fails.
   */
  protected void preProcess() throws Exception {
  }


  /**
   * Workflow step 2: the process method. The default implementation does
   * nothing.
   *
   * @throws Exception if processing fails.
   */
  protected void doProcess() throws Exception {
  }


  /**
   * Workflow step 3: create the response data. The default implementation
   * does nothing.
   *
   * @throws Exception if post-processing fails.
   */
  protected void postProcess() throws Exception {
  }


  /**
   * Callback method when a service is externally canceled to allow for
   * cleanup, etc. To be safe, synchronize access to data with doProcess().
   * The default implementation does nothing.
   */
  protected void onCancel() {
  }


  /**
   * Create a report. Override to populate report entries; the default
   * implementation does nothing.
   *
   * @throws Exception if report creation fails.
   */
  protected void doReport() throws Exception {
  }


  ////////////////////////////////////////////////////////////////////    
  /**
   * Run one of the workflow phases (PRE, PROC or POST) and capture any
   * failure instead of propagating it. Nothing is executed when a cached
   * result is available.
   *
   * @param execPhase the phase to run, one of PRE, PROC, POST
   * @return the Throwable raised by the phase (or by an invalid phase
   * argument), null if the phase completed or was skipped.
   */
  private Throwable doProcessWrapper(int execPhase) {
    if (cachedResult != null) {
      return null;
    }

    Throwable failure = null;
    try {
      if (execPhase == PRE) {
        preProcess();
      } else if (execPhase == PROC) {
        doProcess();
      } else if (execPhase == POST) {
        postProcess();
      } else {
        throw new IllegalArgumentException("do process argument: " + execPhase);
      }
    } catch (Throwable t) {
      failure = t;
    }
    return failure;
  }


  /**
   * Initialize 'unpack' and 'timeout'. Both start with the default values
   * declared on the @Options annotation type and are then overwritten by
   * the values of an @Options annotation on the service class, if present.
   * An unparsable timeout on the service class is logged and ignored.
   */
  private void processOptions() {
    // start from the annotation defaults
    try {
      Object unpackDefault = Options.class.getMethod("unpackinput").getDefaultValue();
      unpack = (boolean) unpackDefault;
      Object timeoutDefault = Options.class.getMethod("timeout").getDefaultValue();
      timeout = Duration.parse((String) timeoutDefault).getSeconds();
    } catch (Exception ex) {
      // this should not happen.
      LOG.log(Level.SEVERE, null, ex);
    }

    Options declared = getClass().getAnnotation(Options.class);
    if (declared == null) {
      return;
    }

    // the service class overwrites the defaults
    unpack = declared.unpackinput();
    LOG.info(getClass() + ":  unpack: " + unpack);
    String spec = declared.timeout();
    try {
      timeout = Duration.parse(spec).getSeconds();
      LOG.info(getClass() + ":  timeout: " + spec + " (" + timeout + " sec)");
    } catch (DateTimeParseException e) {
      LOG.warning("illegal timeout: " + spec);
    }
  }


  /**
   * Run the processing step of the service. All INSTALL resources are
   * installed first. If the service class declares no auto-exec resource,
   * doProcess() is run through doProcessWrapper(PROC). Otherwise an
   * Executable is created for the resource type and executed in the
   * workspace.
   *
   * @return null on success; otherwise the failure: the Throwable captured
   * from doProcess(), or a ServiceException carrying either the content of
   * the first '*STDERR' file found in the workspace or the exit code.
   * @throws Exception if installing, preparing or running the executable, or
   * reading its output files fails.
   */
  private Throwable process0() throws Exception {
    // Do all installs before anything else.
    for (Resource r : Binaries.getResourcesByType(getClass(), ResourceType.INSTALL)) {
      Binaries.doInstall(r, LOG);
    }
    Resource resource = Binaries.getAutoExecResource(getClass());
    if (resource == null)
      // Nothing can be executed automatically, move on with doProcess() ...
      return doProcessWrapper(PROC);

    // Select the executable wrapper matching the resource type.
    Executable e;
    switch (resource.type()) {
//      case OMS_COMP:
//        Class<?> cl = getClass();
//        if (!resource.file().isEmpty()) {
//          String classname = resource.file();
//          cl = Thread.currentThread().getContextClassLoader().loadClass(classname);
//        }
//        try {
//          OMSComponentMapper cm = new OMSComponentMapper(cl.newInstance());
//          cm.setInputs(this);
//          cm.process();
//          cm.getOutputs(this);
//        } catch (Exception t) {
//          return t;
//        }
//        return null;
      // both Python versions are handled by the same factory call
      case PYTHON3:
      case PYTHON2:
        e = Binaries.getResourcePython(resource, getWorkspaceDir0(), LOG, getClass());
        break;
      case EXECUTABLE:
        e = Binaries.getResourceExe0(resource, getWorkspaceDir0(), LOG, getClass());
        break;
      case CLASSNAME:
//      case OMS_DSL:
        // unpack all JAR resources into the CSIP directory and pass them on
        List<File> jars = new ArrayList<>();
        File csipHome = new File(Config.getString(Config.CSIP_DIR));
        for (Resource j1 : Binaries.getResourcesByType(getClass(), ResourceType.JAR)) {
          // be aware of the underscore in a jar file.
          jars.add(Binaries.unpackResource0(j1, csipHome, LOG));
          // Binaries.unpackResource("/oms-all.jar_", new File(oms_home, "oms-all.jar"));
        }
//        if (resource.type() == ResourceType.OMS_DSL)
//          e = Binaries.getResourceOMSDSL(resource, getWorkspaceDir0(), jars, LOG);
//        else
        e = Binaries.getResourceJava(resource, getWorkspaceDir0(), jars, LOG);

        break;
      default:
        return new ServiceException("Invalid resource type for id='auto' " + resource.type());
    }

    LOG.info("running : " + e.getName());
    int ret = e.exec();
    LOG.info("done with exit value: " + ret);

    // Successful
    if (ret == 0) {
      // log the content of the first stdout file found in the workspace
      if (LOG.isLoggable(Level.INFO)) {
        FilenameFilter ff = new WildcardFileFilter("*" + Binaries.STDOUT, IOCase.INSENSITIVE);
        File[] f = getWorkspaceDir0().listFiles(ff);
        if (f != null && f.length > 0)
          LOG.info(f[0] + ":\n" + FileUtils.readFileToString(f[0], "UTF-8"));
      }
      return null;
    }

    // Failed: report the content of the first stderr file, if there is one
    FilenameFilter ff = new WildcardFileFilter("*" + Binaries.STDERR, IOCase.INSENSITIVE);
    File[] f = getWorkspaceDir0().listFiles(ff);
    if (f != null && f.length > 0)
      return new ServiceException(FileUtils.readFileToString(f[0], "UTF-8"));

    return new ServiceException("Error: return code " + ret);
  }


  /**
   * Collect the output of a run and invoke postProcess(). All files matching
   * the OUTPUT resources of the service class are registered as results. If
   * the request metainfo has 'archive' set, the workspace is zipped and the
   * archive is registered as a result, too.
   *
   * @return the Throwable captured from postProcess(), null if it completed.
   * @throws Exception if expanding the output files or creating the archive
   * fails.
   */
  private Throwable postProcess0() throws Exception {
    for (Resource r : Binaries.getResourcesByType(getClass(), ResourceType.OUTPUT)) {
      // a resource may list several whitespace separated file patterns
      String[] files = r.file().split("\\s+");
      for (String file : files) {
        File[] f = Utils.expandFiles(getWorkspaceDir0(), file);
        if (f != null && f.length > 0)
          results().put(f);
      }
    }
    // this is for debugging only! be careful
    if (metainfo.optBoolean("archive", false)) {
      File archiveZip = zipWorkspace();
      File f = new File(getWorkspaceDir0(), "archive-" + getSUID() + ".zip");
      // renameTo() reports failure through its return value only (e.g. when
      // source and target are on different file systems); fall back to a
      // copy so that the registered result file actually exists.
      if (!archiveZip.renameTo(f)) {
        FileUtils.copyFile(archiveZip, f);
        FileUtils.deleteQuietly(archiveZip);
      }
      results().put(f);
    }
    return doProcessWrapper(POST);
  }


  /**
   * Get the workspace folder of this session. The folder is resolved from
   * the session id and created on first access; subsequent calls return the
   * same File.
   *
   * @return the workspace folder
   */
  final File getWorkspaceDir0() {
    File ws = workspace;
    if (ws == null) {
      synchronized (this) {
        ws = workspace;
        if (ws == null) {
          ws = Services.getWorkDir(getSUID());
          // Create the folder BEFORE publishing it through the volatile
          // field, so that a thread reading 'workspace' without the lock
          // never gets a folder that is not created yet.
          ws.mkdirs();
          workspace = ws;
        }
      }
    }
    return ws;
  }


  /**
   * Get the results folder for this model run. The folder is resolved from
   * the session id and created on first access; subsequent calls return the
   * same File.
   *
   * @return the results folder
   */
  private File getResultsDir() {
    if (results != null) {
      return results;
    }
    results = Services.getResultsDir(getSUID());
    results.mkdirs();
    return results;
  }


  /**
   * Get the simulation unique identifier. By default this is a time-based
   * UUID (128 bit) in its string form; initOnce() replaces it with an
   * externally provided id if the request carries one.
   *
   * @return the suid string
   */
  protected final String getSUID() {
    return suid;
  }


  // true if the request metainfo has a mode entry that equals ASYNC.
  final boolean isAsync() {
    if (!metainfo.has(KEY_MODE)) {
      return false;
    }
    String mode = metainfo.optString(KEY_MODE);
    return mode.equals(ASYNC);
  }


  /**
   * Get the parameter as map "name" to "JSONObject". This is the
   * preprocessed request parameter, which also backs the parameter() API.
   *
   * @return the parameter map
   * @deprecated presumably superseded by {@link #parameter()} - TODO confirm.
   */
  @Deprecated
  protected final Map<String, JSONObject> getParamMap() {
    return paramMap;
  }


  /**
   * Get the raw request parameter, i.e. the value of the parameter entry as
   * it was read from the request.
   *
   * @return request parameter
   * @deprecated presumably superseded by {@link #parameter()} - TODO confirm.
   */
  @Deprecated
  protected final Object getParam() {
    return param;
  }

  ///////////////////////////////////////////////////////////////////////private     

  /**
   * Create the report file for this session. Nothing happens if the given
   * metainfo has 'report' explicitly set to false. Otherwise doReport() is
   * invoked and, if it produced report entries, the report (with file
   * references, if any) is written to REPORT_FILE in the results folder
   * and 'report' is set to true in the metainfo.
   *
   * @param meta the metainfo to check and to store with the report
   * @throws Exception if creating or writing the report fails.
   */
  private void doCreateReport(JSONObject meta) throws Exception {
    // the request may opt out of reporting
    boolean skip = meta.has("report") && !meta.getBoolean("report");
    if (skip) {
      return;
    }

    // let the subclass populate the report entries
    doReport();
    if (rep == null) {
      // no report call has been made
      return;
    }

    JSONArray report = create_report();
    if (frep != null) {
      annotateResults(report, frep);
    }

    // flag the report availability in the metainfo
    meta.put("report", true);
    JSONObject doc = new JSONObject();
    doc.put(KEY_METAINFO, meta);
    doc.put(KEY_REPORT, report);
    String content = JSONUtils.toString(doc);

    File reportFile = new File(getResultsDir(), REPORT_FILE);
    FileUtils.writeStringToFile(reportFile, content, "UTF-8");
    if (LOG.isLoggable(Level.INFO)) {
      LOG.info("created report : " + reportFile);
    }
  }


  /**
   * Copy the given result files into the results folder. A file that does
   * not exist as given is resolved against the workspace folder.
   *
   * @param resultfiles the files to copy
   * @throws IOException if a file cannot be found or read, or if copying
   * fails.
   */
  private void processResults(List<File> resultfiles) throws IOException {
    File wsDir = getWorkspaceDir0();
    File resultDir = getResultsDir();
    for (File sfile : resultfiles) {
      if (!sfile.exists()) {
        // try again, relative to the workspace
        sfile = new File(wsDir, sfile.toString());
        if (!sfile.exists())
          throw new IOException("Result file not found: " + sfile);

      }
      if (!sfile.canRead())
        throw new IOException("Cannot read file: " + sfile);

      if (LOG.isLoggable(Level.INFO))
        // fixed: the message was missing the blank before the file name.
        LOG.info("Copy result " + sfile.toString() + " to " + resultDir);

      FileUtils.copyFileToDirectory(sfile, resultDir);
    }
  }


  /**
   * Append one entry per file to the given array. Each entry is created
   * from the file name, its query URL (codebase + "q/" + suid + "/" +
   * encoded file name) and the description registered for the file.
   *
   * @param results the array to append to
   * @param files the files to reference; null or empty leaves the array
   * untouched.
   * @throws Exception if an entry cannot be created.
   */
  private void annotateResults(JSONArray results, List<File> files) throws Exception {
    if (files != null && !files.isEmpty()) {
      // No need to convert toPublicURL as getCodebase() is drawing from a
      // previously translated URI anyway.
      String url = request().getCodebase();
      for (File file : files) {
        String name = file.getName();
        // proper url encoding.
        String encodedName = UriBuilder.fromPath(name).build().toString();
        String description = fdesc().get(file);
        String link = url + "q/" + getSUID() + "/" + encodedName;
        results.put(JSONUtils.dataDesc(name, link, description));
      }
    }
  }


  /**
   * Create the metainfo for a failed call: a clone of the request metainfo
   * (or an empty object if there is none) with status FAILED, the suid, a
   * time stamp, the service URL, the cloud node and the requesting IP.
   *
   * @return the metainfo, or null if it could not be created.
   */
  private JSONObject getFailedMetaInfo() {
    try {
      JSONObject info;
      if (metainfo == null) {
        info = new JSONObject();
      } else {
        info = JSONUtils.clone(metainfo);
      }
      info.put(KEY_STATUS, FAILED);
      info.put(KEY_SUUID, getSUID());
      info.put(KEY_TSTAMP, Dates.nowISO(tz));
      info.put(KEY_SERVICE_URL, request().getURL());
      info.put(KEY_CLOUD_NODE, Services.LOCAL_IP_ADDR);
      info.put(KEY_REQ_IP, request().getRemoteAddr());
      return info;
    } catch (JSONException ex) {
      LOG.log(Level.SEVERE, null, ex);
      return null;
    }
  }


  /**
   * Create the metainfo for a denied call: a clone of the request metainfo
   * (or an empty object if there is none) with status DENIED, a time stamp,
   * the service URL and the requesting IP.
   *
   * @return the metainfo, or null if it could not be created.
   */
  private JSONObject getDeniedMetaInfo() {
    try {
      JSONObject info;
      if (metainfo == null) {
        info = new JSONObject();
      } else {
        info = JSONUtils.clone(metainfo);
      }
      info.put(KEY_STATUS, DENIED);
      info.put(KEY_TSTAMP, Dates.nowISO(tz));
      info.put(KEY_SERVICE_URL, request().getURL());
      info.put(KEY_REQ_IP, request().getRemoteAddr());
      return info;
    } catch (JSONException ex) {
      LOG.log(Level.SEVERE, null, ex);
      return null;
    }
  }


  /**
   * Validate the authorization header of the request, if the configured
   * token authentication requires it.
   *
   * @param httpReq the servlet request
   * @throws SecurityException if authentication is required and the request
   * carries no token based authorization, or if validating the token fails.
   */
  private void validateAuthentication(HttpServletRequest httpReq)
      throws SecurityException {
    TokenAuthentication ta = Config.getTokenAuthentication();
    if (!ta.requiresAuthentication()) {
      return;
    }
    String header = httpReq.getHeader(HttpHeaders.AUTHORIZATION);
    if (!ta.isTokenBasedAuthentication(header)) {
      throw new SecurityException("No authorization token provided.");
    }
    ta.validate(ta.getToken(header));
  }


  /**
   * One-time initialization of this service instance: take over an external
   * suid from the request header (or disable the publisher if there is
   * none), create the session logger if needed, record the start time and
   * process the @Options settings. Subsequent calls do nothing.
   *
   * @param httpReq the servlet request
   */
  private void initOnce(HttpServletRequest httpReq) {
    if (!needInit) {
      return;
    }

    String externalSuid = httpReq.getHeader(KEY_SUUID);
    if (externalSuid == null) {
      // only use the a (kafka) publisher
      // when needed: the external suid is set.
      publisher = Publisher.NONE;
    } else {
      suid = externalSuid;
    }

    if (LOG == null) {
      LOG = new SessionLogger(getResultsDir(), service().getPath(),
          getSUID(), Config.getString(Config.CSIP_LOGGING_LEVEL));
    }

    start = new Date();
    processOptions();
    needInit = false;
  }


  /**
   * Get the status type to respond with. The requested status is only used
   * if the configured service error status is 500 or above; otherwise the
   * response status is always OK.
   *
   * @param s the requested status type
   * @return the actual status type.
   */
  private Response.StatusType getRespStatus(Response.StatusType s) {
    return SERVICE_ERROR >= 500 ? s : Response.Status.OK;
  }


  //////////////////////////////////////////////////////////////////// HTTP    
  /**
   * Describe the service as JSON. (Service endpoint use only)
   * <br>
   * The request template is taken from, in this order: getCapabilities(),
   * the class path resource '&lt;classname&gt;Req.json', the class path
   * resource '&lt;classname&gt;.json', or an empty request. The service
   * info and the public service URL are merged into its metainfo.
   *
   * @param uriInfo the URI Info (injected)
   * @param httpReq the servlet request (injected)
   * @return The service signature as JSON; an empty request if the
   * signature cannot be created.
   */
  @GET
  @Produces(MediaType.APPLICATION_JSON)
  public final String describeJSON(@Context UriInfo uriInfo,
      @Context HttpServletRequest httpReq) {
    initOnce(httpReq);
    LOG.log(Level.INFO, "HTTP/GET {0}", uriInfo.getRequestUri().toString());
    LOG.log(Level.INFO, " Request Parameter template for " + getClass());
    try {
      JSONObject r;
      JSONArray params = getCapabilities();

      // 1) check if service is implementing getCapabilities()
      if (params != null) {
        r = JSONUtils.newRequest();
        r.put(ModelDataService.KEY_PARAMETER, params);
      } else {
        // 2) look for an external .json template.
        String classname = '/' + getClass().getName().replace('.', '/');
        LOG.info("Search " + classname + "Req.json");
        InputStream is = getClass().getResourceAsStream(classname + "Req.json");
        if (is == null) {
          LOG.info("Search " + classname + ".json");
          is = getClass().getResourceAsStream(classname + ".json");
        }
        if (is == null) {
          LOG.info("Return empty request.");
          r = JSONUtils.newRequest();
        } else {
          // IOUtils.toString() does not close the stream; close it here to
          // avoid leaking it on every describe call.
          try (InputStream in = is) {
            r = new JSONObject(IOUtils.toString(in, "UTF-8"));
          }
        }
      }
      JSONObject m = r.getJSONObject(KEY_METAINFO);
      JSONUtils.mergeInto(Utils.getServiceInfo(getClass()), m);
      String host = Utils.getPublicRequestURL(httpReq);
      m.put(KEY_SERVICE_URL, host);
      // un-escape the slashes for readability
      return JSONUtils.toString(r).replace("\\/", "/");
    } catch (Exception ex) {
      LOG.log(Level.WARNING, null, ex);
      LOG.info("Return empty request.");
      return JSONUtils.newRequest().toString();
    }
  }


  /**
   * Service Handler for non-multipart requests. There are no form parameter,
   * everything is in the body. (Service endpoint only)
   * <br>
   * The request is parsed, stored in the results folder and authenticated.
   * It is then executed as an ensemble run (if the metainfo contains
   * parameter sets), as an asynchronous run (a 'submitted' response is
   * returned and the task is started in the background) or as a synchronous
   * run (the task runs on the calling thread and the response is read from
   * the response file in the results folder).
   *
   * @param uriInfo The UriInfo context
   * @param httpReq the servlet request
   * @param requestStr the request string
   * @return the JSON response of the service.
   */
  @POST
  @Produces(MediaType.APPLICATION_JSON)
  @Consumes(MediaType.APPLICATION_JSON)
  public final Response execute(@Context UriInfo uriInfo,
      @Context HttpServletRequest httpReq, String requestStr) {

    initOnce(httpReq);

    LOG.log(Level.INFO, "HTTP/POST {0}", uriInfo.getRequestUri().toString());
    // prefer the forwarded client address over the direct peer address.
    reqRemoteIp = httpReq.getHeader("X-Forwarded-For");
    if (reqRemoteIp == null)
      reqRemoteIp = httpReq.getRemoteAddr();

    reqHost = httpReq.getRemoteHost();
    reqUrl = Utils.getPublicRequestURL(httpReq);
    reqContext = httpReq.getContextPath();
    reqScheme = httpReq.getScheme();
    reqAuth = httpReq.getHeader(HttpHeaders.AUTHORIZATION);

    if (requestStr == null)
      return Response.status(getRespStatus(Response.Status.BAD_REQUEST))
          .entity(JSONUtils.error("Null JSON Request.").toString()).build();

    if (LOG.isLoggable(Level.INFO)) {
      LOG.info("path: " + service().getPath());
      LOG.info("from: " + httpReq.getRemoteAddr() + ","
          + httpReq.getRemoteHost() + "," + httpReq.getRemoteUser());
      LOG.info("request url: " + httpReq.getRequestURL().toString());
      LOG.info("context: " + httpReq.getContextPath());
      LOG.info("x-forwarded-for:" + httpReq.getHeader("X-Forwarded-For"));
    }

    if (LOG.isLoggable(Level.FINE))
      LOG.fine("request: " + requestStr);

    JSONObject response = null;
    String responseStr = null;

    try {
      try {
        request = new JSONObject(requestStr);
      } catch (JSONException E) {
        return Response.status(getRespStatus(Response.Status.BAD_REQUEST))
            .entity(JSONUtils.error("Malformed JSON Request.").toString()).build();
      }

      // a request can be empty: '{}', means no metainfo, no parameter 
      if (request.length() == 0)
        request = JSONUtils.newRequest();

      // fetch parameter and meta info.
      param = request.get(KEY_PARAMETER);

      // get the metainfo, if not, create one.
      if (request.has(KEY_METAINFO))
        metainfo = request.getJSONObject(KEY_METAINFO);
      else
        metainfo = new JSONObject();

      // optional passing of webhook as HTTP Header.
      String webhook = httpReq.getHeader(HEADER_WEBHOOK);
      if (webhook != null)
        metainfo.put(KEY_WEBHOOK, webhook);

      // optional passing of the request file as HTTP Header.
      String rf = httpReq.getHeader(HEADER_REQUEST_FILE);
      if (rf != null)
        metainfo.put(KEY_REQUEST_FILE, rf);

      // keep the original request in the results folder.
      // NOTE(review): the request is written before it is authenticated.
      FileUtils.writeStringToFile(new File(getResultsDir(), REQUEST_FILE), requestStr, "UTF-8");
      validateAuthentication(httpReq);
      // result store lookup: the digest is derived from service path and
      // request; a stored result is used unless 'rs_skip' is set.
      if (resultStore != ResultStore.NONE) {
        digest = resultStore.getDigest(service().getPath() + "-" + requestStr);
        if (digest == null)
          throw new ServiceException("Internal error: No digest created.");

        if (!metainfo.optBoolean("rs_skip", false))
          cachedResult = resultStore.getResult(digest);
      }

      // extract all Resources
      Binaries.extractAllResources(getClass(), LOG);

      paramMap = JSONUtils.preprocess(param);

      // add the attached inputs to it (comma separated, without brackets)
      if (inputs != null && inputs.length > 0) {
        String a = Arrays.toString(inputs);
        metainfo.put("attachments", a.substring(1, a.length() - 1));
      }

      // handleCache(metainfo);
      // NOTE(review): 'tz' is a static field; assigning it per request can
      // leak one request's time zone into concurrently running requests.
      tz = JSONUtils.getJSONString(metainfo, KEY_TZ, Config.getString(Config.CSIP_TIMEZONE));
      if (LOG.isLoggable(Level.INFO))
        LOG.info("timezone: " + tz);

      if (metainfo.has(KEY_PARAMETERSETS)) {
        // ensemble run
        response = new Callable<JSONObject>() {
          String path = service().getPath();


          @Override
          public JSONObject call() throws Exception {

            // map the request to callables
            List<Callable<JSONObject>> ens = Services.mapEnsemble(request, path);
            if (ens != null) {
              // execute it
              List<Future<JSONObject>> results = Services.runEnsemble(ens);

              // reduce results
              return Services.reduceEnsemble(results, request);
            } else {
              return JSONUtils.newError(getFailedMetaInfo(),
                  param, new ServiceException("Not an ensemble."));
            }
          }

        }.call();
        // NOTE(review): 'responseStr' is not assigned on this path, so the
        // response entity built below is null for ensemble runs - confirm.
      } else {
        // single run:
        publisher.publish(STARTED, getSUID());
        mt = new Task();
        if (metainfo.has(KEY_MODE) && metainfo.getString(KEY_MODE).equals(ASYNC)) {
          // async call
          JSONObject metainfoClone = JSONUtils.clone(metainfo);

          // Put polling info in obj before starting!
          ServiceAnnotations sa = service();
          if (sa.getNextPoll() != -1) {
            metainfoClone.put(KEY_NEXT_POLL, sa.getNextPoll());
          }
          if (sa.getFirstPoll() != -1) {
            metainfoClone.put(KEY_FIRST_POLL, sa.getFirstPoll());
          }

          // generate response.
          metainfoClone.put(KEY_STATUS, SUBMITTED)
              .put(KEY_SUUID, getSUID())
              .put(KEY_TSTAMP, Dates.nowISO(tz))
              .put(KEY_SERVICE_URL, request().getURL());

          // done, there is the response
          response = JSONUtils.newResponse(request, null, metainfoClone);
          responseStr = JSONUtils.toString(response);

          // start the async call as the very last thing before returning
          mt.start();
        } else {
          // sync call: run the task on this thread, then pick up the
          // response from the response file in the results folder.
          mt.run();
          responseStr = FileUtils.readFileToString(
              new File(getResultsDir(), RESPONSE_FILE), "UTF-8");
        }
      }
    } catch (SecurityException E) {
      // authentication failed: respond with a 'denied' error.
      response = JSONUtils.newDeniedError(getDeniedMetaInfo(), param, E);
      responseStr = JSONUtils.toString(response).replace("\\/", "/");
      LOG.log(Level.SEVERE, E.getMessage());
      statusCode = SERVICE_ERROR;
    } catch (Throwable t) {
      // anything else: respond with a 'failed' error.
      response = JSONUtils.newError(getFailedMetaInfo(), param, t);
      responseStr = JSONUtils.toString(response).replace("\\/", "/");
      LOG.log(Level.SEVERE, response.toString(), t);
      statusCode = SERVICE_ERROR;
    }
    if (LOG.isLoggable(Level.FINE)) {
      LOG.fine("response :" + responseStr);
    }
//    LOG.close();
    return Response.status(getRespStatus(new ServiceErrorStatusType(statusCode)))
        .entity(responseStr)
        .build();
  }


  /**
   * Handler for model services. Multi-part handling. (Service endpoint only)
   * <br>
   * All form parts are copied (and optionally unpacked) into the workspace.
   * The JSON request is taken from the form parameter FORM_PARAM: either
   * directly from its value, or from the attached file of that name. If
   * only files are attached, an empty request is used. The request is then
   * passed on to the non-multipart handler.
   *
   * @param uriInfo the context info
   * @param httpReq the servlet request
   * @param multipart multi part input.
   *
   * @return the JSON response of the service.
   */
  @POST
  @Produces(MediaType.APPLICATION_JSON)
  @Consumes(MediaType.MULTIPART_FORM_DATA)
  public final Response execute(@Context UriInfo uriInfo,
      @Context HttpServletRequest httpReq, FormDataMultiPart multipart) {

    initOnce(httpReq);
    LOG.log(Level.INFO, "HTTP/POST {0}", uriInfo.getRequestUri().toString());
    Response response = null;
    try {
      if (multipart == null || multipart.getBodyParts() == null
          || multipart.getBodyParts().isEmpty())
        throw new ServiceException("Missing JSON request and/or files");

      formData = Services.getFormParameter(multipart.getBodyParts());
      File ws = getWorkspaceDir0();

      // copy all file - file9 to workspace.
      inputs = Utils.copyAndExtract(LOG, ws, formData, unpack);

      String requestStr;
      // fetch param JSON
      FormDataParameter fdParam = formData.get(FORM_PARAM);
      if (fdParam == null) {
        // neither a JSON request nor any file: report it explicitly
        // instead of failing with a NullPointerException below.
        if (formData.isEmpty())
          throw new ServiceException("Missing JSON request and/or files");

        // only files are attached, no json
        requestStr = JSONUtils.newRequest().toString();
        if (LOG.isLoggable(Level.INFO))
          LOG.info("creating empty request string");

      } else if (fdParam.getFilename() == null) {
        // direct passing
        requestStr = fdParam.getValue();
      } else {
        // the request was attached as a file, it is in the workspace now.
        File paramFile = new File(ws, fdParam.getFilename());
        if (!paramFile.exists())
          throw new ServiceException("Missing JSON request file.");

        requestStr = FileUtils.readFileToString(paramFile, "UTF-8");
      }
      // proxy to the regular call.
      response = execute(uriInfo, httpReq, requestStr);
    } catch (Throwable t) {
      // Use the same status type as the non-multipart handler;
      // Response.Status.fromStatusCode() returns null for status codes
      // that are not part of the Status enum.
      response = Response.status(getRespStatus(new ServiceErrorStatusType(SERVICE_ERROR)))
          .entity(JSONUtils.newError(getFailedMetaInfo(), param, t).toString()).build();
      LOG.log(Level.SEVERE, response.toString(), t);
    }
//    LOG.close();
    return response;
  }


  /**
   * Set the progress as a string message. Call this method during process() to
   * indicate progress for long running models. The message is kept on this
   * service and the running status is stored again; when the status metadata
   * is updated, a non-null progress value is added to the metainfo as the
   * 'progress' entry.
   *
   * @param progress a meaningful message
   * @throws ServiceException if there are problems.
   */
  protected void setProgress(String progress) throws ServiceException {
    this.progress = progress;
    // re-publish the RUNNING status so the new progress value gets stored.
    mt.setRunningStatus();
  }


  /**
   * Report progress as a percentage. The number is validated and then handed
   * to {@link #setProgress(String)} in its decimal string form.
   *
   * @param progress the completion value, 0 to 100 (both inclusive)
   * @throws ServiceException if the value lies outside of 0..100
   */
  protected void setProgress(int progress) throws ServiceException {
    boolean inRange = 0 <= progress && progress <= 100;
    if (!inRange)
      throw new ServiceException("invalid progress: " + progress);

    setProgress(String.valueOf(progress));
  }


  /**
   * Get the time to keep the results, in seconds. This is the
   * 'csip.session.ttl' duration; if the request metainfo has 'archive' set to
   * true, a 'csip.session.ttl' entry in the metainfo overrides that duration.
   *
   * @return the number of seconds to keep the results.
   */
  private long getKeepResults() {
    boolean archiveRequested = metainfo.optBoolean("archive", false);
    long defaultTtl = Dates.getDurationSec("csip.session.ttl");
    return archiveRequested
        ? metainfo.optLong("csip.session.ttl", defaultTtl)
        : defaultTtl;
  }


  /**
   * Zip the workspace directory. The request, response, result and log files
   * are first copied from the results directory into the workspace (if they
   * exist) so they become part of the zip.
   *
   * @return the file returned by zipping the workspace directory
   * @throws IOException if copying or zipping fails
   */
  File zipWorkspace() throws IOException {
    String[] sessionFiles = {REQUEST_FILE, RESPONSE_FILE, RESULT_FILE, LOG_FILE};
    for (int i = 0; i < sessionFiles.length; i++) {
      File source = new File(getResultsDir(), sessionFiles[i]);
      if (!source.exists())
        continue;

      FileUtils.copyFileToDirectory(source, getWorkspaceDir0());
    }
    return ZipFiles.zip(getWorkspaceDir0());
  }

  /**
   * Model execution Task.
   * <p>
   * Runs one service invocation on its own thread: pre-processing, the model
   * run (submitted to the configured executor and awaited with a timeout),
   * post-processing, result and report creation. Afterwards the session is
   * optionally archived, the workspace is removed and the results are handed
   * to a Sweeper for removal. The current status is written to the response
   * file and to the session store on every status change.
   */
  final class Task extends Thread {

    // 1 day in milli seconds, max time for a running job to be accessible.
    static final long ONE_DAY = 60 * 60 * 24 * 1000;
    // the model run itself; yields the error of the run or null on success.
    FutureTask<Throwable> task = new FutureTask<>(() -> process0());
    String status = UNKNOWN;
    // periodic workspace snapshot timer, only created if snapshots are enabled.
    Timer snapshot;
    Throwable serviceError;
    // empty request.
    JSONObject emptyReq = new JSONObject();


    /**
     * Execute the service steps and always perform archiving, cleanup and
     * session expiration handling at the end, regardless of the outcome.
     */
    @Override
    public void run() {
      ExecutorService es = Config.getExecutorService();
      try {
        // step 1 preprocess input
        setRunningStatus();
        publisher.publish(RUNNING, getSUID());
        serviceError = doProcessWrapper(PRE);
        if (serviceError == null) {
          Config.getModelTasks().add(this);
          // step 2 submit the model run
          es.submit(task);
          // running from here on.
          long sec = Dates.getDurationSec(Config.CSIP_SNAPSHOT, -1);
          if (sec > 0 && Config.isArchiveEnabled()) {
            snapshot = new Timer("timer-" + getSUID());
            snapshot.scheduleAtFixedRate(new TimerTask() {
              // duration of the last snapshot in seconds, -1 if none was taken.
              long dur = -1;


              @Override
              public void run() {
                try {
                  // skip if the previous snapshot took longer than the interval.
                  if (sec > dur) {
                    LOG.info("taking workspace snapshot ...");
                    ArchiveStore as = Config.getArchiveStore();
                    if (!as.isAvailable())
                      return;

                    long start = System.currentTimeMillis();
                    ModelSession ms = Config.getSessionStore().getSession(getSUID());
                    File snapshotZip = zipWorkspace();
                    DateFormat df = Dates.newISOFormat(tz);
                    Date now = new Date();
                    Date expDate = Dates.futureDate(now, 31536000); // 1 year expiration 
                    ModelArchive ma = new ModelArchive(df.format(now),
                        df.format(expDate), request().getURL(), ms.getStatus(), ms.getReqIP());
                    // replace a previous snapshot of this session.
                    if (as.hasArchive(getSUID()))
                      as.removeArchive(getSUID());

                    as.archiveSession(getSUID(), ma, snapshotZip);
                    FileUtils.deleteQuietly(snapshotZip);
                    long end = System.currentTimeMillis();
                    dur = (end - start) / 1000;
                    LOG.info("snapshot done in " + dur + " seconds.");
                  } else {
                    LOG.info("skipped snapshot: " + dur + " - " + sec);
                  }
                } catch (Exception ex) {
                  LOG.log(Level.SEVERE, null, ex);
                }
              }
            }, sec * 1000, sec * 1000);
          }
          // waiting here.     
          serviceError = task.get(timeout, TimeUnit.SECONDS);
          if (serviceError == null) {
            // step 3
            serviceError = postProcess0();
            if (serviceError == null) {
              // step 4
              JSONArray results = create_results();
              if (fres != null)
                // automatically annotate the output if there are files being returned.
                annotateResults(results, fres);

              if (results.length() == 0) {
                // check if there is a .results.json file from python or R
                // TODO: Read that file into JSONArray
                File f = new File(getResultsDir(), RESULT_FILE);
                if (f.exists() && f.canRead())
                  results = new JSONObject(FileUtils.readFileToString(f, "UTF-8"))
                      .getJSONArray(KEY_RESULT);
              }
              JSONObject meta = setFinishedStatus(results);

              // step 5 create a report (if supported)
              doCreateReport(meta);
            }
          }
        }

        if (serviceError != null) {
          // doProcess threw an exception as serviceError
          setFailedStatus(serviceError);
          statusCode = SERVICE_ERROR;
          LOG.log(Level.SEVERE, null, serviceError);
        }
      } catch (TimeoutException e) {
        LOG.info("service: " + service().getPath()
            + "  timed out after " + timeout + " seconds. service cancelled.");
        // actually stop the model run; otherwise it keeps executing after
        // the timeout while its workspace is being cleaned up.
        task.cancel(true);
        setTimedoutStatus();
      } catch (CancellationException e) {
        LOG.log(Level.INFO, "cancelled.");
        setCancelledStatus();
      } catch (Exception e) {
        LOG.log(Level.SEVERE, null, e);
        setFailedStatus(e);
      } finally {
        // stop the periodic snapshots on every exit path (success, failure,
        // timeout, cancellation), otherwise the timer thread keeps running.
        if (snapshot != null)
          snapshot.cancel();

        // manage expiration action
        try {
          final ModelSession ms = Config.getSessionStore().getSession(getSUID());
          if (fres != null)
            processResults(fres);

          if (frep != null)
            processResults(frep);

          // archive management
          new Thread(() -> {
            if (Config.isArchiveEnabled() && metainfo.optBoolean("csip.archive.enabled", true)) {

              try {
                if ((Config.getBoolean(Config.CSIP_ARCHIVE_FAILEDONLY) && serviceError != null) // only archive if failed
                    || !Config.getBoolean(Config.CSIP_ARCHIVE_FAILEDONLY) // always archive
                    || (Config.getBoolean(Config.CSIP_ARCHIVE_ONREQUEST) && metainfo().getBoolean(Config.CSIP_ARCHIVE_ONREQUEST))) { // only if requested.
                  JSONObject mi = metainfo;
                  if (mi != null && "map".equals(mi.optString("phase")))
                    // do not archive in the map phase.
                    // NOTE(review): this return also skips the workspace
                    // removal and the Sweeper scheduling below - confirm
                    // that this is intended.
                    return;

                  ArchiveStore as = Config.getArchiveStore();
                  if (as.isAvailable()) {
                    // close log before archiving.
                    LOG.close();
                    long st = System.currentTimeMillis();
                    File archiveZip = zipWorkspace();
                    // move it from session store to archive store.
                    // turn session into archive
                    DateFormat df = Dates.newISOFormat(tz);
                    Date now = new Date();
                    Date expDate = Dates.futureDate(now, Dates.getDurationSec("csip.archive.ttl"));
                    ModelArchive ma = new ModelArchive(df.format(now),
                        df.format(expDate), request().getURL(),
                        ms.getStatus(), ms.getReqIP());

                    // replace an existing archive (e.g. an earlier snapshot).
                    if (as.hasArchive(getSUID()))
                      as.removeArchive(getSUID());
                    as.archiveSession(getSUID(), ma, archiveZip);
                    publisher.publish(ARCHIVED, getSUID());
                    FileUtils.deleteQuietly(archiveZip);
                    long en = System.currentTimeMillis();
                    LOG.info("Archived  " + getSUID() + " in " + (en - st) + " ms.");
                  }
                }
              } catch (Exception ex) {
                LOG.log(Level.SEVERE, null, ex);
              }
            }

            // remove the workspace unless requested via config
            boolean delete_ws = true;
            if (Config.getBoolean(Config.CSIP_KEEPWORKSPACE, false)) {
              delete_ws = false;
            } else if (ms.getStatus().equals(FAILED)
                && Config.getBoolean("csip.keepworkspace.failed", false)) {
              delete_ws = false;
            }
            if (delete_ws)
              FileUtils.deleteQuietly(getWorkspaceDir0());

            // keep the results, let the Sweeper remove it after ttl
            // separate thread
            Sweeper r = new Sweeper(hasResultsDir() ? getResultsDir() : null, getSUID());
            long keepResults = getKeepResults();
            if (ms.getStatus().equals(FAILED)) {
              keepResults = Dates.getDurationSec("csip.session.ttl.failed");
            } else if (ms.getStatus().equals(CANCELED)) {
              keepResults = Dates.getDurationSec("csip.session.ttl.cancelled");
            }
            if (keepResults > 0) {
              try {
                Config.getTimer().schedule(r, keepResults * 1000);
              } catch (IllegalStateException E) {
                // the timer refused the task, schedule on a new timer instead.
                Config.getNewTimer().schedule(r, keepResults * 1000);
              }
            } else {
              // nothing to keep, sweep right away.
              r.run();
            }
          }).start();
        } catch (Exception ex) {
          LOG.log(Level.SEVERE, null, ex);
        }
        LOG.close();   // this might already being closed if archived.
        api.clear();
        // remove the running session.
        Config.getModelTasks().remove(this);
      }
    }


    /**
     * Cancel the model run. Invokes the service's onCancel() first and then
     * cancels the underlying task, interrupting it if it is running.
     *
     * @return the result of cancelling the underlying task.
     */
    boolean cancel() {
      onCancel();
      return task.cancel(true);
    }


    /**
     * Get the service this task belongs to.
     *
     * @return the enclosing service instance.
     */
    ModelDataService getService() {
      return ModelDataService.this;
    }


    @Override
    public String toString() {
      return "\n[" + getSUID() + " - " + status + " - " + start.toString() + "]";
    }


    // status management.
    /**
     * Write the response file into the results directory and update the
     * session in the session store. Problems are logged, not propagated.
     *
     * @param request the request to put into the response
     * @param metainfo the metainfo carrying status, timestamp and the
     * optional expiration date, cpu time and report flag
     * @param results the results, passed as null by the callers that have
     * no results to report
     */
    private void store(JSONObject request, JSONObject metainfo, JSONArray results) {
      try {
        Object results_ = results;
        // payload version 2 uses a different result layout.
        if (JSONUtils.getPayloadVersion(param) == 2)
          results_ = JSONUtils.toV2(results);
        JSONObject response = JSONUtils.newResponse(request, results_, metainfo);
        File responseFile = new File(getResultsDir(), RESPONSE_FILE);
        // write the response file while holding the lock for that file.
        Lock l = Config.wsFileLocks.get(responseFile, (File t) -> new ReentrantLock());
        l.lock();
        try {
          FileUtils.writeStringToFile(responseFile,
              JSONUtils.toString(response).replace("\\/", "/"), "UTF-8");
        } finally {
          l.unlock();
        }

        String stat = metainfo.getString(KEY_STATUS);
        String tstamp = metainfo.getString(KEY_TSTAMP);
        String expDate = "";
        if (metainfo.has(KEY_EXPIRATION_DATE))
          expDate = metainfo.getString(KEY_EXPIRATION_DATE);
        String cputime = "";
        if (metainfo.has(KEY_CPU_TIME))
          cputime = metainfo.getString(KEY_CPU_TIME);
        boolean hasReport = metainfo.has("report") && metainfo.getBoolean("report");

        ModelSession ms = new ModelSession(tstamp, expDate, service().getPath(), stat,
            Services.LOCAL_IP_ADDR, cputime, request().getRemoteAddr(), inputs, hasReport, progress);
        Config.getSessionStore().setSession(getSUID(), ms);
      } catch (Exception ex) {
        LOG.log(Level.SEVERE, null, ex);
      }
    }


    /**
     * Set the task status and refresh the common metainfo entries (status,
     * session id, node, requester IP, service URL, progress, log URL and
     * time stamp).
     *
     * @param status the new status
     * @return the updated metainfo of the service.
     */
    private JSONObject updateMetadata(String status) {
      this.status = status;
      JSONObject mi = metainfo;
      try {
        mi.put(KEY_STATUS, status)
            .put(KEY_SUUID, getSUID())
            .put(KEY_CLOUD_NODE, Services.LOCAL_IP_ADDR)
            .put(KEY_REQ_IP, request().getRemoteAddr())
            .put(KEY_SERVICE_URL, request().getURL());
        if (progress != null)
          mi.put(KEY_PROGRESS, progress);

        // if a log entry is present, replace it with the log query URL.
        if (mi.has(KEY_LOG))
          mi.put(KEY_LOG, request().getCodebase() + "q/log/" + getSUID());

        mi.put(KEY_TSTAMP, Dates.newISOFormat(tz).format(start));
      } catch (JSONException ex) {
        LOG.log(Level.SEVERE, null, ex);
      }
      return mi;
    }


    /**
     * Mark the session as RUNNING and store it with an empty request and no
     * results.
     */
    private void setRunningStatus() {
      try {
        JSONObject metainfo = updateMetadata(RUNNING);
        store(emptyReq, metainfo, null);
      } catch (Exception ex) {
        LOG.log(Level.SEVERE, null, ex);
      }
    }


    /**
     * Mark the session as FINISHED (or QUEUED if the service is queued),
     * add cpu time and expiration date, optionally put the results into the
     * result store, then store the session and publish the FINISHED event.
     *
     * @param results the results of the service
     * @return the updated metainfo, or null if a JSON problem occurred.
     */
    private JSONObject setFinishedStatus(JSONArray results) {
      try {
        // a queued service reports QUEUED instead of FINISHED.
        JSONObject metainfo = updateMetadata(isQueued() ? QUEUED : FINISHED);
        metainfo.put(KEY_CPU_TIME, Dates.diffInMillis(start, new Date()));
        Date expDate = Dates.futureDate(getKeepResults());
        metainfo.put(KEY_EXPIRATION_DATE, Dates.newISOFormat(tz).format(expDate));
        if (resultStore != ResultStore.NONE) {
          // only store the result if there are no input attachments
          // and no output file is being produced.
          if (cachedResult == null && (fres == null || fresults().isEmpty()) && attachments().getFilesCount() == 0)
            resultStore.putResult(digest, results);
          metainfo.put("rs_digest", digest);
        }

        // COSU
        COSUImpl cosu = (COSUImpl) api.get("cosu");
        if (cosu != null && cosu.isSupported()) {
          metainfo.put("cosu", cosu.getCOSUMetainfo());
        }

        store(request, metainfo, results);
        publisher.publish(FINISHED, getSUID() + " " + request().getURL());
        return metainfo;
      } catch (JSONException ex) {
        LOG.log(Level.SEVERE, null, ex);
        return null;
      }
    }


    /**
     * Mark the session as FAILED, add cpu time, expiration date, the error
     * message and (if enabled by configuration) the stack trace, then store
     * the session and publish the FAILED event.
     *
     * @param err the error that caused the failure
     */
    private void setFailedStatus(Throwable err) {
      String message = JSONUtils.getErrorMessage(err);
      try {
        JSONObject metainfo = updateMetadata(FAILED);
        metainfo.put(KEY_CPU_TIME, Dates.diffInMillis(start, new Date()));
        Date expDate = Dates.futureDate(getKeepResults());
        metainfo.put(KEY_EXPIRATION_DATE, Dates.newISOFormat(tz).format(expDate))
            .put(ERROR, message);
        if (Config.getBoolean(Config.CSIP_RESPONSE_STACKTRACE)) {
          JSONArray trace = JSONUtils.getJSONStackTrace(err);
          if (trace != null)
            metainfo.put("stacktrace", trace);
        }
        store(request, metainfo, null);
        publisher.publish(FAILED, getSUID() + " " + request().getURL());
      } catch (JSONException ex) {
        LOG.log(Level.SEVERE, null, ex);
      }
    }


    /**
     * Mark the session as CANCELED, add the cpu time, store the session and
     * publish the CANCELED event.
     */
    private void setCancelledStatus() {
      try {
        JSONObject metainfo = updateMetadata(CANCELED);
        metainfo.put(KEY_CPU_TIME, Dates.diffInMillis(start, new Date()));
        store(request, metainfo, null);
        publisher.publish(CANCELED, getSUID() + " " + request().getURL());
      } catch (JSONException ex) {
        LOG.log(Level.SEVERE, null, ex);
      }
    }


    /**
     * Mark the session as TIMEDOUT, add the cpu time, store the session and
     * publish the TIMEDOUT event.
     */
    private void setTimedoutStatus() {
      try {
        JSONObject metainfo = updateMetadata(TIMEDOUT);
        metainfo.put(KEY_CPU_TIME, Dates.diffInMillis(start, new Date()));
        store(request, metainfo, null);
        publisher.publish(TIMEDOUT, getSUID() + " " + request().getURL());
      } catch (JSONException ex) {
        LOG.log(Level.SEVERE, null, ex);
      }
    }
  }


  /**
   * Allow the subclass to queue the request. When this returns true, a
   * successfully completed task reports the status QUEUED instead of
   * FINISHED.
   *
   * @return true if the request is queued; this implementation always
   * returns false.
   */
  boolean isQueued() {
    return false;
  }
}