Services.java [src/csip/utils] Revision: f27b0b136832f2e9c4a345cbdef5967b2442f743  Date: Fri Apr 21 16:24:27 MDT 2017
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 *
 * 2012-2017, Olaf David and others, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package csip.utils;

import csip.Config;
import csip.ModelDataService;
import csip.ServiceException;
import csip.SessionLogger;
import java.io.*;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriBuilder;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.glassfish.jersey.media.multipart.BodyPart;
import org.glassfish.jersey.media.multipart.BodyPartEntity;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;

/**
 * service utilities.
 *
 * @author Olaf David
 */
public class Services {

    /** IP address of this host, resolved once via {@code getLocalIP()} when the class is initialized. */
    public static final String LOCAL_IP_ADDR = getLocalIP();
    /** Default thread pool size for ensemble runs; used when "codebase.threadpool" is not configured. */
    public static final int ENSEMBLE_THREADS = 10;

    /** Class logger. Note: initialized after {@code LOCAL_IP_ADDR}, so it is not yet available inside {@code getLocalIP()}. */
    static final Logger LOG = Logger.getLogger(Services.class.getName());


    /**
     * Returns the current local IP address or an empty string in error case /
     * when no network connection is up.
     * <p>
     * All interfaces that are up, not loopback and not virtual are scanned.
     * The first address whose canonical host name differs from its textual IP
     * (i.e. an address that resolves to a name) is returned right away;
     * otherwise the last usable address that was seen is returned. Link-local
     * addresses are only accepted on point-to-point interfaces.
     *
     * @return the current local IP address, or an empty string (never
     * {@code null}) if none could be determined.
     * @since 0.1.0
     */
    private static String getLocalIP() {

        String ipOnly = "";
        try {
            Enumeration<NetworkInterface> nifs = NetworkInterface.getNetworkInterfaces();
            if (nifs == null) {
                return "";
            }
            while (nifs.hasMoreElements()) {
                NetworkInterface nif = nifs.nextElement();
                if (!nif.isLoopback() && nif.isUp() && !nif.isVirtual()) {
                    Enumeration<InetAddress> adrs = nif.getInetAddresses();
                    while (adrs.hasMoreElements()) {
                        InetAddress adr = adrs.nextElement();
                        if (adr != null && !adr.isLoopbackAddress() && (nif.isPointToPoint() || !adr.isLinkLocalAddress())) {
                            String adrIP = adr.getHostAddress();
                            // no reverse lookup for point-to-point links
                            String adrName = nif.isPointToPoint() ? adrIP : adr.getCanonicalHostName();
                            if (!adrName.equals(adrIP)) {
                                // address resolves to a real host name: preferred result
                                return adrIP;
                            }
                            ipOnly = adrIP;
                        }
                    }
                }
            }
            // empty when no usable address was found, as documented
            return ipOnly;
        } catch (SocketException ex) {
            // This runs while LOCAL_IP_ADDR is being initialized, i.e. before
            // LOG is assigned, so the failure cannot be logged here.
            return "";
        }
    }


    static {
        Calendar uuidEpoch = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        uuidEpoch.clear();
        uuidEpoch.set(1582, 9, 15, 0, 0, 0); // 9 = October
        epochMillis = uuidEpoch.getTime().getTime();
    }

    static long epochMillis;
//    

//    private static long getTime(UUID uuid) {
//        return (uuid.timestamp() / 10000L) + epochMillis;
//    }
//    static long getTime(String uuid) {
//        UUID u = UUID.fromString(uuid);
//        return (u.timestamp() / 10000L) + epochMillis;
//    }
    /**
     * Formats a date as "/dd/HH" (day of month / hour of day). The instance is
     * shared, access to it in this class goes through the synchronized
     * {@code getPrefix}.
     */
    static SimpleDateFormat f = new SimpleDateFormat("/dd/HH");


