V1_0.java [src/java/m/wepsIntegrated] Revision: default  Date:
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API, and application suite.
 *
 * 2012-2017, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package m.wepsIntegrated;

import csip.Config;
import static csip.Config.CSIP_SESSION_TTL;
import csip.api.server.Executable;
import csip.ModelDataService;
import csip.ModelDataServiceConstants;
import csip.api.server.ServiceException;
import csip.annotations.*;
import static csip.annotations.ResourceType.*;
import static csip.annotations.State.DEVELOPMENT;
import csip.api.client.ModelDataServiceCall;
import csip.utils.Client;
import csip.utils.Parallel;
import csip.utils.Services;
import csip.utils.ZipFiles;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.FileHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

import javax.ws.rs.*;
import static m.wepsModelConstants.*;
import org.apache.commons.io.FileUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

/**
 * This implements the CSIP WEPS service.
 *
 * @author od, mh
 */
@Name("WepsIntegrated for ARS")
@Description("WEPS for ARS with integrated cligen & windgen calls")
@VersionInfo("1.0")
@State(DEVELOPMENT)
@Path("m/wepsIntegrated/1.0")
@Polling(first = 2000, next = 1000)

// ARS versions of the binaries
@Resource(file = "/bin/lin-amd64/ARS/default/2023-01-09/weps", id = "weps-default.lin64", type = EXECUTABLE)
@Resource(file = "/bin/lin-amd64/ARS/dev-version/weps", id = "weps-dev.lin64", type = EXECUTABLE)


public class V1_0 extends ModelDataService implements Executable.StdHandler {

    public static final String WIND_DB_FILENAME = "windgen.wdb";
    public static final String WIND_IDX_FILENAME = "windgen.idx";
    public static final String WIND_IDX_NRCS_FILENAME = "windgen_NRCS.idx";
    
    String windgenURL;
    String cligenURL;
    
    double latitude;
    double longitude;
    double windlatitude;
    double windlongitude;
    int simulationYears;
    boolean isNRCS;
    
    Executable weps;
    
    boolean cligenLocal = false;
    
    // Reminder: add new date to json as well
    static final String SERVICE_UPDATE_DATE = "05/10/2023";
    
    
    @Override
    public void preProcess() throws Exception {
        
        setProgress("Initializing");
        
        // For Debugging !!!
        if (metainfo().getBoolean("wepsDebugLog", false)) {
            Logger l = Logger.getLogger("csip-core");
            l.setLevel(Level.FINEST);
            try { 
                FileHandler fh;  
                fh = new FileHandler(new File(workspace().getDir(),"wepsdebug-logfil.txt").getAbsolutePath());  
                l.addHandler(fh);
                SimpleFormatter formatter = new SimpleFormatter();  
                fh.setFormatter(formatter);  
            } catch (SecurityException | IOException e) {  
                e.printStackTrace();  
            }  
        }

        if (parameter().has(PARMITEMWEPSLAT)) {
            latitude = parameter().getDouble(PARMITEMWEPSLAT);
            longitude = parameter().getDouble(PARMITEMWEPSLON);
        } else {
            throw new ServiceException(PARMITEMWEPSLAT+" and/or " +PARMITEMWEPSLON+"not specified");
        }
        
        // in some use case in the Java IF, these vary from the site lat/lon
        if (parameter().has(PARMITEMWEPSWINDLAT)) {
            windlatitude = parameter().getDouble(PARMITEMWEPSWINDLAT);
            windlongitude = parameter().getDouble(PARMITEMWEPSWINDLON);
        } else {
            windlatitude = latitude;
            windlongitude = longitude;
        }
        
        simulationYears = parameter().getInt(PARMITEMWEPSSIMYEARS, 100);
        isNRCS = parameter().getBoolean(PARMITEMISNRCS, false);
        
        
        // use the windgen in this endpoint.
        windgenURL = this.request().getURL().replaceFirst("/m/wepsIntegrated/.*$", "/m/windgen/2.0");
        if (this.request().getScheme().startsWith("https")) {
//            cligenURL = "https://csip.engr.colostate.edu:9083/csip-climate/m/cligen_prism/weps/1.0";  
            // use the standard CSU CSIP cligen prism, since using lat/lon inputs.
            cligenURL = "https://csip.engr.colostate.edu:9083/csip-climate/m/cligen_prism/2.1";            
        } else {
            cligenURL = "http://csip.engr.colostate.edu:8083/csip-climate/m/cligen_prism/2.1";            
        }
        
        // new URL(this.request().getURL()).getHost() is NOT same as this.request().getHost()
        cligenLocal = new URL(cligenURL).getHost().equals(new URL(this.request().getURL()).getHost());
        metainfo().put("wepsIntegrated cligen:", cligenLocal?"local":"remote");

    }
    
