V1_0.java [src/java/m/wepsIntegrated] Revision: default Date:
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API, and application suite.
*
* 2012-2017, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package m.wepsIntegrated;
import csip.Config;
import static csip.Config.CSIP_SESSION_TTL;
import csip.api.server.Executable;
import csip.ModelDataService;
import csip.ModelDataServiceConstants;
import csip.api.server.ServiceException;
import csip.annotations.*;
import static csip.annotations.ResourceType.*;
import static csip.annotations.State.DEVELOPMENT;
import csip.api.client.ModelDataServiceCall;
import csip.utils.Client;
import csip.utils.Parallel;
import csip.utils.Services;
import csip.utils.ZipFiles;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.FileHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import javax.ws.rs.*;
import static m.wepsModelConstants.*;
import org.apache.commons.io.FileUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
/**
* This implements the CSIP WEPS service.
*
* @author od, mh
*/
@Name("WepsIntegrated for ARS")
@Description("WEPS for ARS with integrated cligen & windgen calls")
@VersionInfo("1.0")
@State(DEVELOPMENT)
@Path("m/wepsIntegrated/1.0")
@Polling(first = 2000, next = 1000)
// ARS versions of the binaries
@Resource(file = "/bin/lin-amd64/ARS/default/2023-01-09/weps", id = "weps-default.lin64", type = EXECUTABLE)
@Resource(file = "/bin/lin-amd64/ARS/dev-version/weps", id = "weps-dev.lin64", type = EXECUTABLE)
public class V1_0 extends ModelDataService implements Executable.StdHandler {
public static final String WIND_DB_FILENAME = "windgen.wdb";
public static final String WIND_IDX_FILENAME = "windgen.idx";
public static final String WIND_IDX_NRCS_FILENAME = "windgen_NRCS.idx";
String windgenURL;
String cligenURL;
double latitude;
double longitude;
double windlatitude;
double windlongitude;
int simulationYears;
boolean isNRCS;
Executable weps;
boolean cligenLocal = false;
// Reminder: add new date to json as well
static final String SERVICE_UPDATE_DATE = "05/10/2023";
@Override
public void preProcess() throws Exception {
setProgress("Initializing");
// For Debugging !!!
if (metainfo().getBoolean("wepsDebugLog", false)) {
Logger l = Logger.getLogger("csip-core");
l.setLevel(Level.FINEST);
try {
FileHandler fh;
fh = new FileHandler(new File(workspace().getDir(),"wepsdebug-logfil.txt").getAbsolutePath());
l.addHandler(fh);
SimpleFormatter formatter = new SimpleFormatter();
fh.setFormatter(formatter);
} catch (SecurityException | IOException e) {
e.printStackTrace();
}
}
if (parameter().has(PARMITEMWEPSLAT)) {
latitude = parameter().getDouble(PARMITEMWEPSLAT);
longitude = parameter().getDouble(PARMITEMWEPSLON);
} else {
throw new ServiceException(PARMITEMWEPSLAT+" and/or " +PARMITEMWEPSLON+"not specified");
}
// in some use case in the Java IF, these vary from the site lat/lon
if (parameter().has(PARMITEMWEPSWINDLAT)) {
windlatitude = parameter().getDouble(PARMITEMWEPSWINDLAT);
windlongitude = parameter().getDouble(PARMITEMWEPSWINDLON);
} else {
windlatitude = latitude;
windlongitude = longitude;
}
simulationYears = parameter().getInt(PARMITEMWEPSSIMYEARS, 100);
isNRCS = parameter().getBoolean(PARMITEMISNRCS, false);
// use the windgen in this endpoint.
windgenURL = this.request().getURL().replaceFirst("/m/wepsIntegrated/.*$", "/m/windgen/2.0");
if (this.request().getScheme().startsWith("https")) {
// cligenURL = "https://csip.engr.colostate.edu:9083/csip-climate/m/cligen_prism/weps/1.0";
// use the standard CSU CSIP cligen prism, since using lat/lon inputs.
cligenURL = "https://csip.engr.colostate.edu:9083/csip-climate/m/cligen_prism/2.1";
} else {
cligenURL = "http://csip.engr.colostate.edu:8083/csip-climate/m/cligen_prism/2.1";
}
// new URL(this.request().getURL()).getHost() is NOT same as this.request().getHost()
cligenLocal = new URL(cligenURL).getHost().equals(new URL(this.request().getURL()).getHost());
metainfo().put("wepsIntegrated cligen:", cligenLocal?"local":"remote");
}
/**
*
* @throws Exception
*/
@Override
public void doProcess() throws Exception {
// MEH from csip-weps V5_0
try {
setProgress("Calling Cligen/Windgen");
if (metainfo().getBoolean("wepsWindCliParallel", false)) {
Parallel.run(
() -> {
fetchClimate();
},
() -> {
fetchWind();
}
);
} else {
setProgress("Calling Cligen");
fetchClimate();
setProgress("Calling Windgen");
fetchWind();
}
} catch (Exception e) {
File lf = new File(workspace().getDir(),"wepsdebug-logfil.txt");
String msg = "\n";
if (lf.exists()) {
FileReader r = new FileReader(lf);
char cbuf[] = new char[256];
while (r.ready()) {
r.read(cbuf);
msg += String.copyValueOf(cbuf);
}
}
throw new ServiceException("WEPS cligen/windgen error :" + e.getMessage() + msg);
}
if (parameter().has("windGenFileName")) {
// this parm allows us to make the windgen filename
// match what was sent in the runfile being sent to Weps
File newFile = new File (workspace().getDir(), parameter().getString("windGenFileName"));
File windFile = new File (workspace().getDir(), "win_gen.win");
windFile.renameTo(newFile);
}
setProgress("Calling Weps");
String modelVersion = metainfo().getString("modelVersion", "");
if (modelVersion.contentEquals("dev-version.lin64")) {
weps = resources().getExe("weps-dev.lin64");
} else if (modelVersion.contentEquals("dev-version")) {
weps = resources().getExe("weps-dev.lin64");
} else {
weps = resources().getExe("weps-default.lin64");
}
weps.setStdoutHandler(this);
String parmHydrologyMethod = "-W" + parameter().getInt("hydrologyMethod", 1);
String parmResurfacing = "-u" + parameter().getInt("resurfacing", 0);
String parmInitialization = "-I" + parameter().getInt("initialization", 1);
String parmConfidenceInterval = "-t" + parameter().getInt("confidenceInterval", 0);
String parmFurrowEffect = "-T" + parameter().getInt("furrowEffect", 0);
String parmCalibrationMode = parameter().has("calibrationMode") ? "-C" + parameter().getInt("calibrationMode", 0) : "";
String parmCalibrationCycles = parameter().has("calibrationCycles") ? "-Z" + parameter().getInt("calibrationCycles", 0) : "";
weps.setArguments(parmHydrologyMethod,
parmResurfacing,
parmInitialization,
parmConfidenceInterval,
parmFurrowEffect,
parmCalibrationMode,
parmCalibrationCycles,
"-P" + workspace().getDir()
);
int result = weps.exec();
setProgress("Weps Complete");
if (result != 0) {
throw new ServiceException("WEPS exit error :" + result);
}
}
@Override
protected void postProcess() throws Exception {
setProgress("Building result data");
metainfo().put(PARMOUTUPDATED, SERVICE_UPDATE_DATE);
metainfo().put(PARMOUTMODELNAME, weps.getName());
metainfo().put("CSIP_SESSION_TTL", Config.getString(CSIP_SESSION_TTL));
File[] outFiles = workspace().getDir().listFiles( (File file) -> file.getName().endsWith(".out"));
ArrayList<File> stdFiles2 = new ArrayList(
Arrays.asList(workspace().getDir().listFiles( (File file) ->
file.getName().endsWith("stdout.txt") ||
file.getName().endsWith("stderr.txt") ||
file.getName().endsWith("logfil.txt")))
);
if (metainfo().getBoolean("wepsDebugLog", false)) {
// crude, but it works.
File sysLogFile = new File (Services.getResultsDir(getSUID()), ModelDataServiceConstants.LOG_FILE);
FileUtils.copyFileToDirectory(sysLogFile, workspace().getDir());
sysLogFile = new File (workspace().getDir(), ModelDataServiceConstants.LOG_FILE);
stdFiles2.add(sysLogFile);
}
File[] otherRetFiles = workspace().getDir().listFiles( (File file) -> {
String name = file.getName();
return (name.endsWith(".cli") || name.endsWith(".par") || name.endsWith(".win") || name.endsWith(".wdb"));
});
File[] subRegionFiles = workspace().getDir().listFiles( (File file) -> file.getName().startsWith("subregion"));
File erodFilesDir = new File(workspace().getDir(),"sae_in_out_files");
ArrayList<File> zFilesList = new ArrayList();
boolean subRegionResults = metainfo().getBoolean("returnSubRegionResults", false);
boolean zipResults = metainfo().getBoolean("zipResults", false);
if (zipResults) {
File zFile = new File(workspace().getDir(),"results.zip");
zFilesList.addAll(Arrays.asList(outFiles));
zFilesList.addAll(Arrays.asList(otherRetFiles));
File interpZipFile = new File(workspace().getDir(),"interpolate.zip");
if (interpZipFile.exists()) {
zFilesList.add(interpZipFile);
}
ArrayList<File> zFilesListSubReg = new ArrayList();
ArrayList<File> zFilesListErod = new ArrayList();
File stdzFile = new File(workspace().getDir(),"logFiles.zip");
Parallel.run(false,
() -> {
try {
if (subRegionResults) {
for (File f : subRegionFiles) {
File srzFile = new File(workspace().getDir(),f.getName()+".zip");
ZipFiles.zip(f,srzFile);
zFilesListSubReg.add(srzFile);
}
}
} catch (IOException e) {
}
},
() -> {
try {
// These files may be triggered by an option in the .runx file
// instead of as a command line option
if (erodFilesDir.exists()) {
File erzFile = new File(workspace().getDir(),erodFilesDir.getName()+".zip");
ZipFiles.zip(erodFilesDir, erzFile);
zFilesListErod.add(erzFile);
}
} catch (IOException e) {
}
},
() -> {
try {
if (outFiles.length > 0) {
ZipFiles.zip(stdzFile, stdFiles2);
}
} catch (IOException e) {
}
}
);
try {
zFilesList.addAll(zFilesListSubReg);
zFilesList.addAll(zFilesListErod);
zFilesList.add(stdzFile);
ZipFiles.zip(zFile, zFilesList);
results().put(zFile);
} catch (IOException e) {
}
} else {
results().put(outFiles);
results().put(stdFiles2.toArray(new File[0]));
results().put(otherRetFiles);
if (subRegionResults) {
results().put(subRegionFiles);
}
// These files may be triggered by an option in the .runx file
// instead of as a command line option
if (erodFilesDir.exists()) {
results().put(erodFilesDir);
}
}
setProgress("Result data ready");
super.postProcess();
}
@Override
public void handle(String out) {
String s;
int start;
int end;
//Subregion 2 Year 12 of 30
// or
//Erosion Year 1 of 30
if ((start = out.lastIndexOf("Year")) >= 0) {
start = out.lastIndexOf('\n',start)+1;
if ((end = out.indexOf('\n', start)) == -1) {
end = out.length();
}
if (out.charAt(end - 1) == '\r') {
end--;
}
s = out.substring(start, end);
try {
setProgress(s);
} catch (ServiceException ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, null, ex);
}
}
}
private void fetchClimate() throws JSONException, Exception {
LOG.info("MEH test msg cligen starting");
int pingValidCnt = 0;
for (int i=0; i<15; i++) {
long pingTime = Client.ping(cligenURL, 1000);
LOG.info("MEH test msg cligen ping:"+pingTime);
pingValidCnt = (pingTime > -1) ? pingValidCnt + 1 : 0;
if (pingValidCnt >= 5) {
break;
}
}
if (pingValidCnt < 5) {
throw new ServiceException("Climate service not available: count=" + pingValidCnt);
}
int cliTimeout = metainfo().getInt("cligenTimeout",20);
if (cliTimeout == 20) {
// adjust the timeout with the sim length (roughly)
// cliTimeout = Math.round((simulationYears - 20)*(float).8) + 70;
cliTimeout = (simulationYears - 20) + 70;
}
ModelDataServiceCall cliRes = new ModelDataServiceCall();
if (parameter().has(PARMITEMStateId)) {
cliRes = cliRes
.put(PARMITEMStateId, parameter().getInt(PARMITEMStateId, -11))
.put(PARMITEMStationId, parameter().getInt(PARMITEMStationId, -1));
}
// A manual way to create the input_zone_features JSON
// .put("input_zone_features", new JSONObject(
// "{ 'type': 'FeatureCollection',"
// + " 'features': [{"
// + " 'type': 'Feature',"
// + " 'properties': {"
// + " 'name': 'pt one',"
// + " 'gid': 1"
// + " },"
// + " 'geometry': {"
// + " 'type': 'Point',"
// + " 'coordinates': ["
// + " " + longitude + ","
// + " " + latitude
// + " ]"
// + " }"
// + " }]"
// + " }"))
cliRes = cliRes
.put("startyear", 1)
.put("duration", simulationYears)
.put("outputFile", "cli_gen.cli")
.put("usePRISM", true)
.put("input_zone_features", new JSONObject()
.put("type", "FeatureCollection")
.put("features", new JSONArray().put(
new JSONObject()
.put("type", "Feature")
.put("properties", new JSONObject()
.put("name", "pt one")
.put("gid", 1)
)
.put("geometry", new JSONObject()
.put("type", "Point")
.put("coordinates", new JSONArray()
.put(longitude).put(latitude)
)
)
)
)
)
.url(cligenURL)
.withDefaultLogger()
.withTimeout(cliTimeout)
.withCache(false)
// .withRetries(4)
// .withRetryPause(10)
.putMeta("csip.session.ttl","180")
.putMeta("archive","true")
// .call();
;
ModelDataServiceCall cliCall = cliRes;
// on infosys server (at least), we frequently
// get a timeout on the 1st call to cligen.
// This is an attempt to "fail fast" (at 20 secs)
// but then to increase the timeout with each retry
// (in case the extra time is really required)
// 15 secs is sufficient for a 100 year request to cligen.
for (int i=0; i<5; i++) {
LOG.info("MEH test msg cligen call. timeout:"+cliTimeout+" reps:"+i);
if (i>0) {
setProgress("Cligen retry:"+i);
}
cliRes = cliCall.call();
if (cliRes.serviceFinished() && !cliRes.serviceFailed()) {
break;
}
cliTimeout += 20 * (i+1);
cliCall.withTimeout(cliTimeout);
}
LOG.info("MEH test msg cligen complete");
if (cliRes.serviceFinished() && !cliRes.serviceFailed()) {
try {
List<String>resNames = cliRes.getNames();
if (cligenLocal) {
for (String name : resNames) {
String value = cliRes.getString(name);
if (value.contains("http")) {
File f = workspace().getFile(name);
cliRes.download(name, f);
}
}
} else {
// if cligen not on this server,
// use the archive since it is zipped
// to lessen download time
for (String name : resNames) {
if (name.startsWith("archive")) {
File f = workspace().getFile(name);
cliRes.download(name, f);
ZipFiles.unzip(f);
Files.delete(f.toPath());
break;
}
}
}
} catch (Exception e) {
throw new ServiceException("Climate service error (download): " + e.getMessage());
}
} else {
throw new ServiceException("Climate service error: " + cliRes.getError());
}
}
private void fetchWind() throws Exception {
LOG.info("MEH test msg windgen starting");
ModelDataServiceCall windInput = new ModelDataServiceCall();
int wban = parameter().getInt(PARMITEM_WBAN, 0);
if (wban > 0 && wban < 999999) {
windInput.put(PARMITEM_WBAN, wban);
} else {
windInput.put("latitude", windlatitude);
windInput.put("longitude", windlongitude);
}
ModelDataServiceCall windRes = windInput
.put("duration", simulationYears)
.put("outputFile", "win_gen.win")
.put(PARMITEMISNRCS, isNRCS)
.url(windgenURL)
.withDefaultLogger()
.withTimeout(60)
.withCache(false)
.withRetries(2)
.withRetryPause(10)
.call();
LOG.info("MEH test msg windgen complete");
if (windRes.serviceFinished() && !windRes.serviceFailed()) {
try {
List<String>resNames = windRes.getNames();
// for (String fname : outFileNames) {
// if (s.endsWith("win") || s.endsWith("wdb") || s.endsWith("txt")) {
// if (s.endsWith("win") || s.endsWith("wdb") || s.endsWith("txt")) {
// windRes.download(s, workspace().getFile(s));
// }
// }
for (String name : resNames) {
String value = windRes.getString(name);
if (value.contains("http")) {
File f = workspace().getFile(name);
windRes.download(name, f);
}
}
} catch (Exception e) {
throw new ServiceException("Wind service error (download): " + e.getMessage());
}
} else {
throw new ServiceException("Wind service error: " + windRes.getError());
}
}
}