    /**
     * Derives the "/dd/HH" path prefix from the timestamp embedded in a UUID.
     *
     * @param uuid the textual UUID; {@link UUID#timestamp()} must be supported
     * for it.
     * @return the formatted day/hour prefix, starting with a slash.
     */
    private static synchronized String getPrefix(String uuid) {
        long uuidTicks = UUID.fromString(uuid).timestamp();
        // scale the UUID ticks down to milliseconds and shift by the reference date
        Date created = new Date((uuidTicks / 10000L) + epochMillis);
        return f.format(created);
    }

//    public static synchronized String getPrefix(UUID uuid) {
//        SimpleDateFormat f = new SimpleDateFormat("/dd/HH");
//        return f.format(new Date(getTime(uuid)));
//    }

    /**
     * Locates the results directory for a session.
     *
     * @param suid the session UUID; also used as the directory name.
     * @return {@code <csip.results.dir>/dd/HH/<suid>}, where the root defaults
     * to "/tmp/csip/results" and the day/hour part is derived from the UUID.
     */
    public static File getResultsDir(String suid) {
        String root = Config.getString("csip.results.dir", "/tmp/csip/results");
        String parent = root + getPrefix(suid);
        return new File(parent, suid);
    }


    /**
     * Locates the work directory for a session.
     *
     * @param suid the session UUID; also used as the directory name.
     * @return {@code <csip.work.dir>/dd/HH/<suid>}, where the root defaults to
     * "/tmp/csip/work" and the day/hour part is derived from the UUID.
     */
    public static File getWorkDir(String suid) {
        String root = Config.getString("csip.work.dir", "/tmp/csip/work");
        String parent = root + getPrefix(suid);
        return new File(parent, suid);
    }


//    public static void main(String[] args) {
//        
//        System.out.println(getPrefix("6b7a63c1-16b6-11e5-83a9-112c53ec8f43"));
////        TimeBasedGenerator gen = Generators.timeBasedGenerator();
////        UUID i = gen.generate();
////        System.out.println(getTime(i));
////        System.out.println(new Date(getTime(i)));
//    }
    /**
     * Creates a callable that performs no work.
     *
     * @return a callable whose {@code call()} immediately returns
     * {@code ModelDataService.EXEC_OK}.
     * @throws Exception declared for API compatibility.
     */
    public static Callable<String> dummyCallable() throws Exception {
        Callable<String> noop = new Callable<String>() {
            @Override
            public String call() throws Exception {
                return ModelDataService.EXEC_OK;
            }
        };
        return noop;
    }

    /**
     * A single part of a multipart form: either an uploaded file (file name
     * plus input stream) or a plain value (string).
     */
    public static class FormDataParameter {

        /** Form field name taken from the content disposition. */
        String name;
        /** Content stream; only set for file parts. */
        InputStream is;
        /** Uploaded file name; {@code null} for plain value parts. */
        String filename;
        /** String content; only set for plain value parts. */
        String value;


        /**
         * Reads the field name and, depending on whether a file name is
         * present, either the content stream or the string value of the part.
         *
         * @param bp the body part; its content disposition must be a
         * {@code FormDataContentDisposition}.
         */
        public FormDataParameter(BodyPart bp) {
            FormDataContentDisposition disposition = (FormDataContentDisposition) bp.getContentDisposition();
            this.name = disposition.getName();
            String uploadName = disposition.getFileName();
            if (uploadName == null) {
                // plain form field
                this.value = bp.getEntityAs(String.class);
                return;
            }
            // file upload
            BodyPartEntity entity = (BodyPartEntity) bp.getEntity();
            this.is = entity.getInputStream();
            this.filename = uploadName;
        }


        /** @return the content stream of a file part, {@code null} otherwise. */
        public InputStream getInputStream() {
            return is;
        }


        /** @return {@code true} if this part carries a file name. */
        public boolean isFile() {
            return null != filename;
        }


        /** @return the string value of a plain part, {@code null} for files. */
        public String getValue() {
            return value;
        }


