V1_0_raster.java [src/java/m/weather/PRISM] Revision: 20704bec08576449cbe84e805782089f6dcf7c48  Date: Thu Aug 11 10:19:11 MDT 2016
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package m.weather.PRISM;

import static csip.ModelDataService.EXEC_FAILED;
import static csip.ModelDataService.EXEC_OK;
import static csip.annotations.ResourceType.*;

import csip.Client;
import csip.ModelDataService;
import csip.annotations.Resource;
import csip.annotations.Resources;
import csip.utils.JSONUtils;
import csip.utils.Services;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.Path;
import oms3.annotations.*;
import org.codehaus.jettison.json.*;

/**
 * PRISM extraction
 *
 * @author od
 */
@Name("PRISM_raster")
@Description("PRISM (http://www.prism.oregonstate.edu/)")
@Path("m/prism_raster/1.0")
@Resources({
    @Resource(file="/python/prism.py", type=FILE, id="driver"),
})

public class V1_0_raster extends ModelDataService {

    File result_file = null;
    int jobcount = 0;
    String error_log = "";
    
    @Override
    protected void preProcess() throws Exception {
    }

    @Override
    protected String process() throws Exception {
        String input_zone_geojson = getStringParam("input_zone_features");
        String units = getStringParam("units", "english");
        String start_date = getStringParam("start_date", "1980-01-01");

        final SimpleDateFormat format;
        if (start_date.indexOf("-") > 0) {
            format = new SimpleDateFormat("yyyy-MM-dd");  
        }
        else {
            format = new SimpleDateFormat("MM/dd/yyyy");  
        }
        String end_date = getStringParam("end_date", format.format(new Date()));
        // 1200 seems to be as good as it gets for a 12 year extraction. I'll
        //    need to do some more testing to see what a good heuristic is.
        final int chunk = getIntParam("chunk", 1200);
        
        // Check for clipped PRISM rasters
        String subset = getStringParam("subset", "");

        JSONObject r = new JSONObject(getRequest().toString());
        Map<String, JSONObject> r_pm = JSONUtils.getParameter(r);
        LOG.info("initial request: " + r_pm);

        // Try splitting into chunks
        long days = chunk + 1;
        final Date sd = format.parse(start_date);
        final Date ed = format.parse(end_date);
        long diff = ed.getTime() - sd.getTime();//as given
        days = TimeUnit.MILLISECONDS.toDays(diff);
        
        if (chunk <= 0 || days <= chunk) {
            File f = getResourceFile("driver");

            result_file = new File(getWorkspaceDir(), "results.csv");
            LOG.info("Writing results to " + result_file.getPath());

            String args[] = {"python", f.getAbsolutePath(), input_zone_geojson,
                units, start_date, end_date, subset};
            for (String arg: args) {
                LOG.info("arguments = " + arg);
            }

            ProcessBuilder pb = new ProcessBuilder(args);
            // Write console to file rather than read it directly because the output
            //   is too larg1Ge and will cause the process to block.
            pb.redirectOutput(result_file);

            Process p = pb.start();
            BufferedReader bfr_error = new BufferedReader(new InputStreamReader(p.getErrorStream()));

            int exitCode = p.waitFor();
            LOG.info("Exit Code : "+exitCode);

            String line = "";
            while ((line=bfr_error.readLine()) != null) {
                LOG.info("Error line:"+line);
                error_log += line + "\n";
            }
            return exitCode == 0 ? EXEC_OK : EXEC_FAILED;

        } else {
            // resumbit the pieces.
            jobcount = (int)Math.ceil((float)days / chunk);
            Services.runParallel(jobcount, new Services.CallableFactory() {

                @Override
                public Callable create(final int i) {
                    return new Callable() {
                        @Override
                        public Void call() throws Exception {
                            // Calculate a start and end date for this chunk
                            Date this_sd = new Date(sd.getTime() + TimeUnit.DAYS.toMillis(i * chunk));
                            Date this_ed = new Date(sd.getTime() + TimeUnit.DAYS.toMillis((i+1) * chunk -1));
                            // Cap calculated end date if it goes beyond requested end date
                            if (i == jobcount-1) {
                                this_ed = ed;
                            }
                            JSONObject request = new JSONObject(getRequest().toString());

                            // adjust parameter
                            Map<String, JSONObject> pm = JSONUtils.getParameter(request);
                            pm.put("start_date", JSONUtils.data("start_date", format.format(this_sd)));
                            pm.put("end_date", JSONUtils.data("end_date", format.format(this_ed)));
                            pm.put("chunk", JSONUtils.data("chunk", -1));
                            JSONObject newrequest = JSONUtils.newRequest(pm);
                            
                            LOG.info(i + ": this_sd =" + format.format(this_sd) + ", this_ed = " + format.format(this_ed));
                            Client cl = new Client();
                            JSONObject response = cl.doPOST(getRequestURL(), newrequest);
                            // output
                            Map<String, JSONObject> res = JSONUtils.getResults(response);
                            

                            // Store the results
                            File res_file = new File(getWorkspaceDir(), i + ".csv");
                            try (Writer writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(res_file), "utf-8"))) {
                                writer.write(res.get("results").toString());
                            } catch (IOException ex) {
                            }  
                            
                            return null;
                        }
                    };
                }
            });
        }
        return EXEC_OK;
    }

    @Override
    protected void postProcess() throws Exception {
        // Check for results file if this was not chunked
        if (jobcount == 0) {
            byte[] result = Files.readAllBytes(Paths.get(result_file.getPath()));
            JSONObject results = new JSONObject(new String(result));
            putResult("results", results, "JSON string");
            if (error_log != "") {
                putResult("error", error_log, "JSON string");
            }
        }
        else {
            // Read all the chunks
            JSONObject chunks = null;
            LOG.info("There are " + jobcount + " jobs");
            for (int i=0; i<jobcount; i++) {
                File job_file = new File(getWorkspaceDir(), i + ".csv");
                byte[] result_bytes = Files.readAllBytes(Paths.get(job_file.getPath()));
                String result_str = new String(result_bytes);
                JSONObject json_result = new JSONObject(result_str);
                JSONObject json_result_value = new JSONObject(JSONUtils.getValue(json_result));
                JSONObject job_output = json_result_value.getJSONObject("output");
                
                if (chunks == null) {
                    chunks = job_output;
                }
                else {
                    // output is a map of feature gid to results
                    Iterator<String> iter = job_output.keys();
                    while (iter.hasNext()) {
                        String gid = iter.next();
                        // output is a json string.
                        LOG.info("processing json_output gid key = " + gid);
                        String this_output = job_output.getString(gid);
                        JSONArray climate_data = new JSONArray(this_output);
                        // concatenate the climate data for this feature
                        JSONArray current_climate_data = chunks.getJSONArray(gid);
                        // Skip header
                        for (int irow=1; irow<climate_data.length(); irow++) {
                            current_climate_data.put(climate_data.get(irow));
                        }
                        chunks.put(gid, current_climate_data);
                    }
                }
            }
            JSONObject new_output = new JSONObject();
            new_output.put("output", chunks);
            putResult("results", new_output, "JSON string");
        }
    }
}