QueryService.java [src/csip] Revision: eb62ecc0fa3a63f4285e8875e6800a01d41d8b9f  Date: Mon Apr 04 14:57:32 MDT 2016
/*
 * $Id$
 * 
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * 2010-2013, Olaf David and others, Colorado State University.
 *
 * CSIP is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, version 2.1.
 *
 * CSIP is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with OMS.  If not, see <http://www.gnu.org/licenses/lgpl.txt>.
 */
package csip;

import csip.Config.ArchiveStore;
import csip.Config.SessionStore;
import csip.utils.Dates;
import csip.utils.Services;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringReader;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.io.IOUtils;
import org.codehaus.jettison.json.JSONException;
import csip.utils.JSONUtils;
import java.io.ByteArrayOutputStream;
import java.text.DateFormat;
import java.util.Date;
import java.util.Set;
import java.util.logging.Logger;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import org.codehaus.jettison.json.JSONObject;

/**
 * QueryService
 *
 * @author Olaf David
 */
@Path("/q")
public class QueryService {

    static final Logger LOG = Logger.getLogger(QueryService.class.getName());


    /**
     * Check the backend for problems.
     *
     * @param uriInfo
     * @return
     * @throws JSONException
     * @throws Exception
     */
    @GET
    @Path("check")
    @Produces(MediaType.APPLICATION_JSON)
    public String getCheck(@Context UriInfo uriInfo) throws JSONException, Exception {
        JSONObject o = new JSONObject();
        File work = new File(Config.getString("csip.work.dir"));
        work.mkdirs();

        o.put("node", Services.LOCAL_IP_ADDR);
        o.put("work.dir", work.toString());
        o.put("work.exists", work.exists() ? "OK" : "Failed");
        o.put("work.canread", work.canRead() ? "OK" : "Failed");
        o.put("work.canexecute", work.canExecute() ? "OK" : "Failed");
        o.put("work.canwrite", work.canWrite() ? "OK" : "Failed");
        o.put("work.canwrite", work.canWrite() ? "OK" : "Failed");
        o.put("work.available.mb", work.getUsableSpace() / 1024L / 1024L);

        File res = new File(Config.getString("csip.results.dir"));
        res.mkdirs();
        o.put("result.dir", res.toString());
        o.put("result.exists", res.exists() ? "OK" : "Failed");
        o.put("result.canread", res.canRead() ? "OK" : "Failed");
        o.put("result.canexecute", res.canExecute() ? "OK" : "Failed");
        o.put("result.canwrite", res.canWrite() ? "OK" : "Failed");
        o.put("result.available.mb", res.getUsableSpace() / 1024L / 1024L);

        DateFormat df = Dates.newISOFormat();
        String now = df.format(new Date());
        SessionStore s = Config.getSessionStore();
        Set<String> keys = s.keys(0, 0, null, true);
        int expired = 0;
        int running = 0;
        int finished = 0;
        int cancelled = 0;
        int failed = 0;
        for (String id : keys) {
            ModelSession v = s.getSession(id);
            if (v == null) {
                continue;  // this should not happen
            }
            String exp = v.getExpDate();
            if (exp != null && !exp.isEmpty() && exp.compareTo(now) < 0) {  // check if zombie (passed expiration)
                expired++;
            }

            String st = v.getStatus();
            if (st.equals(ModelDataService.RUNNING)) {
                running++;
            } else if (st.equals(ModelDataService.FAILED)) {
                failed++;
            } else if (st.equals(ModelDataService.CANCELED)) {
                cancelled++;
            } else if (st.equals(ModelDataService.FINISHED)) {
                finished++;
            }
        }

        // session store check.
        o.put("session.store", s.getClass().getName());
        try {
            Config.getSessionStore().ping();
            o.put("session.store.ping", "OK");
        } catch (Exception E) {
            o.put("session.store.ping", E.getMessage());
        }

        o.put("sessions.total", s.getCount());
        o.put("sessions.expired", expired);
        o.put("sessions.finished", finished);
        o.put("sessions.cancelled", cancelled);
        o.put("sessions.failed", failed);
        o.put("sessions.running", running);

        o.put("archive.enabled", Config.isArchiveEnabled());
        o.put("archive.store", Config.getArchiveStore().getClass().getName());
        if (Config.isArchiveEnabled()) {
            expired = 0;
            failed = 0;
            cancelled = 0;
            ArchiveStore a = Config.getArchiveStore();
            for (String key : a.keys(0, 0, null, true)) {
                ModelArchive ar = a.getArchive(key);
                String exp = ar.getEtime();
                if (exp != null && !exp.isEmpty() && exp.compareTo(now) < 0) {  // check if zombie (passed expiration)
                    expired++;
                }

                String st = ar.getStatus();
                if (st.equals(ModelDataService.FAILED)) {
                    failed++;
                } else if (st.equals(ModelDataService.CANCELED)) {
                    cancelled++;
                }
            }
            o.put("archive.total", a.getCount());
            o.put("archive.expired", expired);
            o.put("archive.cancelled", cancelled);
            o.put("archive.failed", failed);

            o.put("accesslog.enabled", Config.isAccessLogEnabled());
            o.put("accesslog.store", Config.getAccessLogStore().getClass().getName());
        }
        return o.toString(4).replace("\\/", "/");
    }


//    public StreamingOutput getOutputFile(@Context UriInfo uriInfo,
//            @PathParam("suid") String suid,
//            @PathParam("resource") String file) {
//        try {
//            File f = new File(Services.getResultsDir(suid), file);
//            if (f.exists()) {
//                LOG.info("Found file " + f);
//                return output(new FileInputStream(f));
//            } else {
//                // try to find the file.
//                ModelSession session = Config.getSessionStore().getSession(suid);
//                if (session == null) {
//                    return output(JSONUtils.error("suid unknown"));
//                }
//                // but the file should be here, avoid infinite loop here.
//                if (session.getNodeIP().equals(Services.LOCAL_IP_ADDR)) {
//                    return output(JSONUtils.error("file not found, local: " + Services.LOCAL_IP_ADDR + " session: " + session.getNodeIP()));
//                }
//                // where is that session?
//                String redirect = Services.replaceHostinURI(uriInfo.getBaseUri(), session.getNodeIP());
//                InputStream is = redirectGet(redirect + "q/" + suid + "/" + file);
//                return output(is);
//            }
//        } catch (Exception E) {
//            throw new WebApplicationException(E);
//        }
//    }
    @GET
    @Produces(MediaType.WILDCARD)
    @Path("{suid}/{resource}")
    public Response getOutputFile(@Context UriInfo uriInfo,
            @PathParam("suid") String suid,
            @PathParam("resource") String file) {
        try {
            File f = new File(Services.getResultsDir(suid), file);
            if (f.exists()) {
                LOG.info("Found file " + f);
                return Response.ok(output(new FileInputStream(f))).header("Content-Disposition", "attachment; filename=" + file).build();
            } else {
                // try to find the file.
                ModelSession session = Config.getSessionStore().getSession(suid);
                if (session == null) {
                    return Response.ok(JSONUtils.error("suid not found: " + suid).toString(), MediaType.APPLICATION_JSON).build();
                }
                // but the file should be here, avoid infinite loop here.
                if (session.getNodeIP().equals(Services.LOCAL_IP_ADDR)) {
                    return Response.ok(JSONUtils.error("file not found, local: " + Services.LOCAL_IP_ADDR + " session: " + session.getNodeIP()).toString(),
                            MediaType.APPLICATION_JSON).build();
                }
                // where is that session?
                String redirect = Services.replaceHostinURI(uriInfo.getBaseUri(), session.getNodeIP());
                return redirectGet0(redirect + "q/" + suid + "/" + file);
            }
        } catch (Exception E) {
            throw new WebApplicationException(E);
        }
    }