        /** @return the uploaded file name, {@code null} for plain parts. */
        public String getFilename() {
            return filename;
        }


        /** @return the form field name. */
        public String getName() {
            return name;
        }
    }


    /**
     * Indexes the parts of a multipart request by their form field name.
     * If several parts share a name, the last one wins.
     *
     * @param b the body parts; each content disposition must be a
     * {@code FormDataContentDisposition}.
     * @return a map from field name to the wrapped form parameter.
     */
    public static Map<String, FormDataParameter> getFormParameter(List<BodyPart> b) {
        Map<String, FormDataParameter> byName = new HashMap<>();
        for (BodyPart part : b) {
            FormDataContentDisposition disposition = (FormDataContentDisposition) part.getContentDisposition();
            String key = disposition.getName();
            FormDataParameter param = new FormDataParameter(part);
            byName.put(key, param);
        }
        return byName;
    }


    /**
     * Wraps each path name into a {@link File}.
     *
     * @param n the path names.
     * @return an array of the same length and order as {@code n}.
     */
    @Deprecated
    public static File[] toFiles(String... n) {
        File[] wrapped = new File[n.length];
        int idx = 0;
        for (String path : n) {
            wrapped[idx++] = new File(path);
        }
        return wrapped;
    }


    /**
     * Rewrites a URI so that it points to another host, using the configured
     * peer port ("csip.peer.port", default 8080).
     *
     * @param uri the original URI.
     * @param newHost the host name to put in.
     * @return the rewritten URI as string.
     * @throws Exception declared for API compatibility.
     */
    public static String replaceHostinURI(URI uri, String newHost) throws Exception {
        int peerPort = Config.getInt("csip.peer.port", 8080);
        URI rewritten = UriBuilder.fromUri(uri).host(newHost).port(peerPort).build();
        String result = rewritten.toString();
        LOG.info("replace in Host: " + result);
        return result;
    }


    /**
     * Converts a URI string into its public form by replacing scheme, host
     * and port with the configured "csip.public.*" values (each one only if
     * it is set).
     *
     * @param u the URI as string.
     * @return the public URI.
     * @throws NumberFormatException if "csip.public.port" is set but not a
     * number.
     */
    public static URI toPublicURL(String u) {
        return applyPublicSettings(UriBuilder.fromUri(u)).build();
    }


    /**
     * Converts a URI into its public form by replacing scheme, host and port
     * with the configured "csip.public.*" values (each one only if it is set).
     *
     * @param u the URI.
     * @return the public URI.
     * @throws NumberFormatException if "csip.public.port" is set but not a
     * number.
     */
    public static URI toPublicURL(URI u) {
        return applyPublicSettings(UriBuilder.fromUri(u)).build();
    }


    /**
     * Overrides scheme, host and port of the builder with the configured
     * public values; shared by both {@code toPublicURL} overloads.
     */
    private static UriBuilder applyPublicSettings(UriBuilder b) {
        String scheme = Config.getString("csip.public.scheme");
        if (scheme != null) {
            b = b.scheme(scheme);
        }
        String host = Config.getString("csip.public.host");
        if (host != null) {
            b = b.host(host);
        }
        String port = Config.getString("csip.public.port");
        if (port != null) {
            // tolerate stray whitespace around the configured number
            b = b.port(Integer.parseInt(port.trim()));
        }
        return b;
    }