    /**
     *
     * @throws Exception
     */
    @Override
    public void doProcess() throws Exception {
        // MEH from csip-weps V5_0
        try {
            setProgress("Calling Cligen/Windgen");
            if (metainfo().getBoolean("wepsWindCliParallel", false)) {
                Parallel.run(
                    () -> {
                      fetchClimate();
                    },
                    () -> {
                      fetchWind();
                    }
                );
            } else {
                setProgress("Calling Cligen");
                fetchClimate();
                setProgress("Calling Windgen");
                fetchWind();
            }
        } catch (Exception e) {
            File lf = new File(workspace().getDir(),"wepsdebug-logfil.txt");
            String msg = "\n";
            if (lf.exists()) {
                FileReader r = new FileReader(lf);
                char cbuf[] = new char[256];
                while (r.ready()) {
                    r.read(cbuf);
                    msg += String.copyValueOf(cbuf);
                }
            }
            throw new ServiceException("WEPS cligen/windgen error :" + e.getMessage() + msg);
        }
        
        if (parameter().has("windGenFileName")) {
            // this parm allows us to make the windgen filename 
            // match what was sent in the runfile being sent to Weps
            File newFile = new File (workspace().getDir(), parameter().getString("windGenFileName"));
            File windFile = new File (workspace().getDir(), "win_gen.win");
            windFile.renameTo(newFile);
        }
        
        setProgress("Calling Weps");
        
        String modelVersion = metainfo().getString("modelVersion", "");
        
        if (modelVersion.contentEquals("dev-version.lin64")) {
            weps = resources().getExe("weps-dev.lin64");
        } else  if (modelVersion.contentEquals("dev-version")) {
            weps = resources().getExe("weps-dev.lin64");
        } else {
            weps = resources().getExe("weps-default.lin64");
        }

        weps.setStdoutHandler(this);

        String parmHydrologyMethod = "-W" + parameter().getInt("hydrologyMethod", 1);
        String parmResurfacing = "-u" + parameter().getInt("resurfacing", 0);
        String parmInitialization = "-I" + parameter().getInt("initialization", 1);
        String parmConfidenceInterval = "-t" + parameter().getInt("confidenceInterval", 0);
        String parmFurrowEffect = "-T" + parameter().getInt("furrowEffect", 0);
        String parmCalibrationMode = parameter().has("calibrationMode") ? "-C" + parameter().getInt("calibrationMode", 0) : "";
        String parmCalibrationCycles = parameter().has("calibrationCycles") ? "-Z" + parameter().getInt("calibrationCycles", 0) : "";

        weps.setArguments(parmHydrologyMethod,
                parmResurfacing,
                parmInitialization,
                parmConfidenceInterval,
                parmFurrowEffect,
                parmCalibrationMode,
                parmCalibrationCycles,
                "-P" + workspace().getDir()
        );

        int result = weps.exec();
        setProgress("Weps Complete");
        if (result != 0) {
            throw new ServiceException("WEPS exit error :" + result);
        }
    }