    /**
     * This service fetches a file from the workspace.
     *
     * @param uriInfo
     * @param suid
     * @param file
     * @param offset
     * @param limit
     * @return
     */
    @GET
    @Produces(MediaType.WILDCARD)
    @Path("/u/{suid}/{resource}/{offset}/{limit}")
    public StreamingOutput getUpdateFile(@Context UriInfo uriInfo,
            @PathParam("suid") String suid,
            @PathParam("resource") String file,
            @PathParam("offset") long offset,
            @PathParam("limit") long limit) {
        try {
            File f = new File(Services.getWorkDir(suid), file);
            if (f.exists()) {
                LOG.info("Found file " + f);
                final FileInputStream fi = new FileInputStream(f);
                if (offset > 0) {
                    fi.skip(offset);
                }
                if (limit > 0) {
                    return output(new InputStream() {
                        long idx = limit;


                        @Override
                        public int read() throws IOException {
                            if (idx-- > 0) {
                                return fi.read();
                            }
                            return -1;
                        }
                    });
                }
                return output(fi);
            } else {
                ModelSession session = Config.getSessionStore().getSession(suid);
                if (session == null) {
                    return output(JSONUtils.error("suid unknown"));
                }
                if (session.getNodeIP().equals(Services.LOCAL_IP_ADDR)) {
                    // but the file should be here.
                    return output(JSONUtils.error("file not found, local: " + Services.LOCAL_IP_ADDR + " session: " + session.getNodeIP()));
                }
                // redirect
                String redirect = Services.replaceHostinURI(uriInfo.getBaseUri(), session.getNodeIP());
                InputStream is = redirectGet(redirect + "q/u/" + suid + "/" + file + "/" + offset + "/" + limit);
                return output(is);
            }
        } catch (Exception E) {
            throw new WebApplicationException(E);
        }
    }


