QueryService.java [src/csip] Revision:   Date:
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 *
 * 2012-2022, Olaf David and others, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package csip;

import csip.api.server.Executable;
import static csip.Config.getString;
import csip.annotations.Gzip;
import csip.annotations.Resource;
import csip.annotations.ResourceType;
import csip.utils.Binaries;
import csip.utils.Services;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringReader;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.io.IOUtils;
import org.codehaus.jettison.json.JSONException;
import csip.utils.JSONUtils;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.StringWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriBuilder;
import org.codehaus.jettison.json.JSONObject;

/**
 * QueryService
 *
 * @author Olaf David
 */
@Path("/q")
public class QueryService {

  static final Logger LOG = Config.LOG;


  //    @GET
  //    @Path("status")
  //    @Produces(MediaType.APPLICATION_JSON)
  //    public String getStatus(@Context UriInfo uriInfo) throws Exception {
  //        LOG.log(Level.INFO, "HTTP/GET {0}", uriInfo.getRequestUri().toString());
  //
  //        int running = 0;
  //        int finished = 0;
  //        int cancelled = 0;
  //        int failed = 0;
  //
  //        Config.SessionStore s = Config.getSessionStore();
  //        for (String id : s.keys(0, Integer.MAX_VALUE, null, false)) {
  //            ModelSession v = s.getSession(id);
  //            if (v == null) {
  //                continue;  // this should not happen
  //            }
  //            String st = v.getStatus();
  //            if (st.equals(ModelDataService.RUNNING)) {
  //                running++;
  //            } else if (st.equals(ModelDataService.FAILED)) {
  //                failed++;
  //            } else if (st.equals(ModelDataService.CANCELED)) {
  //                cancelled++;
  //            } else if (st.equals(ModelDataService.FINISHED)) {
  //                finished++;
  //            }
  //        }
  //        JSONObject o = new JSONObject();
  //        o.put("running", running);
  //        o.put("failed", failed);
  //        o.put("canceled", cancelled);
  //        o.put("finished", finished);
  //        return o.toString(2);
  //    }
  /**
   * Change the configuration.
   *
   * @param req
   * @param arg
   * @return
   * @throws JSONException
   */
  @POST
  @Path("r")
  @Consumes(MediaType.TEXT_PLAIN)
  public String r(@Context HttpServletRequest req, String arg) throws Exception {
    Utils.checkRemoteAccessACL(req);
    @Resource(file = "bash", type = ResourceType.REFERENCE, id = "_")
    class _C_ {
    }
    Executable e = Binaries.getResourceExe0(_C_.class, "_",
        new File(System.getProperty("java.io.tmpdir")), new SessionLogger());
    e.setArguments("-c", arg);
    StringWriter w = new StringWriter();
    e.redirectOutput(w);
    e.redirectError(w);
    e.exec();
    return Services.LOCAL_IP_ADDR + "\n----\n" + w.toString();
  }


  @GET
  @Path("running")
  @Produces(MediaType.TEXT_PLAIN)
  public String getRunning(@Context HttpServletRequest req) {
    SessionStore s = Config.getSessionStore();
    return Long.toString(s.countSessionsByState("Running"));
  }