    @Override
    protected void postProcess() throws Exception {
        
        setProgress("Building result data");
        metainfo().put(PARMOUTUPDATED, SERVICE_UPDATE_DATE);
        metainfo().put(PARMOUTMODELNAME, weps.getName());
        metainfo().put("CSIP_SESSION_TTL", Config.getString(CSIP_SESSION_TTL));

        File[] outFiles = workspace().getDir().listFiles( (File file) -> file.getName().endsWith(".out"));
        ArrayList<File> stdFiles2 = new ArrayList(
            Arrays.asList(workspace().getDir().listFiles( (File file) -> 
                file.getName().endsWith("stdout.txt") || 
                file.getName().endsWith("stderr.txt") ||
                file.getName().endsWith("logfil.txt")))
        );
        if (metainfo().getBoolean("wepsDebugLog", false)) {
            // crude, but it works.
            File sysLogFile = new File (Services.getResultsDir(getSUID()), ModelDataServiceConstants.LOG_FILE);
            FileUtils.copyFileToDirectory(sysLogFile, workspace().getDir());
            sysLogFile = new File (workspace().getDir(), ModelDataServiceConstants.LOG_FILE);
            stdFiles2.add(sysLogFile);
        }
                
        File[] otherRetFiles = workspace().getDir().listFiles( (File file) -> {
            String name = file.getName();
            return (name.endsWith(".cli") || name.endsWith(".par") || name.endsWith(".win") || name.endsWith(".wdb"));
        });
        File[] subRegionFiles = workspace().getDir().listFiles( (File file) -> file.getName().startsWith("subregion"));
        File erodFilesDir = new File(workspace().getDir(),"sae_in_out_files");
        ArrayList<File> zFilesList = new ArrayList();
        
        boolean subRegionResults = metainfo().getBoolean("returnSubRegionResults", false);
        
        boolean zipResults = metainfo().getBoolean("zipResults", false);
        
        if (zipResults) {
            File zFile = new File(workspace().getDir(),"results.zip");
            
            zFilesList.addAll(Arrays.asList(outFiles));
            zFilesList.addAll(Arrays.asList(otherRetFiles));
            File interpZipFile = new File(workspace().getDir(),"interpolate.zip");
            if (interpZipFile.exists()) {
                zFilesList.add(interpZipFile);
            }
            
            ArrayList<File> zFilesListSubReg = new ArrayList();
            ArrayList<File> zFilesListErod = new ArrayList();
            File stdzFile = new File(workspace().getDir(),"logFiles.zip");
            Parallel.run(false,
                () -> {
                    try {
                        if (subRegionResults) {
                            for (File f : subRegionFiles) {
                                File srzFile = new File(workspace().getDir(),f.getName()+".zip");
                                ZipFiles.zip(f,srzFile);
                                zFilesListSubReg.add(srzFile);
                            }
                        }
                    } catch (IOException e) {
                    }
                },
                () -> {
                    try {
                        // These files may be triggered by an option in the .runx file
                        // instead of as a command line option
                        if (erodFilesDir.exists()) {
                            File erzFile = new File(workspace().getDir(),erodFilesDir.getName()+".zip");
                            ZipFiles.zip(erodFilesDir, erzFile);
                            zFilesListErod.add(erzFile);
                        }
                    } catch (IOException e) {
                    }
                },
                () -> {
                    try {
                        if (outFiles.length > 0) {
                            ZipFiles.zip(stdzFile, stdFiles2);
                        }
                    } catch (IOException e) {
                    }
                }
            );

            try {
                zFilesList.addAll(zFilesListSubReg);
                zFilesList.addAll(zFilesListErod);
                zFilesList.add(stdzFile);
                ZipFiles.zip(zFile, zFilesList);
                results().put(zFile);
            } catch (IOException e) {
            }
        } else {
            results().put(outFiles);
            results().put(stdFiles2.toArray(new File[0]));
            results().put(otherRetFiles);
            if (subRegionResults) {
                results().put(subRegionFiles);
            }
            // These files may be triggered by an option in the .runx file
            // instead of as a command line option
            if (erodFilesDir.exists()) {
                results().put(erodFilesDir);
            }
        }
        
        setProgress("Result data ready");

        super.postProcess();
    }