    /**
     * Get the response file.
     * @param uriInfo
     * @param suid
     * @return
     */
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("res/{suid}")
    public String response(@Context UriInfo uriInfo, @PathParam("suid") String suid) {
        return getStringFile(uriInfo, suid, ModelDataService.RESPONSE_FILE);
    }


    /**
     * Get the request file.
     * @param uriInfo
     * @param suid
     * @return
     */
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("req/{suid}")
    public String request(@Context UriInfo uriInfo, @PathParam("suid") String suid) {
        return getStringFile(uriInfo, suid, ModelDataService.REQUEST_FILE);
    }


    /**
     * Get the log file.
     * @param uriInfo
     * @param suid
     * @return
     */
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("log/{suid}")
    public String log(@Context UriInfo uriInfo, @PathParam("suid") String suid) {
        return getStringFile(uriInfo, suid, ModelDataService.LOG_FILE);
    }


    /**
     * Get the log file.
     * @param uriInfo
     * @param suid
     * @return
     */
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("{suid}")
    public String query(@Context UriInfo uriInfo, @PathParam("suid") String suid) {
        return response(uriInfo, suid);
    }

/////////////

    private static InputStream redirectGet(String target) {
        LOG.info("Redirect query to: " + target);
        Client client = ClientBuilder.newClient();
        WebTarget service = client.target(target);
        Response response = service.request(MediaType.WILDCARD).get();
        return response.readEntity(InputStream.class);
    }


    private static Response redirectGet0(String target) {
        LOG.info("Redirect query to: " + target);
        Client client = ClientBuilder.newClient();
        WebTarget service = client.target(target);
        Response response = service.request(MediaType.WILDCARD).get();
        return response;
    }


    private static StreamingOutput output(final JSONObject o) {
        return (OutputStream output) -> {
            try {
                IOUtils.copy(new StringReader(o.toString()), output);
            } catch (IOException e) {
                throw new WebApplicationException(e);
            }
        };
    }


    private static StreamingOutput output(final InputStream input) {
        return (OutputStream output) -> {
            IOUtils.copy(input, output);
        };
    }


    static StreamingOutput output(final byte[] input) {
        return (OutputStream output) -> {
            try {
                IOUtils.write(input, output);
            } catch (IOException e) {
                throw new WebApplicationException(e);
            }
        };
    }


    private static String toString(StreamingOutput o) throws IOException {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        o.write(os);
        return os.toString();
    }


    private String getStringFile(UriInfo uriInfo, String suid, String file) {
        try {
            if (Config.getSessionStore().hasSession(suid)) {
                Response out = getOutputFile(uriInfo, suid, file);

                if (out.getEntity() instanceof StreamingOutput) {
                    // inbound, local response
                    StreamingOutput o = (StreamingOutput) out.getEntity();
                    return toString(o);
                } else {
                    // outbound, client response
                    InputStream is = out.readEntity(InputStream.class);
                    return IOUtils.toString(is);
                }
            }
            return JSONUtils.error("suid unknown: " + suid).toString();
        } catch (Exception E) {
            return JSONUtils.error("exception fetching suid: " + suid + ", problem: " + E.getMessage()).toString();
        }
    }

}