  /**
   * Check the backend for problems.
   *
   * @param req
   * @return
   * @throws JSONException
   * @throws Exception
   */
  @GET
  @Path("check")
  @Produces(MediaType.APPLICATION_JSON)
  public String getCheck(@Context HttpServletRequest req) throws JSONException, Exception {
    Utils.checkRemoteAccessACL(req);

    JSONObject o = new JSONObject();
    File work = new File(Config.getString("csip.work.dir"));
    work.mkdirs();

    o.put("node.address", Services.LOCAL_IP_ADDR);
    o.put("cpus", Runtime.getRuntime().availableProcessors());
    o.put("memory.free", Utils.humanReadableByteCount(Runtime.getRuntime().freeMemory(), true));
    o.put("memory.max", Utils.humanReadableByteCount(Runtime.getRuntime().maxMemory(), true));
    o.put("memory.total", Utils.humanReadableByteCount(Runtime.getRuntime().totalMemory(), true));
    o.put("work.dir", work.toString());
    o.put("work.exists", work.exists());
    o.put("work.canread", work.canRead());
    o.put("work.canexecute", work.canExecute());
    o.put("work.canwrite", work.canWrite());
    o.put("work.canwrite", work.canWrite());
    o.put("work.usable.mb", Utils.humanReadableByteCount(work.getUsableSpace(), true));
    o.put("work.free.mb", Utils.humanReadableByteCount(work.getFreeSpace(), true));
    o.put("work.total.mb", Utils.humanReadableByteCount(work.getTotalSpace(), true));

    File res = new File(Config.getString("csip.results.dir"));
    res.mkdirs();
    o.put("result.dir", res.toString());
    o.put("result.exists", res.exists());
    o.put("result.canread", res.canRead());
    o.put("result.canexecute", res.canExecute());
    o.put("result.canwrite", res.canWrite());
    o.put("result.usable.mb", Utils.humanReadableByteCount(res.getUsableSpace(), true));
    o.put("result.free.mb", Utils.humanReadableByteCount(res.getFreeSpace(), true));
    o.put("result.total.mb", Utils.humanReadableByteCount(res.getTotalSpace(), true));

//        DateFormat df = Dates.newISOFormat();
//        String now = df.format(new Date());
    SessionStore s = Config.getSessionStore();
    switch (getString("csip.session.backend")) {
      case "mongodb":
        String url = getString("csip.session.mongodb.uri");
        if (url != null) {
          try {
            UriBuilder b = UriBuilder.fromUri(url);
            InetAddress address = InetAddress.getByName(b.build().getHost());
            o.put("csip.session.mongodb.address", address.getHostAddress());
          } catch (UnknownHostException ex) {
            LOG.log(Level.WARNING, null, ex);
          }
        }
        break;
    }

//        Set<String> keys = s.keys(0, Integer.MAX_VALUE, null, true);
//        int expired = 0;
//        int running = 0;
//        int finished = 0;
//        int cancelled = 0;
//        int failed = 0;
//        for (String id : keys) {
//            ModelSession v = s.getSession(id);
//            if (v == null) {
//                continue;  // this should not happen
//            }
//            String exp = v.getExpDate();
//            if (exp != null && !exp.isEmpty() && exp.compareTo(now) < 0) {  // check if zombie (passed expiration)
//                expired++;
//            }
//
//            String st = v.getStatus();
//            if (st.equals(ModelDataService.RUNNING)) {
//                running++;
//            } else if (st.equals(ModelDataService.FAILED)) {
//                failed++;
//            } else if (st.equals(ModelDataService.CANCELED)) {
//                cancelled++;
//            } else if (st.equals(ModelDataService.FINISHED)) {
//                finished++;
//            }
//        }
    // session store check.
    o.put("session.store", s.getClass().getName());
    try {
      Config.getSessionStore().ping();
      o.put("session.store.ping", "OK");
    } catch (Exception E) {
      o.put("session.store.ping", E.getMessage());
    }

    o.put("sessions.total", s.getCount());

//        o.put("sessions.expired", expired);
//        o.put("sessions.finished", finished);
//        o.put("sessions.cancelled", cancelled);
//        o.put("sessions.failed", failed);
//        o.put("sessions.running", running);
    o.put("archive.enabled", Config.isArchiveEnabled());
    o.put("archive.store", Config.getArchiveStore().getClass().getName());
    if (Config.isArchiveEnabled()) {
//            expired = 0;
//            failed = 0;
//            cancelled = 0;
      ArchiveStore a = Config.getArchiveStore();
//            for (String key : a.keys(0, Integer.MAX_VALUE, null, true)) {
//                ModelArchive ar = a.getArchive(key);
//                String exp = ar.getEtime();
//                if (exp != null && !exp.isEmpty() && exp.compareTo(now) < 0) {  // check if zombie (passed expiration)
//                    expired++;
//                }
//
//                String st = ar.getStatus();
//                if (st.equals(ModelDataService.FAILED)) {
//                    failed++;
//                } else if (st.equals(ModelDataService.CANCELED)) {
//                    cancelled++;
//                }
//            }
      o.put("archive.total", a.getCount());
//            o.put("archive.expired", expired);
//            o.put("archive.cancelled", cancelled);
//            o.put("archive.failed", failed);

//            o.put("accesslog.enabled", Config.isAccessLogEnabled());
//            o.put("accesslog.store", Config.getAccessLogStore().getClass().getName());
    }
    return o.toString(2).replace("\\/", "/");
  }


//    public StreamingOutput getOutputFile(@Context UriInfo uriInfo,
//            @PathParam("suid") String suid,
//            @PathParam("resource") String file) {
//        try {
//            File f = new File(Services.getResultsDir(suid), file);
//            if (f.exists()) {
//                LOG.info("Found file " + f);
//                return output(new FileInputStream(f));
//            } else {
//                // try to find the file.
//                ModelSession session = Config.getSessionStore().getSession(suid);
//                if (session == null) {
//                    return output(JSONUtils.error("suid unknown"));
//                }
//                // but the file should be here, avoid infinite loop here.
//                if (session.getNodeIP().equals(Services.LOCAL_IP_ADDR)) {
//                    return output(JSONUtils.error("file not found, local: " + Services.LOCAL_IP_ADDR + " session: " + session.getNodeIP()));
//                }
//                // where is that session?
//                String redirect = Services.replaceHostinURI(uriInfo.getBaseUri(), session.getNodeIP());
//                InputStream is = redirectGet(redirect + "q/" + suid + "/" + file);
//                return output(is);
//            }
//        } catch (Exception E) {
//            throw new WebApplicationException(E);
//        }
//    }
  @GET
  @Produces(MediaType.WILDCARD)
  @Path("{suid}/{resource}")
  public Response getOutputFile(@Context UriInfo uriInfo,
      @PathParam("suid") String suid,
      @PathParam("resource") String file) {
    return getOutputFile0(uriInfo, suid, file, true, gzip);
  }


