V2_0.java [src/java/m/crp/assessmenttool] Revision: default Date:
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API, and application suite.
*
* 2012-2019, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package m.crp.assessmenttool;
import crp.utils.DBResources;
import static crp.utils.DBResources.LOCAL_SQLSERVER;
import crp.utils.DEMSteepness;
import crp.utils.Region;
import crp.utils.ServiceCall;
import crp.utils.ServiceCallData;
import crp.utils.SoilResult;
import crp.utils.WEPP;
import crp.utils.WEPS;
import crp.utils.WEPSFallowRotation;
import crp.utils.WWESoilParams;
import csip.Config;
import csip.ModelDataService;
import csip.PayloadParameter;
import csip.PayloadResults;
import csip.ServiceException;
import csip.annotations.Description;
import csip.annotations.Name;
import csip.annotations.Polling;
import csip.annotations.Resource;
import csip.utils.JSONUtils;
import data.interpretors.SlopeSteepness;
import static data.interpretors.SlopeSteepness.SLOPE_VERSION_4;
import data.interpretors.SlopeSteepness.SlopeData;
import gisobjects.GISObject;
import gisobjects.GISObjectException;
import gisobjects.GISObjectFactory;
import gisobjects.db.GISEngine;
import gisobjects.db.GISEngineFactory;
import static gisobjects.db.GISEngineFactory.createGISEngine;
import gisobjects.vector.GIS_FeatureCollection;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.Path;
import org.apache.commons.io.FileUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import static soils.utils.SoilUtils.calcLightleWeesieSlopeSlopeLength;
import static soils.utils.SoilUtils.isPalouse;
/**
*
* @author <a href="mailto:shaun.case@colostate.edu">Shaun Case</a>
*/
@Name("CRP Assessment")
@Description("This service consumes a JSON request containing a request identifier and "
+ "CRP Offer geometry and returns sheet/rill water and wind erosion rates for a "
+ "fallow management for up to the three most dominant soil components in a CRP "
+ "Offer area, plus a weighted average for each erosion rate.")
@Path("m/crpassessment/2.0")
@Polling(first = 2000, next = 1000)
@Resource(from = DBResources.class)
public class V2_0 extends ModelDataService {
// ---- Request parameter / metainfo key names read in preProcess() ----
public static final String SCENARIO_ID = "scenario_id";
public static final String SCENARIO_GEOMETRY = "scenario_geometry";
public static final String USE_DEM_SLOPE_SERVICE = "use_dem_slope";
public static final String SHOW_FULL_OUTPUT = "show_full_output";
// Unlike the other keys, this one is looked up in metainfo(), not in the parameters.
public static final String SHOW_DEBUG_OUTPUT = "debug";
public static final String COMPARE_SLOPE_METHODS = "compare_slope_methods";
// ---- Timing constants (milliseconds); none of them is referenced in the visible code ----
protected static final int SLEEP_TIMER = 500; //milliseconds to sleep while polling async model calls
protected static final int INITIAL_POLL_WAIT_TIMER = 1000; // milliseconds; judging by the name, the wait before the first poll -- TODO confirm
protected static final int GET_CALL_WAIT = 100; // milliseconds to sleep between internal get requests in loop of testing status of all calls at once.
// ---- Request state filled in by preProcess() ----
protected JSONObject scenario_geometry;
protected String scenario_id;
protected GISObject aoa_geometry;
// Feature collection form of the input geometry; assigned in preProcess(), not read in the visible code.
protected JSONObject dem_featureCollection;
// Re-created per use from a JDBC connection that is closed when the creating try block exits.
protected GISEngine gEngine;
// ---- Default endpoints of the downstream services; preProcess() overrides them from Config ("crp.*" keys) ----
protected String weppURI = "http://csip.engr.colostate.edu:8087/csip-wepp/m/wepp/3.1";
protected String wepsURI = "http://csip.engr.colostate.edu:8092/csip-weps/m/weps/5.2";
protected String regionURI = "http://csip.engr.colostate.edu:8092/csip-weps/m/region/2.0";
protected String soilsURI = "http://csip.engr.colostate.edu:8092/csip-soils/d/wwesoilparams/2.1";
protected String demSteepnessURI = "http://csip.engr.colostate.edu:8092/csip-watershed/m/average_slope/4.0";
// Not referenced in the visible code.
protected String outputFile;
protected boolean streamOutputFile = false;
// Written by the getRegionData callback; doProcess() currently only sets regionOrientation (to 0.0).
protected double regionLength, regionWidth, regionOrientation;
// Soil components the models are run for; set by the soils callback and then replaced by validateTopThree().
protected ArrayList<SoilResult> topThreeComponents;
// Copies of topThreeComponents used for the comparison WEPP runs when compareSlopes is set.
protected ArrayList<SoilResult> alternateSlopeTopThreeComponents;
protected double latitude, longitude;
// Not referenced in the visible code.
protected JSONObject rotation;
// ---- Area-weighted results accumulated in doProcess() and reported by postProcess() ----
protected double waterErosionRate = 0.0;
protected double windErosionRate = 0.0;
protected double wAvgTfact = 0.0;
// Scratch reference to the slope data of the soil currently handled in getDEMSlopes().
protected SlopeSteepness slopes;
// ---- Option flags; the initial values are defaults that preProcess() may override ----
protected boolean useDemSlope = true;
protected boolean fullOutput = true;
protected boolean debugOutput = false;
// preProcess() always overwrites this with the request value, defaulting to false.
protected boolean compareSlopes = true;
// Empty string means "no mupolygonkey supplied"; doProcess() then uses scenario_geometry instead.
protected String mupolygonkey = "";
// Set to SLOPE_VERSION_4 by init(); passed to the DEMSteepness calls.
protected String slopeVersion = "";
// weppCalls and wepsCalls are not referenced in the visible code.
protected ArrayList<AsyncCalls> weppCalls = new ArrayList<>();
protected ArrayList<AsyncCalls> wepsCalls = new ArrayList<>();
// Calls queued by addNewAsyncCall() and executed by waitForParallelCalls().
protected ArrayList<AsyncCalls> asyncCalls = new ArrayList<>();
/**
 * Sets the slope version that is later handed to the DEM steepness calls.
 * Invoked as the first step of {@link #preProcess()}.
 */
protected void init() {
  this.slopeVersion = SLOPE_VERSION_4;
}
/**
 * Reads configuration and request input and resolves the area of analysis.
 *
 * <p>Service endpoints are taken from {@link Config} (falling back to the field
 * defaults), then the option flags, the mandatory scenario id and finally either
 * the scenario geometry or a mupolygonkey with explicit latitude/longitude.
 *
 * @throws Exception a {@link ServiceException} when the scenario id is missing,
 *     when neither a geometry nor a mupolygonkey is supplied, or when the geometry
 *     is not a Polygon, Multipolygon or FeatureCollection; otherwise whatever the
 *     parameter, JDBC or GIS calls raise
 */
@Override
protected void preProcess() throws Exception {
  init();
  final PayloadParameter request = parameter();

  soilsURI = Config.getString("crp.soils", soilsURI);
  regionURI = Config.getString("crp.region", regionURI);
  weppURI = Config.getString("crp.wepp", weppURI);
  wepsURI = Config.getString("crp.weps", wepsURI);
  demSteepnessURI = Config.getString("crp.dem.steepness", demSteepnessURI);

  // Keep in this order: comparing slope methods forces debug output on.
  compareSlopes = request.getBoolean(COMPARE_SLOPE_METHODS, false);
  fullOutput = request.getBoolean(SHOW_FULL_OUTPUT, fullOutput);
  if (metainfo().hasName(SHOW_DEBUG_OUTPUT)) {
    debugOutput = metainfo().getBoolean(SHOW_DEBUG_OUTPUT);
  }
  debugOutput = debugOutput || compareSlopes;

  if (!request.has(SCENARIO_ID)) {
    throw new ServiceException("A required input 'scenario_id' was missing. Please specify a scenario_id value.");
  }
  scenario_id = request.getString(SCENARIO_ID);

  if (request.has(USE_DEM_SLOPE_SERVICE)) {
    useDemSlope = request.getBoolean(USE_DEM_SLOPE_SERVICE);
  }

  if (!request.has(SCENARIO_GEOMETRY)) {
    // No geometry: the caller must identify the soil polygon and location directly.
    if (!request.has("mupolygonkey")) {
      throw new ServiceException("A required input " + SCENARIO_GEOMETRY + " OR 'mupolygonkey' was missing. Please specify a " + SCENARIO_GEOMETRY + " or a mupolygonkey value.");
    }
    mupolygonkey = request.getString("mupolygonkey");
    latitude = request.getDouble("latitude");
    longitude = request.getDouble("longitude");
    return;
  }

  scenario_geometry = request.getParamJSON(SCENARIO_GEOMETRY);
  try (Connection connection = resources().getJDBC(LOCAL_SQLSERVER)) {
    // NOTE(review): this engine is not closed here although GISEngine is used as an
    // AutoCloseable elsewhere in this class; aoa_geometry keeps a reference to it.
    gEngine = GISEngineFactory.createGISEngine(connection);
    aoa_geometry = GISObjectFactory.createGISObject(scenario_geometry, gEngine);

    if (aoa_geometry.getType() == GISObject.GISObjectType.featurecollection) {
      // A feature collection is passed on as-is, but every feature needs an 'id'
      // property so that it can be indexed by the average-slope (DEM) service.
      validateFeatureCollection((GIS_FeatureCollection) aoa_geometry);
      dem_featureCollection = scenario_geometry;
    } else if (aoa_geometry.getType() == GISObject.GISObjectType.polygon
        || aoa_geometry.getType() == GISObject.GISObjectType.multipolygon) {
      // NOTE(review): a "point" geometry inside a polygon-typed object looks
      // unreachable from here -- confirm against GISObject.
      if (aoa_geometry.getGeometry().getGeometryType().equalsIgnoreCase("point")) {
        final GISObject buffered = aoa_geometry.bufferFromCentroid(10.0); // 10m radius buffer
        aoa_geometry = buffered;
        scenario_geometry = aoa_geometry.toJSON();
        scenario_geometry.put(KEY_NAME, SCENARIO_GEOMETRY);
      }
      // Plain polygons are wrapped into a feature collection for the DEM service.
      dem_featureCollection = convertPolyToFeatureCollection(aoa_geometry);
    } else {
      throw new ServiceException("The input geometry must be a Polygon, Multipolygon, or FeatureCollection.");
    }

    // Per the original author, GISObject yields the centroid here when the shape is not a point.
    latitude = aoa_geometry.getLatitude();
    longitude = aoa_geometry.getLongitude();
  }
}
/**
 * Queues a service call for the next {@link #waitForParallelCalls} run.
 *
 * <p>The call receives a {@link ParallelCallData} carrying this instance and
 * {@code data}, is switched to {@code setAsync(false)}, and is appended to
 * {@code asyncCalls}.
 *
 * @param service the service call to queue
 * @param data payload handed back to the call's callback; may be {@code null}
 */
protected void addNewAsyncCall(ServiceCall service, Object data) {
  final ParallelCallData callbackContext = new ParallelCallData(this, data);
  service.setCallData(callbackContext);
  service.setAsync(false);
  final AsyncCalls queued = new AsyncCalls(callbackContext, service);
  asyncCalls.add(queued);
}
/**
 * Callback for a DEM steepness call: stores the returned slopes on the
 * {@link SoilResult} that was attached to the call.
 *
 * @param sData callback data holding the finished call and its {@link ParallelCallData}
 */
protected static synchronized void getSoilSlopeData(ServiceCallData sData) {
  final DEMSteepness finishedCall = (DEMSteepness) sData.serviceCall();
  final ParallelCallData context = (ParallelCallData) sData.data();
  final SoilResult target = (SoilResult) context.callData();
  target.slopes = finishedCall.slopes();
}
/**
 * Callback for the soils call: copies the call's top-three component list
 * into the owning {@code V2_0} instance.
 *
 * @param sData callback data holding the finished call and its {@link ParallelCallData}
 * @throws ServiceException if the soils result cannot be read as JSON
 */
protected static synchronized void getSoilsData(ServiceCallData sData) throws ServiceException {
  final WWESoilParams finishedCall = (WWESoilParams) sData.serviceCall();
  final ParallelCallData context = (ParallelCallData) sData.data();
  final V2_0 owner = context.instance();
  try {
    owner.topThreeComponents = finishedCall.getTopThree();
  } catch (JSONException jsonFailure) {
    throw new ServiceException("Cannot get soils call results: " + jsonFailure.getMessage(), jsonFailure);
  }
}
/**
 * Callback for the region call: copies length, width and orientation of the
 * region into the owning {@code V2_0} instance.
 *
 * @param sData callback data holding the finished call and its {@link ParallelCallData}
 */
protected static synchronized void getRegionData(ServiceCallData sData) {
  final Region finishedCall = (Region) sData.serviceCall();
  final ParallelCallData context = (ParallelCallData) sData.data();
  final V2_0 owner = context.instance();
  owner.regionLength = finishedCall.getLength();
  owner.regionWidth = finishedCall.getWidth();
  owner.regionOrientation = finishedCall.getOrientation();
}
/**
 * Callback for a WEPS call: stores the wind erosion value on the
 * {@link SoilResult} that was attached to the call.
 *
 * @param sData callback data holding the finished call and its {@link ParallelCallData}
 */
protected static synchronized void getWEPSData(ServiceCallData sData) {
  final WEPS finishedCall = (WEPS) sData.serviceCall();
  final ParallelCallData context = (ParallelCallData) sData.data();
  final SoilResult target = (SoilResult) context.callData();
  target.windErosion = finishedCall.windErosion();
}
/**
 * Callback for a WEPP call: stores the water erosion value on the
 * {@link SoilResult} that was attached to the call.
 *
 * @param sData callback data holding the finished call and its {@link ParallelCallData}
 * @throws ServiceException if the callback data, the call, or the attached soil result is {@code null}
 */
protected static synchronized void getWEPPData(ServiceCallData sData) throws ServiceException {
  if (null == sData) {
    throw new ServiceException("sData was null, cannot continue with WEPP data analysis");
  }
  final WEPP finishedCall = (WEPP) sData.serviceCall();
  if (null == finishedCall) {
    throw new ServiceException("weppCall was null, cannot continue with WEPP data analysis");
  }
  final SoilResult target = (SoilResult) ((ParallelCallData) sData.data()).callData();
  if (null == target) {
    throw new ServiceException("soilResult was null, cannot continue with WEPP data analysis");
  }
  target.waterErosion = finishedCall.waterErosion();
}
/**
 * Runs the assessment: soils lookup, optional DEM slope lookup, then one WEPP and
 * one WEPS run per retained soil component, and finally the area-weighted averages.
 *
 * <p>Steps, each batch being executed through {@link #waitForParallelCalls}:
 * <ol>
 * <li>Query the soils service (by mupolygonkey if given, else by geometry) and
 *     filter the result with {@link #validateTopThree()}.</li>
 * <li>If DEM slopes are used or compared, query the DEM steepness service per
 *     component and derive a slope with {@link #getDEMSlopes}.</li>
 * <li>Queue WEPP and WEPS for every component; when slope methods are compared,
 *     queue a second WEPP run per component that uses the <em>other</em> slope source.</li>
 * <li>Accumulate {@code wAvgTfact}, {@code waterErosionRate} and {@code windErosionRate}
 *     weighted by component area.</li>
 * </ol>
 *
 * @throws Exception a {@link ServiceException} if no model run could be queued or a
 *     model returned NaN; otherwise whatever the helper and service calls raise
 */
@Override
protected void doProcess() throws Exception {
  // Get all soils information for this AoI.
  if (!mupolygonkey.isEmpty()) {
    addNewAsyncCall(new WWESoilParams(metainfo().toString(), mupolygonkey, soilsURI, V2_0::getSoilsData), null);
  } else {
    addNewAsyncCall(new WWESoilParams(metainfo().toString(), scenario_geometry, soilsURI, V2_0::getSoilsData), null);
  }

  // The region service is currently not called; a fixed orientation is used instead.
  //addNewAsyncCall(new Region(getMetainfo(), scenario_geometry, regionURI, V2_0::getRegionData), null);
  regionOrientation = 0.0;

  // The soils result (cokeys, top three list) is needed before anything else can be queued.
  waitForParallelCalls(asyncCalls, true);
  topThreeComponents = validateTopThree();

  // DEM slope per retained component, if DEM slopes are used or compared.
  final boolean needDemSlopes = useDemSlope || compareSlopes;
  if (needDemSlopes) {
    for (SoilResult soilResult : topThreeComponents) {
      addNewAsyncCall(new DEMSteepness(metainfo().toString(), soilResult.polygonList, demSteepnessURI, V2_0::getSoilSlopeData, slopeVersion), soilResult);
    }
  }
  if (asyncCalls.size() > 0) {
    waitForParallelCalls(asyncCalls, true);
  }
  if (needDemSlopes) {
    getDEMSlopes(topThreeComponents);
  }

  // Independent copies for the comparison runs, so that each run records its own
  // usedDEM flag and water erosion result.
  if (compareSlopes) {
    alternateSlopeTopThreeComponents = new ArrayList<>();
    for (SoilResult soilResult : topThreeComponents) {
      alternateSlopeTopThreeComponents.add(new SoilResult(soilResult));
    }
  }

  //TODO: Currently rotations are the default fallow operation...final version of this service requires user to provide rotation.
  double totalArea = 0.0;
  for (SoilResult soilResult : topThreeComponents) {
    // Set which soil slope to use before creating the service.
    soilResult.usedDEM = useDemSlope;
    addWeppCall(soilResult);
    addNewAsyncCall(new WEPS(metainfo().toString(), latitude, longitude, 200,
        200, regionOrientation, soilResult.cokey, getRotation(0), wepsURI, V2_0::getWEPSData), soilResult);
    totalArea += soilResult.area;
    wAvgTfact += soilResult.area * soilResult.tfact;
  }
  if (totalArea > 0.0) {
    wAvgTfact /= totalArea;
  } else {
    wAvgTfact = Double.NaN;
  }

  if (compareSlopes) {
    for (SoilResult soilResult : alternateSlopeTopThreeComponents) {
      // The comparison run must use the slope source the primary run did NOT use.
      // (Previously the inputs were selected with useDemSlope again, which repeated
      // the primary run while labelling it as the other method.)
      soilResult.usedDEM = !useDemSlope;
      addWeppCall(soilResult);
    }
  }

  if (asyncCalls.isEmpty()) {
    throw new ServiceException("No WEPP/WEPS model runs were created. Cannot proceed.");
  }
  // Keep the finished calls whenever postProcess may still need them: it dumps them
  // in debug mode, so the list must also survive when full output is switched off.
  waitForParallelCalls(asyncCalls, !(fullOutput || debugOutput));

  for (SoilResult soilResult : topThreeComponents) {
    if (Double.isNaN(soilResult.waterErosion) || Double.isNaN(soilResult.windErosion)) {
      throw new ServiceException("Not a number value encountered while trying to average wind or water erosion values.");
    }
    waterErosionRate += soilResult.waterErosion * (soilResult.area / totalArea);
    windErosionRate += soilResult.windErosion * (soilResult.area / totalArea);
  }
}

/**
 * Queues one WEPP run for {@code soilResult}, taking slope steepness and slope length
 * from the source selected by {@code soilResult.usedDEM}: the DEM slope (as percent)
 * with a slope length derived from it, or the soil's own slope_r and length values.
 */
private void addWeppCall(SoilResult soilResult) throws Exception {
  final double steepness;
  final double length;
  if (soilResult.usedDEM) {
    steepness = soilResult.slopeDEM * 100.0;
    length = calcLightleWeesieSlopeSlopeLength(steepness, isPalouse(soilResult.areaSymbol));
  } else {
    steepness = soilResult.slope_r;
    length = soilResult.length;
  }
  addNewAsyncCall(new WEPP(metainfo().toString(), latitude, longitude, soilResult.cokey,
      length, steepness,
      getRotation((int) length),
      weppURI, V2_0::getWEPPData), soilResult);
}
/**
 * Small manual check: prints how a {@code "#.#"} {@link DecimalFormat} renders NaN.
 *
 * @param args ignored
 */
public static void main(String[] args) {
  final String rendered = new DecimalFormat("#.#").format(Double.NaN);
  System.out.println(rendered);
}
/**
 * Writes the per-soil results and the weighted averages to the response.
 *
 * <p>Each soil contributes one JSON array to {@code "top_3_soils"}; the identification,
 * slope and tfact entries are only added when full output is enabled, and the
 * intersected polygons only when debug output is enabled as well. In debug mode the
 * raw result JSON of every retained WEPP and WEPS call is additionally written to the
 * workspace directory (as UTF-8) and the workspace directory is added to the results.
 *
 * <p>Values are rounded through {@link DecimalFormat} with {@link Locale#ROOT} symbols,
 * so the text that is parsed back always uses '.' as decimal separator regardless of
 * the JVM default locale; NaN and infinite values are passed through unrounded.
 *
 * @throws Exception on JSON or file errors while assembling the output
 */
@Override
protected void postProcess() throws Exception {
  final DecimalFormatSymbols symbols = DecimalFormatSymbols.getInstance(Locale.ROOT);
  final DecimalFormat df = new DecimalFormat("#.##", symbols);
  final DecimalFormat t_df = new DecimalFormat("#.#", symbols);
  PayloadResults results = results();
  JSONArray soilsResult = new JSONArray();
  for (SoilResult soilResult : topThreeComponents) {
    JSONArray soilData = new JSONArray();
    if (fullOutput) {
      soilData.put(JSONUtils.data("mukey", soilResult.mukey));
      soilData.put(JSONUtils.data("musym", soilResult.musym));
      soilData.put(JSONUtils.data("muname", soilResult.muname));
      soilData.put(JSONUtils.data("cokey", soilResult.cokey));
      soilData.put(JSONUtils.data("soilName", soilResult.soilName));
      soilData.put(JSONUtils.data("slope_method", (!soilResult.usedDEM ? "SDM slope_r value" : "DEM Slope Service")));
      soilData.put(JSONUtils.data("slope_r", soilResult.slope_r));
      soilData.put(JSONUtils.data("length", soilResult.length));
      soilData.put(JSONUtils.data("dem_slope", ((!Double.isNaN(soilResult.slopeDEM)) ? soilResult.slopeDEM * 100.0 : "NONE")));
      soilData.put(JSONUtils.data("dem_slope_length", ((!soilResult.usedDEM) ? "NONE" : calcLightleWeesieSlopeSlopeLength((soilResult.slopeDEM * 100.0), isPalouse(soilResult.areaSymbol)))));
      // NOTE(review): "soilName" is emitted more than once per soil; kept as-is so the
      // response layout does not change for existing clients.
      soilData.put(JSONUtils.data("soilName", soilResult.soilName));
      soilData.put(JSONUtils.data("soilLongName", soilResult.soilLongName));
      soilData.put(JSONUtils.data("tfact", roundForOutput(t_df, soilResult.tfact)));
      if (debugOutput) {
        JSONArray polygons = new JSONArray();
        for (JSONObject polygon : soilResult.polygonList) {
          polygons.put(polygon);
        }
        soilData.put(JSONUtils.data("intersected_polygons", polygons));
      }
    }
    soilData.put(JSONUtils.data("soilPtr", soilResult.cokey));
    soilData.put(JSONUtils.data("soilName", soilResult.soilName));
    soilData.put(JSONUtils.data("area_pct", soilResult.area_pct)); // percentage of the component within the CRP Offer area
    soilData.put(JSONUtils.data("area", soilResult.area)); // soil component area (acres) in the CRP Offer area
    soilData.put(JSONUtils.data("WaterSoilLoss", roundForOutput(df, soilResult.waterErosion))); // ton/ac/yr
    soilData.put(JSONUtils.data("WindSoilLoss", roundForOutput(df, soilResult.windErosion))); // ton/ac/yr
    soilsResult.put(soilData);
  }
  results.put("top_3_soils", soilsResult);
  results.put("WtAvgWaterSoilLoss", roundForOutput(df, waterErosionRate)); // ton/ac/yr
  results.put("WtAvgWindSoilLoss", roundForOutput(df, windErosionRate)); // ton/ac/yr
  results.put("WtAvgTotalSoilLoss", roundForOutput(df, waterErosionRate + windErosionRate)); // ton/ac/yr
  results.put("WtAvgTfact", roundForOutput(t_df, wAvgTfact)); // weighted over the soils chosen

  if (debugOutput) {
    // One pass over the retained calls; anything that is not a WEPP or WEPS call is skipped.
    for (AsyncCalls aCall : asyncCalls) {
      final ServiceCall service = aCall.service();
      if (service instanceof WEPP) {
        writeModelResult("WEPP", aCall);
      } else if (service instanceof WEPS) {
        writeModelResult("WEPS", aCall);
      }
    }
    results.put(workspace().getDir());
  }
}

/**
 * Rounds {@code value} by formatting it with {@code format} and parsing the text back.
 * NaN and infinite values have no parseable formatted form and are returned unchanged.
 */
private static double roundForOutput(DecimalFormat format, double value) {
  if (Double.isNaN(value) || Double.isInfinite(value)) {
    return value;
  }
  return Double.parseDouble(format.format(value));
}

/**
 * Writes the result JSON of one finished model call to
 * {@code <modelName>-<suid>_<mukey>-<cokey>.json} in the workspace directory.
 */
private void writeModelResult(String modelName, AsyncCalls aCall) throws Exception {
  final SoilResult soilResult = (SoilResult) aCall.callData().callData();
  final JSONObject callResults = aCall.service().getResults();
  final String suid = callResults.getJSONObject("metainfo").getString(KEY_SUUID);
  final File target = new File(workspace().getDir(),
      modelName + "-" + suid + "_" + soilResult.mukey + "-" + soilResult.cokey + ".json");
  // Explicit encoding: the two-argument overload writes with the platform default charset.
  FileUtils.writeStringToFile(target, callResults.toString(), "UTF-8");
}
/**
 * Executes all queued calls on a private thread pool and blocks until every one of
 * them has returned.
 *
 * <p>Calls are submitted one by one with a 150 ms pause in between. The original
 * author found that submitting everything at once could overload the web front end
 * of the called services. Results are delivered through each call's own callback;
 * this method only checks that every call reports {@code "finished"}.
 *
 * <p>The pool is always shut down before this method returns, and the caller's
 * interrupt status is restored when the wait is interrupted.
 *
 * @param asyncCalls the calls to run; must not be empty
 * @param clearCalls whether to empty {@code asyncCalls} once all calls have returned
 * @throws ServiceException if the list is empty, submission fails or is interrupted,
 *     a call throws, or a call does not report {@code "finished"}
 */
protected void waitForParallelCalls(ArrayList<AsyncCalls> asyncCalls, boolean clearCalls) throws ServiceException {
  if (asyncCalls.isEmpty()) {
    throw new ServiceException("No asynchronous calls were created. Cannot proceed with wait.");
  }

  final List<Future<String>> taskList = new ArrayList<>();
  final ExecutorService executor = Executors.newCachedThreadPool();
  try {
    try {
      for (AsyncCalls asyncCall : asyncCalls) {
        taskList.add(executor.submit(asyncCall.service()));
        Thread.sleep(150);
      }
    } catch (Exception ex) {
      if (ex instanceof InterruptedException) {
        // Preserve the interrupt for code further up the stack.
        Thread.currentThread().interrupt();
      }
      for (Future<String> task : taskList) {
        if (!task.isDone()) {
          task.cancel(true); // Attempt to force a stop of this task, may fail.
        }
      }
      throw new ServiceException("Forced to cancel pending service requests due to a timeout.", ex);
    }

    // get() blocks until the task is done; the data itself was already handed over
    // by the call's callback, so only the completion status is checked here.
    int count = 0;
    for (Future<String> task : taskList) {
      try {
        if ("finished".equalsIgnoreCase(task.get())) {
          count++;
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        throw new ServiceException("Cannot continue with the CRP Assessment. A task get() function for an external service call was interrupted: " + ex.getMessage(), ex);
      } catch (ExecutionException ex) {
        throw new ServiceException("Cannot continue with the CRP Assessment. A task get() function for an external service call was interrupted: " + ex.getMessage(), ex);
      }
    }

    // Clean up the list if desired. (Some calling code reuses this list.)
    if (clearCalls) {
      asyncCalls.clear();
    }

    if (count != taskList.size()) {
      throw new ServiceException("Not all calls finished, some have failed. Check other logs for error messages.");
    }
  } finally {
    // On success every task has already completed, so this only releases the pool's
    // idle threads; on failure it also interrupts whatever is still running.
    executor.shutdownNow();
  }
}
/**
 * Derives one DEM slope per soil from the slope data previously attached to it and
 * stores it in {@code soilResult.slopeDEM}.
 *
 * <p>The slope is the acre-weighted average of the mean slopes reported for the
 * soil's intersected polygons. A non-positive average is raised to 0.01; if the
 * polygons add up to zero acres the slope is set to 0.0 and a warning is logged.
 *
 * <p>The GIS engine is opened for the duration of this method only and is closed
 * together with its connection, as is done in {@link #filterPolygons}.
 *
 * @param soils the soils to update; each must carry slope data in {@code slopes}
 * @throws ServiceException if a soil has no slope data, the slope data is flagged
 *     as bad or carries messages, or the resulting slope is NaN
 * @throws SQLException on database connection errors
 * @throws GISObjectException if a polygon cannot be turned into a GIS object
 * @throws JSONException on malformed polygon JSON
 * @throws IOException on I/O errors raised by the GIS layer
 */
protected void getDEMSlopes(ArrayList<SoilResult> soils) throws ServiceException, SQLException, GISObjectException, JSONException, IOException {
  try (Connection connection = resources().getJDBC(LOCAL_SQLSERVER);
      GISEngine engine = GISEngineFactory.createGISEngine(connection)) {
    // The field is still assigned, as before; it is only usable inside this block.
    gEngine = engine;
    for (SoilResult soilResult : soils) {
      slopes = soilResult.slopes;
      if (null == slopes) {
        throw new ServiceException("No data was returned from the DEM slope servcie for this cokey, " + soilResult.cokey + " and intersected mapunit polygons");
      }
      if (slopes.badSlopeData() || !slopes.slopeDataMessages().isEmpty()) {
        throw new ServiceException("The DEM returned slopes had bad data in the returned file. " + ((!slopes.slopeDataMessages().isEmpty()) ? " Bad data message: " + slopes.slopeDataMessages() : ""));
      }

      // Acre-weighted average over the intersected polygons; slope entries are
      // looked up 1-based while the polygon list is 0-based.
      double wAvgSlope = 0.0;
      double totalAcres = 0.0;
      for (int i = 0; i < slopes.numSlopes(); i++) {
        SlopeData tSlopeData = slopes.slope(i + 1);
        if (null != tSlopeData) {
          double tAvg = tSlopeData.mean();
          JSONObject polyJSON = soilResult.polygonList.get(i);
          GISObject tVector = GISObjectFactory.createGISObject(polyJSON, engine);
          double tAcres = tVector.areaInAcres();
          wAvgSlope += tAcres * tAvg;
          totalAcres += tAcres;
        }
      }

      double tSlope;
      if (0.0 != totalAcres) {
        tSlope = (wAvgSlope / totalAcres);
        if (tSlope <= 0.0) {
          tSlope = 0.01;
        }
      } else {
        tSlope = 0.0;
        Logger.getLogger(ServiceCall.class.getName()).log(Level.WARNING, "Total acres for this cokey's," + soilResult.cokey + " mapunit intersections was zero. Setting DEM slope to 0 .");
      }

      // A NaN mean slope propagates through the average and ends up here.
      if (Double.isNaN(tSlope)) {
        throw new ServiceException("DEM slope calculation requested, but no DEM slope could be found for this soil: " + soilResult.cokey);
      }
      soilResult.slopeDEM = tSlope;
    }
  }
}
//TODO finish when Jack is ready.
/**
 * Builds the fallow rotation JSON for the given length.
 *
 * @param length value handed to {@link WEPSFallowRotation}
 * @return the rotation produced by {@link WEPSFallowRotation#rotation()}
 * @throws JSONException if the rotation JSON cannot be built
 */
protected JSONObject getRotation(int length) throws JSONException {
  final WEPSFallowRotation fallow = new WEPSFallowRotation(length);
  return fallow.rotation();
}
/**
 * Calls the DEM steepness service directly (blocking) for the given polygons.
 *
 * @param intersectedPolygons polygons to send to the service
 * @return the slopes returned by the call, or {@code null} when the configured
 *     service URI is the literal {@code "NONE"}
 * @throws ServiceException if the service call fails
 */
protected SlopeSteepness getDEMSteepness(ArrayList<JSONObject> intersectedPolygons) throws ServiceException {
  if (demSteepnessURI.equals("NONE")) {
    return null;
  }
  final DEMSteepness steepnessCall = new DEMSteepness(metainfo().toString(), intersectedPolygons, demSteepnessURI, slopeVersion);
  steepnessCall.call();
  return steepnessCall.slopes();
}
/**
 * Verifies that the collection has at least one feature and that every feature
 * carries a non-empty {@code id} attribute.
 *
 * @param featureCollection the collection to check
 * @throws ServiceException if the collection is empty, a feature lacks the
 *     {@code id} attribute or has an empty one, or the attribute lookup fails
 */
protected void validateFeatureCollection(GIS_FeatureCollection featureCollection) throws ServiceException {
  final int featureCount = featureCollection.getFeatureCount();
  if (featureCount <= 0) {
    throw new ServiceException("No features were found in the input JSON FeatureCollection.");
  }
  try {
    for (int index = 0; index < featureCount; index++) {
      final String id = featureCollection.getFeatureAttribute(index, "id");
      if (null == id) {
        throw new ServiceException("Feature " + (index + 1) + " in the input JSON FeatureCollection does not have a property 'id'. Cannot proceed without it.");
      }
      if (id.isEmpty()) {
        throw new ServiceException("Feature " + (index + 1) + " in the input JSON FeatureCollection does not have a property value for 'id'. Cannot proceed without this value.");
      }
    }
  } catch (GISObjectException lookupFailure) {
    throw new ServiceException("Could not find the 'id' attribute within the first feature in this input JSON FeatureCollection. Cannot proceed: " + lookupFailure.getMessage(), lookupFailure);
  }
}
/**
 * Wraps a single geometry into a {@code "boundary"} parameter whose value is a
 * one-feature FeatureCollection (feature id {@code "1"}, empty properties).
 *
 * @param polygon_object the geometry; its {@code toJSON()} text becomes the feature geometry
 * @return the parsed wrapper object
 * @throws ServiceException declared for callers; not raised by the visible code directly
 * @throws IOException if the geometry cannot be serialised
 * @throws JSONException if the assembled text is not valid JSON
 */
protected JSONObject convertPolyToFeatureCollection(GISObject polygon_object) throws ServiceException, IOException, JSONException {
  final StringBuilder wrapper = new StringBuilder();
  wrapper.append(" {\n");
  wrapper.append(" \"name\": \"boundary\",\n");
  wrapper.append(" \"value\": [{\n");
  wrapper.append(" \"type\": \"FeatureCollection\",\n");
  wrapper.append(" \"features\": [\n");
  wrapper.append(" {\n");
  wrapper.append(" \"id\": \"1\",\n");
  wrapper.append(" \"type\": \"Feature\",\n");
  wrapper.append(" \"properties\": {},\n");
  wrapper.append(" \"geometry\":").append(polygon_object.toJSON());
  wrapper.append(" }\n");
  wrapper.append(" ]\n");
  wrapper.append(" }]");
  wrapper.append("}\n");
  return new JSONObject(wrapper.toString());
}
/**
 * Keeps only the polygons whose area exceeds 0.1 acres.
 *
 * @param polygonList polygons to examine
 * @return a new list with the JSON form of every polygon larger than 0.1 acres
 * @throws SQLException on database connection errors
 * @throws ServiceException if the JDBC resource cannot be obtained
 * @throws GISObjectException if a polygon cannot be turned into a GIS object
 * @throws JSONException on malformed polygon JSON
 * @throws IOException on I/O errors raised by the GIS layer
 */
protected ArrayList<JSONObject> filterPolygons(ArrayList<JSONObject> polygonList) throws SQLException, ServiceException, GISObjectException, JSONException, IOException {
  final ArrayList<JSONObject> kept = new ArrayList<>();
  try (Connection gisDb = resources().getJDBC(LOCAL_SQLSERVER);
      GISEngine tEngine = createGISEngine(gisDb)) {
    for (JSONObject candidate : polygonList) {
      final GISObject shape = GISObjectFactory.createGISObject(candidate, tEngine);
      // Written as a negated '>' so that a NaN area is dropped too.
      if (!(shape.areaInAcres() > 0.1)) {
        continue;
      }
      kept.add(shape.toJSON());
    }
  }
  return kept;
}
/**
 * Reduces {@code topThreeComponents} to the soils that still have at least one
 * polygon after {@link #filterPolygons}; each kept soil gets the filtered polygon list.
 *
 * @return the soils that remain, in their original order
 * @throws ServiceException if no soils were returned, polygon filtering fails,
 *     or no soil has a polygon left after filtering
 */
protected ArrayList<SoilResult> validateTopThree() throws ServiceException {
  if (topThreeComponents.isEmpty()) {
    throw new ServiceException("There were no valid soil intersection areas returned for this input geometry. Please check your location again and be sure it is not mostly overlapping a water feature or does not have valid soil data in SDM.");
  }

  final ArrayList<SoilResult> usable = new ArrayList<>();
  for (SoilResult candidate : topThreeComponents) {
    try {
      final ArrayList<JSONObject> remaining = filterPolygons(candidate.polygonList);
      if (!remaining.isEmpty()) {
        candidate.setPolygons(remaining);
        usable.add(candidate);
      }
    } catch (SQLException | GISObjectException | JSONException | IOException failure) {
      throw new ServiceException("Cannot proceed with analysis of this AoI. Failed trying to validate returned soils intersection polygons for size. " + failure.getMessage(), failure);
    }
  }

  if (usable.isEmpty()) {
    throw new ServiceException("Cannot proceed with analysis of this AoI. All of the resulting soils intersection polygons are too small in size. ");
  }
  return usable;
}
// public class ASYNCCall {
//
// public SoilResult soilResult;
// public ServiceCall serviceCall;
// public boolean callFinished = false;
//
// public ASYNCCall(SoilResult _soilResult, ServiceCall _serviceCall) {
// soilResult = _soilResult;
// serviceCall = _serviceCall;
// }
// }
/**
 * Context attached to a queued service call: the owning {@code V2_0} instance plus
 * an optional payload that the call's callback reads back.
 */
public class ParallelCallData {

  private final V2_0 instance;
  private final Object callData;
  // Only stored via setLatch(); never read in the visible code.
  private CountDownLatch latch;

  /**
   * @param instance the service instance that queued the call
   * @param data payload for the callback; may be {@code null}
   */
  public ParallelCallData(V2_0 instance, Object data) {
    this.instance = instance;
    this.callData = data;
  }

  /** @return the service instance that queued the call */
  public V2_0 instance() {
    return this.instance;
  }

  /** @return the payload given at construction; may be {@code null} */
  public Object callData() {
    return this.callData;
  }

  /** @param latch the latch to remember */
  public void setLatch(CountDownLatch latch) {
    this.latch = latch;
  }
}
/**
 * A queued service call together with its callback context and a
 * {@link FutureTask} wrapping the call.
 */
public class AsyncCalls {

  private FutureTask<String> task;
  private ParallelCallData callData;
  private ServiceCall service;

  /**
   * @param callData context handed to the call's callback
   * @param service the service call to run
   */
  public AsyncCalls(ParallelCallData callData, ServiceCall service) {
    this.service = service;
    this.callData = callData;
    this.task = new FutureTask<>(service);
  }

  /** @return the future task created around the service call */
  public FutureTask<String> task() {
    return this.task;
  }

  /** @return the callback context of this call */
  public ParallelCallData callData() {
    return this.callData;
  }

  /** @return the service call itself */
  public ServiceCall service() {
    return this.service;
  }
}
}