QueryService.java [src/csip] Revision: default Date:
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2022, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip;
import csip.api.server.Executable;
import static csip.Config.getString;
import csip.annotations.Gzip;
import csip.annotations.Resource;
import csip.annotations.ResourceType;
import csip.utils.Binaries;
import csip.utils.Services;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringReader;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.io.IOUtils;
import org.codehaus.jettison.json.JSONException;
import csip.utils.JSONUtils;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.StringWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriBuilder;
import org.codehaus.jettison.json.JSONObject;
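/**
* QueryService: REST resource rooted at {@code /q}. It serves files that
* belong to a session (result files, workspace files, and the request,
* response and log files), reports the number of running sessions, and
* offers administrative endpoints for checking the backend.
*
* @author Olaf David
*/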
@Path("/q")
public class QueryService {
// Logger used by all endpoints of this resource; reuses the instance held in Config.LOG.
static final Logger LOG = Config.LOG;
// @GET
// @Path("status")
// @Produces(MediaType.APPLICATION_JSON)
// public String getStatus(@Context UriInfo uriInfo) throws Exception {
// LOG.log(Level.INFO, "HTTP/GET {0}", uriInfo.getRequestUri().toString());
//
// int running = 0;
// int finished = 0;
// int cancelled = 0;
// int failed = 0;
//
// Config.SessionStore s = Config.getSessionStore();
// for (String id : s.keys(0, Integer.MAX_VALUE, null, false)) {
// ModelSession v = s.getSession(id);
// if (v == null) {
// continue; // this should not happen
// }
// String st = v.getStatus();
// if (st.equals(ModelDataService.RUNNING)) {
// running++;
// } else if (st.equals(ModelDataService.FAILED)) {
// failed++;
// } else if (st.equals(ModelDataService.CANCELED)) {
// cancelled++;
// } else if (st.equals(ModelDataService.FINISHED)) {
// finished++;
// }
// }
// JSONObject o = new JSONObject();
// o.put("running", running);
// o.put("failed", failed);
// o.put("canceled", cancelled);
// o.put("finished", finished);
// return o.toString(2);
// }
/**
 * Runs the request body as a command line on this node and returns what
 * the command printed.
 *
 * <p>After the remote access check, the body is handed unmodified to the
 * executable declared by the local {@code @Resource(file = "bash")}
 * annotation, as the argument of {@code -c}. Callers that pass the access
 * check can therefore execute arbitrary commands on the host.
 * NOTE(review): confirm that the access list is restrictive enough for this.
 *
 * @param req the servlet request, used for the remote access check
 * @param arg the command line passed after {@code -c}
 * @return the local IP address, a {@code ----} separator line, and the
 * combined standard output and standard error of the command
 * @throws Exception if the access check, the executable lookup or the
 * execution fails
 */
@POST
@Path("r")
@Consumes(MediaType.TEXT_PLAIN)
public String r(@Context HttpServletRequest req, String arg) throws Exception {
  Utils.checkRemoteAccessACL(req);

  // Local carrier class: it only exists to hold the @Resource annotation
  // that the executable lookup below reads.
  @Resource(file = "bash", type = ResourceType.REFERENCE, id = "_")
  class _C_ {
  }

  File tmpDir = new File(System.getProperty("java.io.tmpdir"));
  Executable shell = Binaries.getResourceExe0(_C_.class, "_", tmpDir, new SessionLogger());
  shell.setArguments("-c", arg);

  // stdout and stderr are collected in the same buffer
  StringWriter captured = new StringWriter();
  shell.redirectOutput(captured);
  shell.redirectError(captured);
  shell.exec();

  StringBuilder reply = new StringBuilder();
  reply.append(Services.LOCAL_IP_ADDR);
  reply.append("\n----\n");
  reply.append(captured.toString());
  return reply.toString();
}
/**
 * Reports how many sessions the session store counts in state
 * {@code "Running"}.
 *
 * @param req the servlet request (not used by this method)
 * @return the count as a decimal string
 */
@GET
@Path("running")
@Produces(MediaType.TEXT_PLAIN)
public String getRunning(@Context HttpServletRequest req) {
  return Long.toString(Config.getSessionStore().countSessionsByState("Running"));
}
/**
 * Check the backend for problems.
 *
 * <p>After the remote access check this collects, into one JSON document:
 * the node address, CPU count and JVM memory figures; existence,
 * permissions and disk space of the work and result directories (both are
 * created if missing); the resolved address of the MongoDB host when the
 * session backend is {@code mongodb}; the session store class, the outcome
 * of a ping and the session count; and the archive store class plus, when
 * archiving is enabled, the archive count.
 *
 * <p>Per-state counters (expired, finished, cancelled, failed, running) are
 * not reported by this method.
 *
 * @param req the servlet request, used for the remote access check
 * @return the report as JSON text indented by two spaces, with escaped
 * slashes ({@code \/}) turned back into plain slashes
 * @throws JSONException if a value cannot be stored in the report
 * @throws Exception if the access check or a store query fails
 */
@GET
@Path("check")
@Produces(MediaType.APPLICATION_JSON)
public String getCheck(@Context HttpServletRequest req) throws JSONException, Exception {
  Utils.checkRemoteAccessACL(req);
  JSONObject o = new JSONObject();

  Runtime rt = Runtime.getRuntime();
  o.put("node.address", Services.LOCAL_IP_ADDR);
  o.put("cpus", rt.availableProcessors());
  o.put("memory.free", Utils.humanReadableByteCount(rt.freeMemory(), true));
  o.put("memory.max", Utils.humanReadableByteCount(rt.maxMemory(), true));
  o.put("memory.total", Utils.humanReadableByteCount(rt.totalMemory(), true));

  putDirectoryInfo(o, "work", new File(Config.getString("csip.work.dir")));
  putDirectoryInfo(o, "result", new File(Config.getString("csip.results.dir")));

  SessionStore s = Config.getSessionStore();
  // Constant-first equals: a missing backend setting must not fail the
  // whole check (a switch on a null String throws NullPointerException).
  if ("mongodb".equals(getString("csip.session.backend"))) {
    String url = getString("csip.session.mongodb.uri");
    if (url != null) {
      try {
        UriBuilder b = UriBuilder.fromUri(url);
        InetAddress address = InetAddress.getByName(b.build().getHost());
        o.put("csip.session.mongodb.address", address.getHostAddress());
      } catch (UnknownHostException ex) {
        LOG.log(Level.WARNING, "Cannot resolve the session store host.", ex);
      }
    }
  }

  // session store check.
  o.put("session.store", s.getClass().getName());
  try {
    Config.getSessionStore().ping();
    o.put("session.store.ping", "OK");
  } catch (Exception E) {
    // A null value would drop the key from the report, hiding the failure.
    String problem = E.getMessage();
    o.put("session.store.ping", problem != null ? problem : E.toString());
  }
  o.put("sessions.total", s.getCount());

  o.put("archive.enabled", Config.isArchiveEnabled());
  o.put("archive.store", Config.getArchiveStore().getClass().getName());
  if (Config.isArchiveEnabled()) {
    ArchiveStore a = Config.getArchiveStore();
    o.put("archive.total", a.getCount());
  }
  return o.toString(2).replace("\\/", "/");
}

/**
 * Creates {@code dir} if needed and records its path, existence,
 * permissions and disk space in {@code o} under keys starting with
 * {@code prefix + "."}.
 */
private static void putDirectoryInfo(JSONObject o, String prefix, File dir) throws JSONException {
  dir.mkdirs();
  o.put(prefix + ".dir", dir.toString());
  o.put(prefix + ".exists", dir.exists());
  o.put(prefix + ".canread", dir.canRead());
  o.put(prefix + ".canexecute", dir.canExecute());
  o.put(prefix + ".canwrite", dir.canWrite());
  // The ".mb" keys carry human readable sizes, not raw megabyte numbers.
  o.put(prefix + ".usable.mb", Utils.humanReadableByteCount(dir.getUsableSpace(), true));
  o.put(prefix + ".free.mb", Utils.humanReadableByteCount(dir.getFreeSpace(), true));
  o.put(prefix + ".total.mb", Utils.humanReadableByteCount(dir.getTotalSpace(), true));
}
// public StreamingOutput getOutputFile(@Context UriInfo uriInfo,
// @PathParam("suid") String suid,
// @PathParam("resource") String file) {
// try {
// File f = new File(Services.getResultsDir(suid), file);
// if (f.exists()) {
// LOG.info("Found file " + f);
// return output(new FileInputStream(f));
// } else {
// // try to find the file.
// ModelSession session = Config.getSessionStore().getSession(suid);
// if (session == null) {
// return output(JSONUtils.error("suid unknown"));
// }
// // but the file should be here, avoid infinite loop here.
// if (session.getNodeIP().equals(Services.LOCAL_IP_ADDR)) {
// return output(JSONUtils.error("file not found, local: " + Services.LOCAL_IP_ADDR + " session: " + session.getNodeIP()));
// }
// // where is that session?
// String redirect = Services.replaceHostinURI(uriInfo.getBaseUri(), session.getNodeIP());
// InputStream is = redirectGet(redirect + "q/" + suid + "/" + file);
// return output(is);
// }
// } catch (Exception E) {
// throw new WebApplicationException(E);
// }
// }
/**
* Serves a file from the results directory of a session.
*
* Delegates to {@code getOutputFile0} with the {@code lock} flag set and
* with the class-level {@code gzip} setting.
*
* @param uriInfo request URI information
* @param suid the session id (path segment {@code suid})
* @param file the file name (path segment {@code resource})
* @return the response built by {@code getOutputFile0}
*/
@GET
@Produces(MediaType.WILDCARD)
@Path("{suid}/{resource}")
public Response getOutputFile(@Context UriInfo uriInfo,
@PathParam("suid") String suid,
@PathParam("resource") String file) {
return getOutputFile0(uriInfo, suid, file, true, gzip);
}
/**
 * Builds the response for a file in the results directory of a session.
 *
 * <p>If the file exists on this node it is streamed back as an attachment.
 * Otherwise the session is looked up: an unknown session, or a session
 * that is recorded for this very node, yields a JSON error document (sent
 * with status 200); a session recorded for another node causes the same
 * query to be issued against that node and its response to be returned.
 *
 * @param uriInfo request URI information
 * @param suid the session id
 * @param file the file name within the results directory
 * @param lock passed on to {@code output0}; when set, the file is read
 * while holding its workspace file lock
 * @param gzip when set, the content is gzip-compressed and the response is
 * marked with {@code Content-Encoding: gzip}
 * @return the response to send
 * @throws WebApplicationException wrapping any exception raised on the way
 */
private Response getOutputFile0(UriInfo uriInfo,
    String suid,
    String file, boolean lock, boolean gzip) {
  LOG.log(Level.INFO, "HTTP/GET {0}", uriInfo.getRequestUri().toString());
  try {
    File f = new File(Services.getResultsDir(suid), file);
    if (!f.exists()) {
      // try to find the file.
      ModelSession session = Config.getSessionStore().getSession(suid);
      if (session == null) {
        String unknown = JSONUtils.error("suid not found: " + suid).toString();
        return Response.ok(unknown, MediaType.APPLICATION_JSON).build();
      }
      // but the file should be here, avoid infinite loop here.
      if (session.getNodeIP().equals(Services.LOCAL_IP_ADDR)) {
        String missing = JSONUtils.error("file not found, local: " + Services.LOCAL_IP_ADDR + " session: " + session.getNodeIP()).toString();
        return Response.ok(missing, MediaType.APPLICATION_JSON).build();
      }
      // where is that session?
      String redirect = Utils.replaceHostinURI(uriInfo.getBaseUri(), session.getNodeIP());
      return redirectGet0(redirect + "q/" + suid + "/" + file);
    }

    LOG.info("Found file " + f);
    ResponseBuilder b = Response.ok(output0(f, lock, gzip));
    b = b.header("Content-Disposition", "attachment; filename=" + file);
    if (gzip) {
      b.header("Content-Encoding", "gzip");
    }
    return b.build();
  } catch (Exception E) {
    throw new WebApplicationException(E);
  }
}
/**
 * This service fetches a file, or a byte range of it, from the workspace
 * of a session.
 *
 * <p>If the file exists on this node, {@code offset} bytes are skipped and
 * at most {@code limit} bytes are streamed back. Otherwise the session is
 * looked up: an unknown session, or a session recorded for this very node,
 * yields a JSON error document; a session recorded for another node causes
 * the same query to be issued against that node.
 *
 * @param uriInfo request URI information
 * @param suid the session id
 * @param file the file name within the work directory
 * @param offset number of leading bytes to skip; values {@code <= 0} skip
 * nothing
 * @param limit maximum number of bytes to return; values {@code <= 0} mean
 * "up to the end of the file"
 * @return the streamed content or error document
 * @throws WebApplicationException wrapping any exception raised on the way
 */
@GET
@Produces(MediaType.WILDCARD)
@Path("/u/{suid}/{resource}/{offset}/{limit}")
public StreamingOutput getUpdateFile(@Context UriInfo uriInfo,
    @PathParam("suid") String suid,
    @PathParam("resource") String file,
    @PathParam("offset") long offset,
    @PathParam("limit") long limit) {
  LOG.log(Level.INFO, "HTTP/GET {0}", uriInfo.getRequestUri().toString());
  try {
    File f = new File(Services.getWorkDir(suid), file);
    if (f.exists()) {
      LOG.info("Found file " + f);
      final FileInputStream fi = new FileInputStream(f);
      try {
        // skip() may skip fewer bytes than asked for, so repeat until
        // done or until the stream makes no progress.
        long toSkip = offset;
        while (toSkip > 0) {
          long skipped = fi.skip(toSkip);
          if (skipped <= 0) {
            break;
          }
          toSkip -= skipped;
        }
      } catch (IOException | RuntimeException ex) {
        fi.close();
        throw ex;
      }
      if (limit > 0) {
        return output(new InputStream() {
          long remaining = limit;

          @Override
          public int read() throws IOException {
            if (remaining <= 0) {
              return -1;
            }
            int b = fi.read();
            if (b >= 0) {
              remaining--;
            }
            return b;
          }

          // Bulk read, so copying does not go through read() byte by byte.
          @Override
          public int read(byte[] buf, int off, int len) throws IOException {
            if (len == 0) {
              return 0;
            }
            if (remaining <= 0) {
              return -1;
            }
            int n = fi.read(buf, off, (int) Math.min(len, remaining));
            if (n > 0) {
              remaining -= n;
            }
            return n;
          }

          // Without this the file would stay open: closing the wrapper
          // is the only close the consumer performs.
          @Override
          public void close() throws IOException {
            fi.close();
          }
        });
      }
      return output(fi);
    } else {
      ModelSession session = Config.getSessionStore().getSession(suid);
      if (session == null) {
        return output(JSONUtils.error("suid unknown"));
      }
      if (session.getNodeIP().equals(Services.LOCAL_IP_ADDR)) {
        // but the file should be here.
        return output(JSONUtils.error("file not found, local: " + Services.LOCAL_IP_ADDR + " session: " + session.getNodeIP()));
      }
      // redirect
      String redirect = Utils.replaceHostinURI(uriInfo.getBaseUri(), session.getNodeIP());
      InputStream is = redirectGet(redirect + "q/u/" + suid + "/" + file + "/" + offset + "/" + limit);
      return output(is);
    }
  } catch (Exception E) {
    throw new WebApplicationException(E);
  }
}
/**
* Get the response file of a session.
*
* Delegates to {@code getStringFile} for
* {@code ModelDataService.RESPONSE_FILE} with the {@code lock} flag set.
*
* @param uriInfo request URI information
* @param suid the session id (path segment {@code suid})
* @return the file content, or the JSON error text produced by
* {@code getStringFile}
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("res/{suid}")
public String response(@Context UriInfo uriInfo, @PathParam("suid") String suid) {
return getStringFile(uriInfo, suid, ModelDataService.RESPONSE_FILE, true);
}
/**
* Get the request file of a session.
*
* Delegates to {@code getStringFile} for
* {@code ModelDataService.REQUEST_FILE} with the {@code lock} flag cleared.
*
* @param uriInfo request URI information
* @param suid the session id (path segment {@code suid})
* @return the file content, or the JSON error text produced by
* {@code getStringFile}
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("req/{suid}")
public String request(@Context UriInfo uriInfo, @PathParam("suid") String suid) {
return getStringFile(uriInfo, suid, ModelDataService.REQUEST_FILE, false);
}
/**
* Get the log file of a session.
*
* Delegates to {@code getStringFile} for {@code ModelDataService.LOG_FILE}
* with the {@code lock} flag cleared. The endpoint is declared as plain
* text.
*
* @param uriInfo request URI information
* @param suid the session id (path segment {@code suid})
* @return the file content, or the JSON error text produced by
* {@code getStringFile}
*/
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("log/{suid}")
public String log(@Context UriInfo uriInfo, @PathParam("suid") String suid) {
return getStringFile(uriInfo, suid, ModelDataService.LOG_FILE, false);
}
/**
* Get the response file of a session; shorthand for the {@code res/{suid}}
* endpoint.
*
* Delegates to {@code response(uriInfo, suid)}.
*
* @param uriInfo request URI information
* @param suid the session id (path segment {@code suid})
* @return whatever {@code response} returns for this session
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("{suid}")
public String query(@Context UriInfo uriInfo, @PathParam("suid") String suid) {
return response(uriInfo, suid);
}
/////////////
/**
 * Performs a GET on another node and returns the response body as a stream.
 *
 * <p>The returned stream owns the HTTP exchange: closing it also closes
 * the response and the client that was created for this call, so the
 * connection resources are released once the consumer is done.
 * NOTE(review): the HTTP status of the remote response is not inspected;
 * an error body is streamed like regular content.
 *
 * @param target the absolute URL to query
 * @return the body of the remote response; the caller must close it
 */
private static InputStream redirectGet(String target) {
  LOG.info("Redirect query to: " + target);
  final Client client = ClientBuilder.newClient();
  try {
    WebTarget service = client.target(target);
    final Response response = service.request(MediaType.WILDCARD).get();
    InputStream body = response.readEntity(InputStream.class);
    return new java.io.FilterInputStream(body) {
      @Override
      public void close() throws IOException {
        try {
          super.close();
        } finally {
          try {
            response.close();
          } finally {
            client.close();
          }
        }
      }
    };
  } catch (RuntimeException ex) {
    // nothing was handed out, so release the client right away
    client.close();
    throw ex;
  }
}
/**
 * Performs a GET on another node and returns the client response as is.
 *
 * <p>NOTE(review): the client created here is never closed, and closing
 * the returned response is left to the caller; confirm that this does not
 * accumulate open connections.
 *
 * @param target the absolute URL to query
 * @return the response received from the remote node
 */
private static Response redirectGet0(String target) {
  LOG.info("Redirect query to: " + target);
  return ClientBuilder.newClient()
      .target(target)
      .request(MediaType.WILDCARD)
      .get();
}
/**
 * Wraps a JSON object as a streaming entity.
 *
 * <p>The object is turned into text only when the entity is written; the
 * text is emitted as UTF-8.
 *
 * @param o the JSON object to send
 * @return an entity that writes {@code o.toString()} to the response
 */
private static StreamingOutput output(final JSONObject o) {
  return (OutputStream sink) -> {
    try {
      StringReader json = new StringReader(o.toString());
      IOUtils.copy(json, sink, "UTF-8");
      sink.flush();
    } catch (IOException failure) {
      throw new WebApplicationException(failure);
    }
  };
}
/**
 * Wraps an input stream as a streaming entity.
 *
 * <p>The stream is copied to the response when the entity is written and
 * is closed afterwards, also when copying fails, so that a broken client
 * connection does not leave the source open.
 *
 * @param input the content to send; closed by the returned entity
 * @return an entity that copies {@code input} to the response
 */
private static StreamingOutput output(final InputStream input) {
  return (OutputStream output) -> {
    try (InputStream in = input) {
      IOUtils.copy(in, output);
      output.flush();
    }
  };
}
// Whether getOutputFile gzip-compresses the files it serves; read from
// the "csip.q.gzip" setting, with false as the fallback value.
static boolean gzip = Config.getBoolean("csip.q.gzip", false);
/**
 * Wraps a file as a streaming entity, optionally guarded by the workspace
 * file lock and optionally gzip-compressed.
 *
 * <p>The file is opened only when the entity is written. It is closed
 * again in every case, and the lock (if taken) is released afterwards,
 * also when copying fails.
 *
 * @param file the file to send
 * @param lock when set, the lock registered for {@code file} in
 * {@code Config.wsFileLocks} is held while the file is read
 * @param gzip when set, the content is written through a
 * {@link GZIPOutputStream}
 * @return an entity that copies the file to the response
 * @throws FileNotFoundException declared for callers; a missing file is
 * reported when the entity is written
 */
private static StreamingOutput output0(File file, boolean lock, boolean gzip) throws FileNotFoundException {
  return new StreamingOutput() {
    @Override
    public void write(OutputStream output) throws IOException, WebApplicationException {
      Lock l = null;
      try {
        if (lock) {
          l = Config.wsFileLocks.get(file, (File t) -> new ReentrantLock());
          l.lock();
        }
        try (InputStream input = new BufferedInputStream(new FileInputStream(file))) {
          if (gzip) {
            GZIPOutputStream out = new GZIPOutputStream(output);
            IOUtils.copy(input, out);
            // finish() completes the gzip data without closing the
            // response stream, which belongs to the container
            out.finish();
          } else {
            IOUtils.copy(input, output);
            output.flush();
          }
        }
      } finally {
        if (l != null) {
          l.unlock();
        }
      }
    }
  };
}
/**
 * Wraps a byte array as a streaming entity.
 *
 * @param input the bytes to send
 * @return an entity that writes {@code input} to the response; an
 * {@link IOException} while writing is rethrown wrapped in a
 * {@link WebApplicationException}
 */
static StreamingOutput output(final byte[] input) {
  return (OutputStream sink) -> {
    try {
      IOUtils.write(input, sink);
      sink.flush();
    } catch (IOException failure) {
      throw new WebApplicationException(failure);
    }
  };
}
/**
 * Renders a streaming entity into a string.
 *
 * <p>The bytes are decoded as UTF-8, the encoding used elsewhere in this
 * class when text is written or read, instead of the platform default.
 *
 * @param o the entity to render
 * @return everything the entity wrote, decoded as UTF-8
 * @throws IOException if the entity fails while writing
 */
private static String toString(StreamingOutput o) throws IOException {
  ByteArrayOutputStream os = new ByteArrayOutputStream();
  o.write(os);
  return os.toString("UTF-8");
}
/**
 * Fetches a file from the results directory of a session and returns its
 * content as a string.
 *
 * <p>The response built by {@code getOutputFile0} can carry three kinds of
 * entity, each handled separately:
 * <ul>
 * <li>a {@link StreamingOutput}: the file exists on this node and is
 * rendered into a string;</li>
 * <li>a {@link String}: a JSON error document built on this node, returned
 * as is;</li>
 * <li>anything else: the response received from another node, whose body
 * is read as UTF-8 and which is closed afterwards.</li>
 * </ul>
 *
 * @param uriInfo request URI information
 * @param suid the session id
 * @param file the file name within the results directory
 * @param lock passed on to {@code getOutputFile0}
 * @return the file content, or a JSON error document if the session is
 * unknown or fetching fails
 */
private String getStringFile(UriInfo uriInfo, String suid, String file, boolean lock) {
  LOG.log(Level.INFO, "HTTP/GET {0}", uriInfo.getRequestUri().toString());
  try {
    if (!Config.getSessionStore().hasSession(suid)) {
      return JSONUtils.error("suid unknown: " + suid).toString();
    }
    Response out = getOutputFile0(uriInfo, suid, file, lock, false);
    Object entity = out.getEntity();
    if (entity instanceof StreamingOutput) {
      // inbound, local response
      return toString((StreamingOutput) entity);
    }
    if (entity instanceof String) {
      // Locally built error document. It is not backed by an input
      // stream, so readEntity() would fail and hide the actual message.
      return (String) entity;
    }
    // outbound, client response
    try (InputStream is = out.readEntity(InputStream.class)) {
      return IOUtils.toString(is, "UTF-8");
    } finally {
      out.close();
    }
  } catch (Exception E) {
    LOG.log(Level.SEVERE, null, E);
    return JSONUtils.error("exception fetching suid: " + suid + ", problem: " + E.getMessage()).toString();
  }
}
}