V1_0_raster.java [src/java/m/weather/PRISM] Revision: default Date:
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package m.weather.PRISM;
import csip.Client;
import csip.ModelDataService;
import static csip.ModelDataService.EXEC_FAILED;
import static csip.ModelDataService.EXEC_OK;
import javax.ws.rs.Path;
import oms3.annotations.*;
import csip.annotations.Resource;
import csip.annotations.Resources;
import static csip.annotations.ResourceType.*;
import csip.utils.JSONUtils;
import csip.utils.Services;
import java.io.*;
import java.text.SimpleDateFormat;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.codehaus.jettison.json.*;
/**
* PRISM extraction
*
* @author od
*/
@Name("PRISM_raster")
@Description("PRISM (http://www.prism.oregonstate.edu/)")
@Path("m/prism_raster/1.0")
@Resources({
@Resource(file="/python/prism.py", type=FILE, id="driver"),
})
public class V1_0_raster extends ModelDataService {
File result_file = null;
int jobcount = 0;
String error_log = "";
@Override
protected void preProcess() throws Exception {
}
@Override
protected String process() throws Exception {
String input_zone_geojson = getStringParam("input_zone_features");
String units = getStringParam("units", "english");
String start_date = getStringParam("start_date", "1980-01-01");
final SimpleDateFormat format;
if (start_date.indexOf("-") > 0) {
format = new SimpleDateFormat("yyyy-MM-dd");
}
else {
format = new SimpleDateFormat("MM/dd/yyyy");
}
String end_date = getStringParam("end_date", format.format(new Date()));
// 1200 seems to be as good as it gets for a 12 year extraction. I'll
// need to do some more testing to see what a good heuristic is.
final int chunk = getIntParam("chunk", 1200);
// Check for clipped PRISM rasters
String subset = getStringParam("subset", "");
JSONObject r = new JSONObject(getRequest().toString());
Map<String, JSONObject> r_pm = JSONUtils.getParameter(r);
LOG.info("initial request: " + r_pm);
// Try splitting into chunks
long days = chunk + 1;
final Date sd = format.parse(start_date);
final Date ed = format.parse(end_date);
long diff = ed.getTime() - sd.getTime();//as given
days = TimeUnit.MILLISECONDS.toDays(diff);
if (chunk <= 0 || days <= chunk) {
File f = getResourceFile("driver");
result_file = new File(getWorkspaceDir(), "results.csv");
LOG.info("Writing results to " + result_file.getPath());
String args[] = {"python", f.getAbsolutePath(), input_zone_geojson,
units, start_date, end_date, subset};
for (String arg: args) {
LOG.info("arguments = " + arg);
}
ProcessBuilder pb = new ProcessBuilder(args);
// Write console to file rather than read it directly because the output
// is too larg1Ge and will cause the process to block.
pb.redirectOutput(result_file);
Process p = pb.start();
BufferedReader bfr_error = new BufferedReader(new InputStreamReader(p.getErrorStream()));
int exitCode = p.waitFor();
LOG.info("Exit Code : "+exitCode);
String line = "";
while ((line=bfr_error.readLine()) != null) {
LOG.info("Error line:"+line);
error_log += line + "\n";
}
return exitCode == 0 ? EXEC_OK : EXEC_FAILED;
} else {
// resumbit the pieces.
jobcount = (int)Math.ceil((float)days / chunk);
Services.runParallel(jobcount, new Services.CallableFactory() {
@Override
public Callable create(final int i) {
return new Callable() {
@Override
public Void call() throws Exception {
// Calculate a start and end date for this chunk
Date this_sd = new Date(sd.getTime() + TimeUnit.DAYS.toMillis(i * chunk));
Date this_ed = new Date(sd.getTime() + TimeUnit.DAYS.toMillis((i+1) * chunk -1));
// Cap calculated end date if it goes beyond requested end date
if (i == jobcount-1) {
this_ed = ed;
}
JSONObject request = new JSONObject(getRequest().toString());
// adjust parameter
Map<String, JSONObject> pm = JSONUtils.getParameter(request);
pm.put("start_date", JSONUtils.data("start_date", format.format(this_sd)));
pm.put("end_date", JSONUtils.data("end_date", format.format(this_ed)));
pm.put("chunk", JSONUtils.data("chunk", -1));
JSONObject newrequest = JSONUtils.newRequest(pm);
LOG.info(i + ": this_sd =" + format.format(this_sd) + ", this_ed = " + format.format(this_ed));
Client cl = new Client();
JSONObject response = cl.doPOST(getRequestURL(), newrequest);
// output
Map<String, JSONObject> res = JSONUtils.getResults(response);
// Store the results
File res_file = new File(getWorkspaceDir(), i + ".csv");
try (Writer writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(res_file), "utf-8"))) {
writer.write(res.get("results").toString());
} catch (IOException ex) {
}
return null;
}
};
}
});
}
return EXEC_OK;
}
@Override
protected void postProcess() throws Exception {
// Check for results file if this was not chunked
if (jobcount == 0) {
byte[] result = Files.readAllBytes(Paths.get(result_file.getPath()));
JSONObject results = new JSONObject(new String(result));
putResult("results", results, "JSON string");
if (error_log != "") {
putResult("error", error_log, "JSON string");
}
}
else {
// Read all the chunks
JSONObject chunks = null;
LOG.info("There are " + jobcount + " jobs");
for (int i=0; i<jobcount; i++) {
File job_file = new File(getWorkspaceDir(), i + ".csv");
byte[] result_bytes = Files.readAllBytes(Paths.get(job_file.getPath()));
String result_str = new String(result_bytes);
JSONObject json_result = new JSONObject(result_str);
JSONObject json_result_value = new JSONObject(JSONUtils.getValue(json_result));
JSONObject job_output = json_result_value.getJSONObject("output");
if (chunks == null) {
chunks = job_output;
}
else {
// output is a map of feature gid to results
Iterator<String> iter = job_output.keys();
while (iter.hasNext()) {
String gid = iter.next();
// output is a json string.
LOG.info("processing json_output gid key = " + gid);
String this_output = job_output.getString(gid);
JSONArray climate_data = new JSONArray(this_output);
// concatenate the climate data for this feature
JSONArray current_climate_data = chunks.getJSONArray(gid);
// Skip header
for (int irow=1; irow<climate_data.length(); irow++) {
current_climate_data.put(climate_data.get(irow));
}
chunks.put(gid, current_climate_data);
}
}
}
JSONObject new_output = new JSONObject();
new_output.put("output", chunks);
putResult("results", new_output, "JSON string");
}
}
}