    /**
     * Copy the uploaded files of a form to a directory. If unpacking is
     * requested, ".gz"/".bz2" files are decompressed and ".zip"/".tar"
     * archives (also when wrapped in gz/bz2) are extracted.
     * <p>
     * File and entry names are client supplied. Names that would resolve to
     * a location outside of {@code dir} (e.g. containing "../") are rejected.
     *
     * @param log the session logger.
     * @param dir the target directory.
     * @param forms the form parameters; non-file parameters are skipped.
     * @param unpack if true, decompress and extract archives, otherwise
     * store every file as it is.
     * @return the names of the copied/extracted files, relative to
     * {@code dir}.
     * @throws ServiceException if a file cannot be written, cannot be
     * unpacked, or would be written outside of {@code dir}.
     */
    public static String[] copyAndExtract(SessionLogger log, File dir, Map<String, FormDataParameter> forms, boolean unpack) throws ServiceException {
        List<String> files = new ArrayList<>();
        for (FormDataParameter fd : forms.values()) {
            if (!fd.isFile()) {
                continue;
            }
            String name = fd.getFilename().replace('\\', '/');
            InputStream fis = fd.getInputStream();
            // Locale.ROOT: the suffix checks must not depend on the default locale
            String lcName = name.toLowerCase(Locale.ROOT);

            try {
                if (unpack && (lcName.endsWith(".bz2") || lcName.endsWith(".gz"))) {
                    // wrapper (supports single files, as well as tar.gz / tar.bz2)
                    fis = new CompressorStreamFactory().createCompressorInputStream(new BufferedInputStream(fis));
                    name = removeExt(name);
                    lcName = name.toLowerCase(Locale.ROOT);
                }
                if (unpack && (lcName.endsWith(".zip") || lcName.endsWith(".tar"))) {
                    // try-with-resources: the archive stream is closed on errors, too
                    try (ArchiveInputStream is = new ArchiveStreamFactory().createArchiveInputStream(new BufferedInputStream(fis))) {
                        ArchiveEntry entry;
                        while ((entry = is.getNextEntry()) != null) {
                            if (!is.canReadEntryData(entry)) {
                                continue;
                            }
                            File f = resolveInside(dir, entry.getName());
                            if (entry.isDirectory()) {
                                f.mkdirs();
                                continue;
                            }
                            createParentDirs(f);
                            try (FileOutputStream ous = new FileOutputStream(f)) {
                                copyAndCheckFS(is, ous);
                            }
                            files.add(entry.getName());
                            if (entry.getLastModifiedDate() != null) {
                                f.setLastModified(entry.getLastModifiedDate().getTime());
                            }
                            log.info("Extracted :" + entry.getName() + " as " + f);
                        }
                    }
                } else {
                    File f = resolveInside(dir, name);
                    createParentDirs(f);
                    try (FileOutputStream ous = new FileOutputStream(f)) {
                        copyAndCheckFS(fis, ous);
                    }
                    files.add(name);
                    if (log.isLoggable(Level.INFO)) {
                        log.info("copy form data file: " + name + " to " + dir);
                    }
                }
            } catch (CompressorException | ArchiveException ex) {
                // e.g. a file with an archive suffix but different content;
                // report it instead of silently dropping the upload
                throw new ServiceException(new IOException("Cannot unpack: " + name, ex));
            } catch (IOException ex) {
                throw new ServiceException(ex);
            }
        }
        return files.toArray(new String[files.size()]);
    }


    /**
     * Resolves a client supplied name against the target directory and
     * verifies that the result does not escape that directory.
     */
    private static File resolveInside(File dir, String name) throws IOException {
        File target = new File(dir, name);
        String base = dir.getCanonicalPath();
        String path = target.getCanonicalPath();
        String basePrefix = base.endsWith(File.separator) ? base : base + File.separator;
        if (!path.equals(base) && !path.startsWith(basePrefix)) {
            throw new IOException("File is outside of the target directory: " + name);
        }
        return target;
    }


    /**
     * Creates the parent directories of a file if they do not exist yet.
     */
    private static void createParentDirs(File f) {
        File parent = f.getParentFile();
        if (parent != null && !parent.exists()) {
            parent.mkdirs();
        }
    }


    /**
     * Copies everything from the input to the output stream in 4 KB chunks.
     * Neither stream is closed. Despite the name, no file system usage check
     * is performed.
     *
     * @param input the stream to read until its end.
     * @param output the stream to write to.
     * @throws IOException if reading or writing fails.
     */
    public static void copyAndCheckFS(InputStream input, OutputStream output) throws IOException {
        final byte[] chunk = new byte[4096];
        for (int len = input.read(chunk); len != -1; len = input.read(chunk)) {
            output.write(chunk, 0, len);
        }
    }