  /**
   *
   * @param uriInfo
   * @param suid
   * @param file
   * @param lock
   * @return
   */
  private Response getOutputFile0(UriInfo uriInfo,
      String suid,
      String file, boolean lock, boolean gzip) {
    LOG.log(Level.INFO, "HTTP/GET {0}", uriInfo.getRequestUri().toString());
    try {
      File f = new File(Services.getResultsDir(suid), file);
      if (f.exists()) {
        LOG.info("Found file " + f);

        ResponseBuilder b = Response.ok(output0(f, lock, gzip))
            .header("Content-Disposition", "attachment; filename=" + file);
        if (gzip) 
          b.header("Content-Encoding", "gzip");
        
        return b.build();

//        return Response.ok(output0(f, lock))
//            .header("Content-Disposition", "attachment; filename=" + file)
//            .build();
      } else {
        // try to find the file.
        ModelSession session = Config.getSessionStore().getSession(suid);
        if (session == null) 
          return Response.ok(JSONUtils.error("suid not found: " + suid).toString(), MediaType.APPLICATION_JSON).build();
        
        // but the file should be here, avoid infinite loop here.
        if (session.getNodeIP().equals(Services.LOCAL_IP_ADDR)) {
          return Response.ok(JSONUtils.error("file not found, local: " + Services.LOCAL_IP_ADDR + " session: " + session.getNodeIP()).toString(),
              MediaType.APPLICATION_JSON).build();
        }
        // where is that session?
        String redirect = Utils.replaceHostinURI(uriInfo.getBaseUri(), session.getNodeIP());
        return redirectGet0(redirect + "q/" + suid + "/" + file);
      }
    } catch (Exception E) {
      throw new WebApplicationException(E);
    }
  }


  /**
   * This service fetches a file from the workspace.
   *
   * @param uriInfo
   * @param suid
   * @param file
   * @param offset
   * @param limit
   * @return
   */
  @GET
  @Produces(MediaType.WILDCARD)
  @Path("/u/{suid}/{resource}/{offset}/{limit}")
  public StreamingOutput getUpdateFile(@Context UriInfo uriInfo,
      @PathParam("suid") String suid,
      @PathParam("resource") String file,
      @PathParam("offset") long offset,
      @PathParam("limit") long limit) {
    LOG.log(Level.INFO, "HTTP/GET {0}", uriInfo.getRequestUri().toString());
    try {
      File f = new File(Services.getWorkDir(suid), file);
      if (f.exists()) {
        LOG.info("Found file " + f);
        final FileInputStream fi = new FileInputStream(f);
        if (offset > 0) 
          fi.skip(offset);
        
        if (limit > 0) {
          return output(new InputStream() {
            long idx = limit;


            @Override
            public int read() throws IOException {
              if (idx-- > 0) 
                return fi.read();
              
              return -1;
            }
          });
        }
        return output(fi);
      } else {
        ModelSession session = Config.getSessionStore().getSession(suid);
        if (session == null) 
          return output(JSONUtils.error("suid unknown"));
        
        if (session.getNodeIP().equals(Services.LOCAL_IP_ADDR)) 
          // but the file should be here.
          return output(JSONUtils.error("file not found, local: " + Services.LOCAL_IP_ADDR + " session: " + session.getNodeIP()));
        
        // redirect
        String redirect = Utils.replaceHostinURI(uriInfo.getBaseUri(), session.getNodeIP());
        InputStream is = redirectGet(redirect + "q/u/" + suid + "/" + file + "/" + offset + "/" + limit);
        return output(is);
      }
    } catch (Exception E) {
      throw new WebApplicationException(E);
    }
  }