    @Override
    public void handle(String out) {
        String s;
        int start;
        int end;
        
        //Subregion            2 Year          12  of          30
        // or
        //Erosion Year           1  of          30

        if ((start = out.lastIndexOf("Year")) >= 0) {
            start = out.lastIndexOf('\n',start)+1;
            if ((end = out.indexOf('\n', start)) == -1) {
                end = out.length();
            }
            if (out.charAt(end - 1) == '\r') {
                end--;
            }
            s = out.substring(start, end);

            try {
                setProgress(s);
            } catch (ServiceException ex) {
                Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    private void fetchClimate() throws JSONException, Exception {
        LOG.info("MEH test msg cligen starting");
        int pingValidCnt = 0;
        for (int i=0; i<15; i++) {
            long pingTime = Client.ping(cligenURL, 1000);
            LOG.info("MEH test msg cligen ping:"+pingTime);
            
            pingValidCnt = (pingTime > -1) ? pingValidCnt + 1 : 0;
            if (pingValidCnt >= 5) {
                break;
            }
        }
        if (pingValidCnt < 5) {
            throw new ServiceException("Climate service not available: count=" + pingValidCnt);        
        }
        
        int cliTimeout = metainfo().getInt("cligenTimeout",20);
        if (cliTimeout == 20) {
            // adjust the timeout with the sim length (roughly)
//            cliTimeout = Math.round((simulationYears - 20)*(float).8) + 70;
            cliTimeout = (simulationYears - 20) + 70;
        }
        
        ModelDataServiceCall cliRes = new ModelDataServiceCall();
        
        if (parameter().has(PARMITEMStateId)) {
            cliRes = cliRes
                .put(PARMITEMStateId, parameter().getInt(PARMITEMStateId, -11))
                .put(PARMITEMStationId, parameter().getInt(PARMITEMStationId, -1));
        }

// A manual way to create the input_zone_features JSON
//            .put("input_zone_features", new JSONObject(
//                "{     'type': 'FeatureCollection',"
//                + "        'features': [{"
//                + "          'type': 'Feature',"
//                + "          'properties': {"
//                + "            'name': 'pt one',"
//                + "            'gid': 1"
//                + "          },"
//                + "          'geometry': {"
//                + "            'type': 'Point',"
//                + "            'coordinates': ["
//                + "              " + longitude + ","
//                + "              " + latitude
//                + "            ]"
//                + "          }"
//                + "        }]"
//                + "      }"))

        cliRes = cliRes
            .put("startyear", 1)
            .put("duration", simulationYears)
            .put("outputFile", "cli_gen.cli")
            .put("usePRISM", true)
            .put("input_zone_features", new JSONObject()
                .put("type", "FeatureCollection")
                .put("features", new JSONArray().put(
                    new JSONObject()
                        .put("type", "Feature")
                        .put("properties", new JSONObject()
                            .put("name", "pt one")
                            .put("gid", 1)
                        )
                        .put("geometry", new JSONObject()
                            .put("type", "Point")
                            .put("coordinates", new JSONArray()
                                .put(longitude).put(latitude)
                            )
                        )
                    )
                )
            )
            .url(cligenURL)
            .withDefaultLogger()
            .withTimeout(cliTimeout)
            .withCache(false)
//            .withRetries(4)
//            .withRetryPause(10)
            .putMeta("csip.session.ttl","180")
            .putMeta("archive","true")
//            .call();
            ;
        
        ModelDataServiceCall cliCall = cliRes;
        
        // on infosys server (at least), we frequently
        // get a timeout on the 1st call to cligen.
        // This is an attempt to "fail fast" (at 20 secs)
        // but then to increase the timeout with each retry
        // (in case the extra time is really required)
        // 15 secs is sufficient for a 100 year request to cligen.
        for (int i=0; i<5; i++) {
            LOG.info("MEH test msg cligen call. timeout:"+cliTimeout+" reps:"+i);
            if (i>0) {
                setProgress("Cligen retry:"+i);
            }
            cliRes = cliCall.call();
            if (cliRes.serviceFinished() && !cliRes.serviceFailed()) {
                break;
            }
            cliTimeout += 20 * (i+1);
            cliCall.withTimeout(cliTimeout);
        }
        
        LOG.info("MEH test msg cligen complete");

        if (cliRes.serviceFinished() && !cliRes.serviceFailed()) {
            try {
                List<String>resNames = cliRes.getNames();
                if (cligenLocal) {
                    for (String name : resNames) {
                        String value = cliRes.getString(name);
                        if (value.contains("http")) {
                            File f = workspace().getFile(name);
                            cliRes.download(name, f);
                        }
                    }
                } else {
                    // if cligen not on this server,
                    // use the archive since it is zipped
                    // to lessen download time
                    for (String name : resNames) {
                        if (name.startsWith("archive")) {
                            File f = workspace().getFile(name);
                            cliRes.download(name, f);
                            ZipFiles.unzip(f);
                            Files.delete(f.toPath());
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                throw new ServiceException("Climate service error (download): " + e.getMessage());        
            }
        } else {
            throw new ServiceException("Climate service error: " + cliRes.getError());
        }
    }
    
    private void fetchWind() throws Exception {
        LOG.info("MEH test msg windgen starting");
        ModelDataServiceCall windInput = new ModelDataServiceCall();
        int wban = parameter().getInt(PARMITEM_WBAN, 0);
        if (wban > 0 && wban < 999999) {
            windInput.put(PARMITEM_WBAN, wban);
        } else {
            windInput.put("latitude", windlatitude);
            windInput.put("longitude", windlongitude);
        }

        ModelDataServiceCall windRes = windInput
            .put("duration", simulationYears)
            .put("outputFile", "win_gen.win")
            .put(PARMITEMISNRCS, isNRCS)
            .url(windgenURL)
            .withDefaultLogger()
            .withTimeout(60)
            .withCache(false)
            .withRetries(2)
            .withRetryPause(10)
            .call();
        
        LOG.info("MEH test msg windgen complete");
        
        if (windRes.serviceFinished() && !windRes.serviceFailed()) {
            try {
                 List<String>resNames = windRes.getNames();
//               for (String fname : outFileNames) {
//                    if (s.endsWith("win") || s.endsWith("wdb") || s.endsWith("txt")) {
//                    if (s.endsWith("win") || s.endsWith("wdb") || s.endsWith("txt")) {
//                        windRes.download(s, workspace().getFile(s));
//                    }
//                }            
                for (String name : resNames) {
                    String value = windRes.getString(name);
                    if (value.contains("http")) {
                        File f = workspace().getFile(name);
                        windRes.download(name, f);
                    }
                }
            } catch (Exception e) {
                throw new ServiceException("Wind service error (download): " + e.getMessage()); 
            }
        } else {
            throw new ServiceException("Wind service error: " + windRes.getError());
        }
    }
        
}