    /**
     * Removes the last extension from a file name, e.g. "a.tar.gz" becomes
     * "a.tar". Only a dot in the last path segment counts as extension
     * separator; a name without extension is returned unchanged.
     *
     * @param name the file name, optionally with a path ('/' or '\').
     * @return the name without its last extension.
     */
    public static String removeExt(String name) {
        int dot = name.lastIndexOf('.');
        int sep = Math.max(name.lastIndexOf('/'), name.lastIndexOf('\\'));
        // no dot at all (dot == -1), or the only dot belongs to a directory
        if (dot <= sep) {
            return name;
        }
        return name.substring(0, dot);
    }


    /**
     * Strips the first and the last character of a text, e.g. "[abc]"
     * becomes "abc".
     *
     * @param text the text.
     * @return the text without its first and last character; an empty string
     * if the text has fewer than two characters.
     */
    public static String removeFirstLastChar(String text) {
        // substring(1, length - 1) would throw for "" and for one character
        if (text.length() < 2) {
            return "";
        }
        return text.substring(1, text.length() - 1);
    }


    /**
     * Computes the MD5 checksum of a file. The file is streamed through the
     * digest in chunks, so its size is not limited by the available memory.
     *
     * @param file the file to hash.
     * @return the checksum as 32 lowercase hex digits, or an empty string if
     * the file could not be read.
     * @throws IllegalArgumentException if the file does not exist or is
     * empty.
     */
    public static String md5(File file) {
        if (!file.exists()) {
            throw new IllegalArgumentException("not found: " + file);
        }
        if (file.length() == 0) {
            throw new IllegalArgumentException("empty file: " + file);
        }
        try (InputStream in = new FileInputStream(file)) {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] chunk = new byte[8192];
            int n;
            // a single read() is not guaranteed to deliver the whole file
            while ((n = in.read(chunk)) != -1) {
                md.update(chunk, 0, n);
            }
            return toHex(md.digest());
        } catch (IOException | NoSuchAlgorithmException ex) {
            // best effort as before: report the problem, return an empty checksum
            Logger.getLogger("csip.utils.Services").log(Level.WARNING, "Cannot compute md5 for: " + file, ex);
            return "";
        }
    }

//////////////////////////////
// private
/////////////////////////////

    /**
     * Converts bytes to lowercase hex, two digits per byte.
     */
    private static String toHex(byte[] data) {
        final char[] digits = "0123456789abcdef".toCharArray();
        StringBuilder buf = new StringBuilder(data.length * 2);
        for (byte b : data) {
            buf.append(digits[(b >>> 4) & 0x0F]);
            buf.append(digits[b & 0x0F]);
        }
        return buf.toString();
    }

    /**
     * Factory for the tasks executed by {@code runParallel}: one callable is
     * requested per run index.
     */
    public interface CallableFactory {

        /**
         * Creates the callable for one run.
         *
         * @param i the index passed in by the caller; {@code runParallel}
         * passes 0 .. count-1.
         * @return the callable to execute for this index.
         */
        Callable create(int i);
    }


