// V1_0.java [src/java/m/ewsf/dk] Revision: default
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package m.ewsf.dk;
import csip.utils.*;
import csip.Config;
import csip.api.server.*;
import csip.ModelDataService;
import csip.annotations.*;
import static csip.annotations.ResourceType.*;
import csip.utils.JSONUtils;
import csip.utils.Services;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import javax.ws.rs.Path;
import javax.ws.rs.core.UriBuilder;
import oms3.io.CSTable;
import oms3.io.DataIO;
import oms3.io.MemoryTable;
import oms3.util.Dates;
import org.apache.commons.io.FileUtils;
import org.codehaus.jettison.json.JSONObject;
/**
* DK service implementation (dk: D. Garen).
*
* @author od
*/
@Name("dk")
@Description("Detrended Kriging")
@Path("m/dk/1.0")
@Polling(first = 1000, next = 2000)
// resource definition for the mod.exe executable
@Resource(type = EXECUTABLE, file = "/bin/${csip.arch}/dk.exe", args = "-k dk.config", id = "dk")
// list all the files (wildcards alowed) that should be captured as output)
@Resource(type = OUTPUT, file = "*stdout.txt *stderr.txt")
public class V1_0 extends ModelDataService {

    // input files, resolved from request attachments in preProcess()
    File elevation_grid_file; // in
    File input_data_file; // in
    File zone_grid_file; // in
    //
    // output file names (relative to the workspace)
    String output_file; // out
    String zone_output_file; //out
    // dk.exe parameters
    int regression_method = 2;
    int type_of_data = 1;
    // slice size in days; -1 means run the whole period in a single dk.exe call
    long dk_days = -1;
    // # files as chunks (0 when the request was not split).
    int chunks;
    String total_time;
    String stats[];

    /**
     * Reads input attachments, output file names, and parameters from the request.
     */
    @Override
    protected void preProcess() throws Exception {
        dk_days = parameter().getInt("dk_days", -1); // default: do it all together.
        elevation_grid_file = attachments().getFile(parameter().getString("elevation-grid-file"));
        input_data_file = attachments().getFile(parameter().getString("input-data-file"));
        zone_grid_file = attachments().getFile(parameter().getString("zone-grid-file"));
        // output file names.
        output_file = parameter().getString("output-file");
        zone_output_file = parameter().getString("zone-output-file");
        regression_method = parameter().getInt("regression-method");
        type_of_data = parameter().getInt("type-of-data");
    }