  /**
   * Get the response file.
   * @param uriInfo
   * @param suid
   * @return
   */
  @GET
  @Produces(MediaType.APPLICATION_JSON)
  @Path("res/{suid}")
  public String response(@Context UriInfo uriInfo, @PathParam("suid") String suid) {
    return getStringFile(uriInfo, suid, ModelDataService.RESPONSE_FILE, true);
  }


  /**
   * Get the request file.
   * @param uriInfo
   * @param suid
   * @return
   */
  @GET
  @Produces(MediaType.APPLICATION_JSON)
  @Path("req/{suid}")
  public String request(@Context UriInfo uriInfo, @PathParam("suid") String suid) {
    return getStringFile(uriInfo, suid, ModelDataService.REQUEST_FILE, false);
  }


  /**
   * Get the log file.
   * @param uriInfo
   * @param suid
   * @return
   */
  @GET
  @Produces(MediaType.TEXT_PLAIN)
  @Path("log/{suid}")
  public String log(@Context UriInfo uriInfo, @PathParam("suid") String suid) {
    return getStringFile(uriInfo, suid, ModelDataService.LOG_FILE, false);
  }


  /**
   * Get the log file.
   * @param uriInfo
   * @param suid
   * @return
   */
  @GET
  @Produces(MediaType.APPLICATION_JSON)
  @Path("{suid}")
  public String query(@Context UriInfo uriInfo, @PathParam("suid") String suid) {
    return response(uriInfo, suid);
  }

/////////////

  private static InputStream redirectGet(String target) {
    LOG.info("Redirect query to: " + target);
    Client client = ClientBuilder.newClient();
    WebTarget service = client.target(target);
    Response response = service.request(MediaType.WILDCARD).get();
    return response.readEntity(InputStream.class);
  }


  private static Response redirectGet0(String target) {
    LOG.info("Redirect query to: " + target);
    Client client = ClientBuilder.newClient();
    WebTarget service = client.target(target);
    Response response = service.request(MediaType.WILDCARD).get();
    return response;
  }


  private static StreamingOutput output(final JSONObject o) {
    return (OutputStream output) -> {
      try {
        IOUtils.copy(new StringReader(o.toString()), output, "UTF-8");
        output.flush();
      } catch (IOException e) {
        throw new WebApplicationException(e);
      }
    };
  }


  private static StreamingOutput output(final InputStream input) {
    return (OutputStream output) -> {
      IOUtils.copy(input, output);
      input.close();
      output.flush();
    };
  }

  static boolean gzip = Config.getBoolean("csip.q.gzip", false);


  private static StreamingOutput output0(File file, boolean lock, boolean gzip) throws FileNotFoundException {
    return new StreamingOutput() {
      @Override
      public void write(OutputStream output) throws IOException, WebApplicationException {
        Lock l = null;
        try {
          if (lock) {
            l = Config.wsFileLocks.get(file, (File t) -> new ReentrantLock());
            l.lock();
          }
          InputStream input = new BufferedInputStream(new FileInputStream(file));
          if (gzip) {
            GZIPOutputStream out = new GZIPOutputStream(output);
            IOUtils.copy(input, out);
            out.finish();
          } else {
            IOUtils.copy(input, output);
            output.flush();
          }
          input.close();
        } finally {
          if (l != null) 
            l.unlock();
          
        }
      }
    };
  }


  static StreamingOutput output(final byte[] input) {
    return (OutputStream output) -> {
      try {
        IOUtils.write(input, output);
        output.flush();
      } catch (IOException e) {
        throw new WebApplicationException(e);
      }
    };
  }


  private static String toString(StreamingOutput o) throws IOException {
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    o.write(os);
    return os.toString();
  }


  private String getStringFile(UriInfo uriInfo, String suid, String file, boolean lock) {
    LOG.log(Level.INFO, "HTTP/GET {0}", uriInfo.getRequestUri().toString());
    try {
      if (Config.getSessionStore().hasSession(suid)) {
        Response out = getOutputFile0(uriInfo, suid, file, lock, false);

        if (out.getEntity() instanceof StreamingOutput) {
          // inbound, local response
          StreamingOutput o = (StreamingOutput) out.getEntity();
          return toString(o);
        } else {
          // outbound, client response
          InputStream is = out.readEntity(InputStream.class);
          return IOUtils.toString(is, "UTF-8");
        }
      }
      return JSONUtils.error("suid unknown: " + suid).toString();
    } catch (Exception E) {
      LOG.log(Level.SEVERE, null, E);
      return JSONUtils.error("exception fetching suid: " + suid + ", problem: " + E.getMessage()).toString();
    }
  }

}