//    public static List<Future<JSONObject>> run(int max, final CallableFactory callable) {
//        return run(max, Config.getInt("codebase.threadpool", ENSEMBLE_THREADS), callable);
//    }
//
//
//    public static List<Future<JSONObject>> run(int max, int threads, final CallableFactory callable) {
//        final CountDownLatch latch = new CountDownLatch(max);
//        final int attempts = 3;
//        final boolean fail_all = true;
//
//        final ExecutorService executor = Executors.newFixedThreadPool(threads);
//        List<Future<JSONObject>> resp = new ArrayList<>();
//        for (int i = 0; i < max; i++) {
//            final int ii = i;
//            resp.add(executor.submit(new Callable<JSONObject>() {
//                @Override
//                public JSONObject call() throws Exception {
//                    JSONObject o = null;
//                    String err_msg = null;
//                    Callable<JSONObject> ca = callable.create(ii);
//                    int a = attempts;
//                    // allow trying this multiple times.
//                    while (a-- > 0 && o == null) {
//                        try {
//                            o = ca.call();
//                        } catch (Exception E) {
//                            err_msg = E.getMessage();
//                        }
//                    }
//                    if (o == null && fail_all) {
//                        executor.shutdown();
//                        throw new ServiceException("Failed service :" + ii + " " + err_msg);
//                    }
//                    latch.countDown();
//                    return o == null ? JSONUtils.error(err_msg) : o;
//                }
//            }));
//        }
//        try {
//            latch.await();
//        } catch (InterruptedException ex) {
//        }
//        executor.shutdown();
//        return resp;
//    }
    /**
     * Creates a fixed size thread pool with a bounded work queue. Once the
     * queue is full, a submitted task is executed by the submitting thread
     * itself (caller-runs policy).
     *
     * @param nthreads the number of pool threads (core and maximum).
     * @param bq_len the queue capacity on top of {@code nthreads}.
     * @return the new executor service.
     */
    static synchronized ExecutorService getES(int nthreads, int bq_len) {
        int capacity = nthreads + bq_len;
        return new ThreadPoolExecutor(
                nthreads, nthreads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(capacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }


    /**
     * Runs {@code count} tasks in parallel, with the number of threads taken
     * from "csip.service.peers" (default 4).
     *
     * @param count the number of tasks.
     * @param factory creates the task for each index.
     */
    public static void runParallel(int count, CallableFactory factory) {
        int peers = Config.getInt("csip.service.peers", 4);
        runParallel(count, peers, factory);
    }


    /**
     * Runs {@code count} tasks in parallel on the given number of threads.
     * Attempts per task and queue length are taken from
     * "csip.internal.call.attempts" and "csip.internal.bq" (default 4 each).
     *
     * @param count the number of tasks.
     * @param threads the maximum number of threads.
     * @param factory creates the task for each index.
     */
    public static void runParallel(int count, int threads, CallableFactory factory) {
        int attempts = Config.getInt("csip.internal.call.attempts", 4);
        int queueLen = Config.getInt("csip.internal.bq", 4);
        runParallel(count, threads, attempts, queueLen, factory);
    }


    /**
     * Runs {@code count} tasks in parallel and blocks until they are done.
     * <p>
     * Each task is tried up to {@code attempts} times. A task that succeeds
     * on a retry counts as success. If a task fails on all attempts, the
     * whole run is aborted: tasks that are still queued are dropped, running
     * tasks are interrupted and no further tasks are submitted. Failures are
     * reported on System.err; the method itself returns normally.
     *
     * @param count the number of tasks; nothing happens if it is 0 or less.
     * @param threads the maximum number of threads (bound by {@code count}).
     * @param attempts the maximum number of tries per task.
     * @param bq the queue length on top of the number of threads.
     * @param factory creates the task for each index 0 .. count-1.
     */
    public static void runParallel(int count, int threads, final int attempts, int bq, CallableFactory factory) {
        if (count <= 0) {
            // nothing to run; a pool with zero threads cannot be created
            return;
        }
        // have the number of threads being bound by count
        int threads_ = Math.min(count, threads);
        final ExecutorService exec = getES(threads_, bq);
        // stop submitting once a failed task has shut the pool down
        for (int i = 0; i < count && !exec.isShutdown(); i++) {
            final Callable<?> c = factory.create(i);
            exec.submit(new Runnable() {
                @Override
                public void run() {
                    Exception last = null;
                    for (int a = attempts; a > 0; a--) {
                        try {
                            c.call();
                            // success, earlier failed attempts do not matter
                            return;
                        } catch (Exception E) {
                            System.err.println("Failed #" + a);
                            last = E;
                        }
                    }
                    if (last != null) {
                        System.err.println("Failed all attempts, last exception:");
                        last.printStackTrace(System.err);
                        exec.shutdownNow();
                    }
                }
            });
        }
        // Wait for termination of the pool instead of counting finished
        // tasks: tasks dropped by an abort never run and would never be
        // counted, which would block this thread forever.
        exec.shutdown();
        try {
            exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException ex) {
            exec.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }


//    public static List<Future<JSONObject>> run0(int max, int threads, final CallableFactory callable) {
//        final CountDownLatch latch = new CountDownLatch(max);
//        final ExecutorService executor = Executors.newFixedThreadPool(threads);
//        List<Future<JSONObject>> resp = new ArrayList<>();
//        for (int i = 0; i < max; i++) {
//            final int ii = i;
//            resp.add(executor.submit(new Callable<JSONObject>() {
//                @Override
//                public JSONObject call() throws Exception {
//                    JSONObject o = null;
//                    String err_msg = null;
//                    Callable<JSONObject> ca = callable.create(ii);
//                    try {
//                        o = ca.call();
//                    } catch (Exception E) {
//                        err_msg = E.getMessage();
//                    } catch (AssertionError E) {
//                        err_msg = E.getMessage();
//                    }
//                    latch.countDown();
//                    return (err_msg != null) ? JSONUtils.error(err_msg) : o;
//                }
//            }));
//        }
//        try {
//            latch.await();
//        } catch (InterruptedException ex) {
//        }
//        executor.shutdownNow();
//        return resp;
//    }
    /**
     * Run all models at once and wait until they are finished.
     * <p>
     * The pool size is taken from "codebase.threadpool" (default
     * {@code ENSEMBLE_THREADS}). If a model run throws an exception, the
     * ensemble is aborted: the failure is reported on System.err, the future
     * of the failed run yields {@code null}, runs that have not been started
     * are cancelled (their {@code get()} throws a CancellationException) and
     * models that have not been submitted yet are skipped.
     *
     * @param models the model runs.
     * @return the list of futures, all of them done or cancelled.
     * @throws ExecutionException declared for API compatibility.
     */
    public static List<Future<JSONObject>> runEnsemble(List<Callable<JSONObject>> models) throws ExecutionException {

        final ExecutorService executor = Executors.newFixedThreadPool(Config.getInt("codebase.threadpool", ENSEMBLE_THREADS));
        final List<Future<JSONObject>> results = new ArrayList<>();
        try {
            // Model callables
            for (final Callable<JSONObject> ca : models) {
                results.add(executor.submit(new Callable<JSONObject>() {
                    @Override
                    public JSONObject call() {
                        try {
                            return ca.call();
                        } catch (Exception E) {
                            System.err.println("Ensemble run failed, aborting:");
                            E.printStackTrace(System.err);
                            cancelPending(executor);
                            return null;
                        }
                    }
                }));
            }
        } catch (RejectedExecutionException E) {
            // a failed run has shut down the pool while models were still
            // being submitted; the remaining models are not started.
        }

        // Wait for termination of the pool instead of counting finished runs:
        // runs dropped by an abort never execute and would never be counted.
        executor.shutdown();
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException E) {
            cancelPending(executor);
            Thread.currentThread().interrupt();
        }
        return results;
    }


    /**
     * Shuts the executor down and cancels the tasks that were still waiting
     * in its queue, so that nobody blocks on their futures.
     */
    private static void cancelPending(ExecutorService executor) {
        for (Runnable pending : executor.shutdownNow()) {
            if (pending instanceof Future) {
                ((Future<?>) pending).cancel(false);
            }
        }
    }


    /**
     * Slice a original request into single runs. One REST call is prepared
     * per parameter set; the service URL is the configured "codebase.url"
     * followed by {@code path}.
     *
     * @param req the original request.
     * @param path the service path, appended to the codebase URL.
     * @return the mapped list of ensembles, or {@code null} if the request
     * has no metainfo or the metainfo has no parameter set count.
     * @throws JSONException if the request has fewer parameter sets than
     * announced in the metainfo or is otherwise malformed.
     */
    public static List<Callable<JSONObject>> mapEnsemble(JSONObject req, String path) throws JSONException {

        // check for the key first, fetching a missing metainfo would throw
        if (!req.has(ModelDataService.KEY_METAINFO)) {
            return null;
        }
        JSONObject metainfo = req.getJSONObject(ModelDataService.KEY_METAINFO);
        if (!metainfo.has(ModelDataService.KEY_PARAMETERSETS)) {
            return null;
        }

        String codebase = Config.getString("codebase.url", "http://csip.engr.colostate.edu:8081/rest");
        JSONArray psets = req.getJSONArray(ModelDataService.KEY_PARAMETER);
        int count = metainfo.getInt(ModelDataService.KEY_PARAMETERSETS);

        List<Callable<JSONObject>> runs = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            JSONArray pset = psets.getJSONArray(i);
            JSONObject single_req = JSONUtils.newRequest(pset, new JSONObject());
            runs.add(new RestCallable(single_req, codebase + path));
        }
        return runs;
    }


    /**
     * Checks the status of a response.
     *
     * @param res the response.
     * @return true if the status in the metainfo of the response is "Failed".
     * @throws JSONException if the response has no metainfo or no status.
     */
    static boolean isFailed(JSONObject res) throws JSONException {
        JSONObject metainfo = res.getJSONObject(ModelDataService.KEY_METAINFO);
        String status = metainfo.getString(ModelDataService.KEY_STATUS);
        return status.equals("Failed");
    }


    /**
     * Combines the responses of all ensemble runs into a single response.
     * The result of each run is collected in order; if any run reports the
     * status "Failed", the metainfo of the original request is marked as
     * "Failed", too.
     *
     * @param ens the futures of the single runs.
     * @param orig_req the original request; its metainfo may get modified.
     * @return a response made of the original parameter, the collected
     * results and the original metainfo.
     * @throws Exception if a run cannot deliver its response or a response
     * is malformed.
     */
    public static JSONObject reduceEnsemble(List<Future<JSONObject>> ens, JSONObject orig_req) throws Exception {
        JSONArray collected = new JSONArray();
        for (Future<JSONObject> run : ens) {
            JSONObject response = run.get();
            boolean failed = isFailed(response);
            if (failed) {
                JSONObject reqMeta = orig_req.getJSONObject(ModelDataService.KEY_METAINFO);
                reqMeta.put(ModelDataService.KEY_STATUS, "Failed");
            }
            Object payload = response.get(ModelDataService.KEY_RESULT);
            collected.put(payload);
        }
        JSONArray parameter = orig_req.getJSONArray(ModelDataService.KEY_PARAMETER);
        JSONObject metainfo = orig_req.getJSONObject(ModelDataService.KEY_METAINFO);
        return JSONUtils.newResponse(parameter, collected, metainfo);
    }

    /**
     * A callable that POSTs a JSON request to a service URL and returns the
     * JSON response.
     */
    static public class RestCallable implements Callable<JSONObject> {

        /** The request to send. */
        JSONObject req;
        /** The service URL to send to. */
        String url;


        /**
         * @param req the request to send.
         * @param url the URL of the service.
         */
        public RestCallable(JSONObject req, String url) {
            this.req = req;
            this.url = url;
        }


        /**
         * Sends the request and waits for the response.
         *
         * @return the response of the service.
         * @throws Exception if the call fails.
         */
        @Override
        public JSONObject call() throws Exception {
            Client client = ClientBuilder.newClient();
            try {
                WebTarget service = client.target(UriBuilder.fromUri(url).build());
                return service.request(MediaType.APPLICATION_JSON).post(Entity.json(req), JSONObject.class);
            } finally {
                // release the client's resources; a new one is created per call
                client.close();
            }
        }
    }
}