    /**
     * Runs dk.exe directly when the data period is small enough; otherwise slices
     * the input table into chunks of {@code dk_days} days and re-submits one
     * sub-request per chunk in parallel against this same service endpoint.
     *
     * @throws Exception if dk.exe fails or any sub-request reports an error
     */
    @Override
    protected void doProcess() throws Exception {
        CSTable t = DataIO.table(input_data_file);
        SimpleDateFormat df = DataIO.lookupDateFormat(t, 1);
        String start = t.getInfo().get(DataIO.DATE_START);
        String end = t.getInfo().get(DataIO.DATE_END);
        long diffDays = Dates.diffDayPeriods(toCalendar(df.parse(start)), toCalendar(df.parse(end)));
        LOG.info("Diff days: " + diffDays);
        // check if slicing makes sense.
        if (diffDays <= dk_days) {
            dk_days = -1; // period fits into a single run
        }
        if (dk_days == -1) {
            // the period is small enough, we can run it here
            String conf = createConfig();
            FileUtils.writeStringToFile(new File(workspace().getDir(), "dk.config"), conf);
            LOG.info(conf);
            // get the resource by its 'id' : dk
            Executable e = resources().getExe("dk");
            int ret = e.exec();
            if (ret == 0) {
                return;
            }
            throw new ServiceException(FileUtils.readFileToString(e.stderr()));
        } else {
            // resubmit the pieces.
            chunks = sliceTable(input_data_file, (int) dk_days);
            LOG.info("slice table into " + chunks + " of " + dk_days);
            List<Long> cpu_time = Collections.synchronizedList(new ArrayList<>(chunks));
            // allow scheme/host/port rewriting so the sub-requests stay on the internal network
            String internalURL = toPeerURI(new URI(request().getURL())).toString();
            long timestart = System.currentTimeMillis();
            Services.runParallel(chunks, new Services.CallableFactory() {
                @Override
                public Callable create(final int i) {
                    // one callable per chunk; each posts a modified copy of the original request
                    return new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            long timestart_i = System.currentTimeMillis();
                            JSONObject request = new JSONObject(request().getRequest().toString());
                            LOG.info("call #" + i + " of " + chunks);
                            // adjust parameter: point at the i-th slice and disable further slicing
                            Map<String, JSONObject> pm = JSONUtils.getParameter(request);
                            pm.put("input-data-file", JSONUtils.data("input-data-file", input_data_file.getName() + "." + i));
                            pm.put("dk_days", JSONUtils.data("dk_days", -1));
                            JSONObject newrequest = JSONUtils.newRequest(pm);
                            Client cl = new Client();
                            JSONObject response = cl.doPOST(internalURL, newrequest,
                                    new File[]{
                                        new File(input_data_file + "." + i),
                                        elevation_grid_file,
                                        zone_grid_file
                                    });
                            // output
                            String error = JSONUtils.getErrorStatus(response);
                            if (error != null) {
                                throw new Exception("Error in run #" + i + ": " + error);
                            }
                            LOG.info("done #" + i + " of " + chunks);
                            // fetch the two result files back into this workspace, suffixed by chunk
                            Map<String, JSONObject> res = JSONUtils.getResults(response);
                            File of = new File(workspace().getDir(), output_file + "-" + i);
                            File zof = new File(workspace().getDir(), zone_output_file + "-" + i);
                            cl.doGET(JSONUtils.getStringParam(res, output_file, ""), of);
                            cl.doGET(JSONUtils.getStringParam(res, zone_output_file, ""), zof);
                            long timeend_i = System.currentTimeMillis();
                            cpu_time.add(timeend_i - timestart_i);
                            return null;
                        }
                    };
                }
            });
            long timeend = System.currentTimeMillis();
            total_time = Duration.ofMillis(timeend - timestart).toString();
            stats = stats(cpu_time);
        }
    }

    /**
     * Reassembles chunked results (if any) and publishes the output files
     * plus timing statistics.
     */
    @Override
    protected void postProcess() throws Exception {
        if (chunks > 0) {
            // assemble the files back again only if we had to map/reduce.
            combineFiles(new File(workspace().getDir(), output_file), chunks);
            combineTables(new File(workspace().getDir(), zone_output_file), chunks);
            results().put("chunks", chunks);
            results().put("real_time", total_time);
            results().put("min_time", stats[0]);
            results().put("max_time", stats[1]);
            results().put("avg_time", stats[2]);
            results().put("total_time", stats[3]);
        }
        results().put(new File(workspace().getDir(), output_file), "output file");
        results().put(new File(workspace().getDir(), zone_output_file), "zone output file");
    }

    /////////////
    /**
     * Allow rewriting the peer URL to avoid leaving AWS for example.
     * Scheme/host/port are overridden by the 'ewsf.dk.*' config keys when set.
     *
     * @param u the original request URI
     * @return the (possibly rewritten) URI to use for sub-requests
     */
    public static URI toPeerURI(URI u) {
        UriBuilder b = UriBuilder.fromUri(u);
        String s = Config.getString("ewsf.dk.scheme");
        if (s != null) {
            b = b.scheme(s);
        }
        s = Config.getString("ewsf.dk.host");
        if (s != null) {
            b = b.host(s);
        }
        s = Config.getString("ewsf.dk.port");
        if (s != null) {
            b = b.port(Integer.parseInt(s));
        }
        return b.build();
    }

    /**
     * Computes {min, max, avg, sum} of the chunk wall-clock times as ISO-8601
     * duration strings. Returns all-zero durations for an empty list.
     *
     * @param list per-chunk elapsed times in milliseconds
     * @return {min, max, avg, total} durations
     */
    private String[] stats(List<Long> list) {
        if (list.isEmpty()) {
            return new String[]{
                Duration.ofMillis(0).toString(),
                Duration.ofMillis(0).toString(),
                Duration.ofMillis(0).toString(),
                Duration.ofMillis(0).toString()
            };
        }
        long max = Long.MIN_VALUE;
        long min = Long.MAX_VALUE;
        long sum = 0;
        for (long l : list) {
            sum += l;
            if (l > max) {
                max = l;
            }
            if (l < min) {
                min = l;
            }
        }
        return new String[]{
            Duration.ofMillis(min).toString(),
            Duration.ofMillis(max).toString(),
            Duration.ofMillis(sum / list.size()).toString(),
            Duration.ofMillis(sum).toString()
        };
    }

    /**
     * Creates an empty chunk table carrying over name, table info, and
     * per-column metadata from the source table.
     */
    private static MemoryTable newChunkTable(CSTable src, List<String> cols) {
        MemoryTable t1 = new MemoryTable();
        t1.setName(src.getName());
        t1.getInfo().putAll(src.getInfo());
        t1.setColumns(cols.toArray(new String[0]));
        for (int c = 0; c < cols.size(); c++) {
            // column info is 1-based in CSTable
            t1.getColumnInfo(c + 1).putAll(src.getColumnInfo(c + 1));
        }
        return t1;
    }

    /** Writes a table to a file; the writer is always closed. */
    private static void writeChunk(MemoryTable t, File f) throws IOException {
        try (PrintWriter w = new PrintWriter(new FileWriter(f))) {
            DataIO.print(t, w);
        }
    }

    /**
     * Slices a CSV time-series table into consecutive files named
     * {@code <table>.0, <table>.1, ...}, each covering ndays+1 rows
     * (first row sets DATE_START; DATE_END is first date + ndays, corrected
     * to the actual last row date for the trailing partial chunk).
     *
     * @param table the input table file
     * @param ndays slice size in days
     * @return the number of chunk files written
     * @throws IOException on read/write problems
     * @throws ParseException if a date cell cannot be parsed
     */
    static int sliceTable(File table, int ndays) throws IOException, ParseException {
        CSTable t = DataIO.table(table);
        SimpleDateFormat df = DataIO.lookupDateFormat(t, 1);
        List<String> t_col = DataIO.columnNames(t);
        MemoryTable t1 = newChunkTable(t, t_col);
        int i = 0; // chunk counter, also the file suffix
        int j = 0; // rows collected in the current chunk
        String lastDate = null;
        for (String[] r : t.rows()) {
            // fresh array per row: do not hand the same buffer to addRow repeatedly
            // in case MemoryTable keeps the reference rather than copying.
            String[] row = new String[t_col.size()];
            System.arraycopy(r, 1, row, 0, r.length - 1);
            t1.addRow((Object[]) row);
            lastDate = row[0];
            if (j == 0) {
                // first row of a chunk fixes its date window
                t1.getInfo().put(DataIO.DATE_START, row[0]);
                Calendar s_ = toCalendar(df.parse(row[0]));
                s_.add(Calendar.DATE, ndays);
                t1.getInfo().put(DataIO.DATE_END, df.format(s_.getTime()));
            }
            if (j++ == ndays) {
                writeChunk(t1, new File(table.toString() + "." + (i++)));
                t1 = newChunkTable(t, t_col);
                j = 0;
            }
        }
        // flush the trailing partial chunk only if it actually holds rows;
        // the old code always wrote here, emitting an empty bogus chunk when
        // the row count was an exact multiple of the slice size.
        if (j > 0) {
            t1.getInfo().put(DataIO.DATE_END, df.format(toCalendar(df.parse(lastDate)).getTime()));
            writeChunk(t1, new File(table.toString() + "." + (i++)));
        }
        return i;
    }

    /**
     * Appends the chunk files {@code <out>-0 .. <out>-(chunks-1)} to {@code out}
     * in order (plain byte concatenation).
     */
    static void combineFiles(File out, int chunks) throws IOException {
        for (int i = 0; i < chunks; i++) {
            File of = new File(out.getParentFile(), out.getName() + "-" + i);
            // FileUtils.copyFile(File, OutputStream) does NOT close the stream;
            // the old code leaked one append stream per chunk.
            try (java.io.OutputStream os = FileUtils.openOutputStream(out, true)) {
                FileUtils.copyFile(of, os);
            }
        }
    }

    /**
     * Merges the chunk tables {@code <out>-0 .. <out>-(chunks-1)} into a single
     * table {@code out}; metadata comes from chunk 0, DATE_END from the last chunk.
     */
    static void combineTables(File out, int chunks) throws IOException {
        CSTable t = DataIO.table(new File(out.getParentFile(), out.getName() + "-0"));
        List<String> t_col = DataIO.columnNames(t);
        MemoryTable t1 = newChunkTable(t, t_col);
        for (int i = 0; i < chunks; i++) {
            File of = new File(out.getParentFile(), out.getName() + "-" + i);
            CSTable t2 = DataIO.table(of);
            // keep overwriting; the last chunk's DATE_END wins
            t1.getInfo().put(DataIO.DATE_END, t2.getInfo().get(DataIO.DATE_END));
            for (String[] r : t2.rows()) {
                String[] row = new String[t_col.size()];
                System.arraycopy(r, 1, row, 0, r.length - 1);
                t1.addRow((Object[]) row);
            }
        }
        writeChunk(t1, out);
        // FileUtils.deleteQuietly(of);
    }

    /** Converts a Date into a Calendar (default time zone/locale). */
    static Calendar toCalendar(Date d) {
        Calendar c = Calendar.getInstance();
        c.setTime(d);
        return c;
    }

    /**
     * Builds the dk.config key/value content for dk.exe.
     * Keys and fixed values must match what dk.exe expects.
     */
    private String createConfig() {
        return "#config generated " + new Date() + "\n"
                + "input-data-file-name=" + input_data_file + "\n"
                + "type-of-data=" + type_of_data + "\n"
                + "#Time step of data: 1=hourly; 2=daily; 3=monthly; 4=yearly\n"
                + "time-step=2\n"
                + "coord-system=4\n"
                + "elevation-grid-file-name=" + elevation_grid_file + "\n"
                + "watershed-mask-file-name=\n"
                + "zone-grid-file-name=" + zone_grid_file + "\n"
                + "output-format=1\n"
                + "beginning-day-number=\n"
                + "ending-day-number=\n"
                + "output-precision=1\n"
                + "output-file-name=" + output_file + "\n"
                + "zone-output-file-name=" + zone_output_file + "\n"
                + "regression-method=" + regression_method + "\n"
                + "station-weighting-method=1\n"
                + "timesteps-per-period=1\n"
                + "input-format-csv=true\n"
                + "print-input=false\n"
                + "print-distances=false\n"
                + "print-residuals=false\n"
                + "print-regressions=true\n"
                + "print-weights=false\n";
    }

    /**
     * Ad-hoc developer harness for sliceTable; uses a hard-coded local path.
     *
     * @param args unused
     * @throws Exception on any failure
     */
    public static void main(String[] args) throws Exception {
        sliceTable(new File("/od/tmp/tmax_all_data.csv"), 100);
    }
}