V1_0.java [src/java/m/ewsf/dk] Revision: default  Date:
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package m.ewsf.dk;

import csip.utils.*;
import csip.Config;
import csip.api.server.*;
import csip.ModelDataService;
import csip.annotations.*;
import static csip.annotations.ResourceType.*;
import csip.utils.JSONUtils;
import csip.utils.Services;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import javax.ws.rs.Path;
import javax.ws.rs.core.UriBuilder;
import oms3.io.CSTable;
import oms3.io.DataIO;
import oms3.io.MemoryTable;
import oms3.util.Dates;
import org.apache.commons.io.FileUtils;
import org.codehaus.jettison.json.JSONObject;

/**
 * Detrended Kriging (DK) service (dk: D. Garen).
 * <p>
 * Runs the {@code dk.exe} executable on the attached input data. When the
 * {@code dk_days} parameter is set and the input period is longer than it, the
 * input table is sliced into chunks. Each chunk is re-submitted in parallel to a
 * peer instance of this service, and the chunk outputs are merged back together
 * in {@link #postProcess()}.
 *
 * @author od
 */
@Name("dk")
@Description("Detrended Kriging")
@Path("m/dk/1.0")
@Polling(first = 1000, next = 2000)
// resource definition for the dk.exe executable
@Resource(type = EXECUTABLE, file = "/bin/${csip.arch}/dk.exe", args = "-k dk.config", id = "dk")
// list all the files (wildcards allowed) that should be captured as output
@Resource(type = OUTPUT, file = "*stdout.txt *stderr.txt")
public class V1_0 extends ModelDataService {

    File elevation_grid_file; // in
    File input_data_file; // in
    File zone_grid_file; // in
    //
    String output_file; // out
    String zone_output_file;  //out

    // param
    int regression_method = 2;
    int type_of_data = 1;
    // slice size in days; -1 means "run the whole period in one go"
    long dk_days = -1;

    // number of input slices submitted in parallel (0 when run locally)
    int chunks;
    // wall-clock time of the parallel run, as an ISO-8601 duration string
    String total_time;
    // per-slice timing summary: {min, max, avg, total}, see stats(List)
    String stats[];


    /**
     * Reads the request parameters and resolves the attached input files.
     */
    @Override
    protected void preProcess() throws Exception {
        dk_days = parameter().getInt("dk_days", -1);  // default: process the whole period at once.

        elevation_grid_file = attachments().getFile(parameter().getString("elevation-grid-file"));
        input_data_file = attachments().getFile(parameter().getString("input-data-file"));
        zone_grid_file = attachments().getFile(parameter().getString("zone-grid-file"));

        // output file names.
        output_file = parameter().getString("output-file");
        zone_output_file = parameter().getString("zone-output-file");

        regression_method = parameter().getInt("regression-method");
        type_of_data = parameter().getInt("type-of-data");
    }


    /**
     * Runs DK either locally (single dk.exe invocation) or, when the period is
     * longer than {@code dk_days}, by slicing the input and submitting each slice
     * to a peer service in parallel, downloading each slice's outputs as
     * {@code <output-file>-<i>} and {@code <zone-output-file>-<i>}.
     *
     * @throws ServiceException if the local dk.exe run exits with a non-zero code
     */
    @Override
    protected void doProcess() throws Exception {
        CSTable t = DataIO.table(input_data_file);
        SimpleDateFormat df = DataIO.lookupDateFormat(t, 1);
        String start = t.getInfo().get(DataIO.DATE_START);
        String end = t.getInfo().get(DataIO.DATE_END);

        long diffDays = Dates.diffDayPeriods(toCalendar(df.parse(start)), toCalendar(df.parse(end)));
        LOG.info("Diff days: " + diffDays);

        // Slicing only makes sense when the period is longer than one slice.
        if (diffDays <= dk_days) {
            dk_days = -1;
        }

        if (dk_days == -1) {
            // The period is small enough (or slicing was not requested): run it here.
            String conf = createConfig();
            FileUtils.writeStringToFile(new File(workspace().getDir(), "dk.config"), conf);
            LOG.info(conf);

            // get the resource by its 'id' : dk
            Executable e = resources().getExe("dk");
            int ret = e.exec();
            if (ret == 0) {
                return;
            }
            throw new ServiceException(FileUtils.readFileToString(e.stderr()));
        } else {
            // Resubmit the pieces to a peer instance of this service.
            chunks = sliceTable(input_data_file, (int) dk_days);
            LOG.info("slice table into " + chunks + " of " + dk_days);
            List<Long> cpu_time = Collections.synchronizedList(new ArrayList<>(chunks));

            String internalURL = toPeerURI(new URI(request().getURL())).toString();
            long timestart = System.currentTimeMillis();

            Services.runParallel(chunks, new Services.CallableFactory() {

                @Override
                public Callable create(final int i) {

                    // One task per slice.
                    return new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            long timestart_i = System.currentTimeMillis();
                            JSONObject request = new JSONObject(request().getRequest().toString());

                            LOG.info("call #" + i + " of " + chunks);
                            // Copy of the original request, pointed at slice i, with
                            // dk_days forced to -1 so the peer runs the slice locally.
                            Map<String, JSONObject> pm = JSONUtils.getParameter(request);
                            pm.put("input-data-file", JSONUtils.data("input-data-file", input_data_file.getName() + "." + i));
                            pm.put("dk_days", JSONUtils.data("dk_days", -1));
                            JSONObject newrequest = JSONUtils.newRequest(pm);

                            Client cl = new Client();
                            JSONObject response = cl.doPOST(internalURL, newrequest,
                                    new File[]{
                                        new File(input_data_file + "." + i),
                                        elevation_grid_file,
                                        zone_grid_file
                                    });

                            String error = JSONUtils.getErrorStatus(response);
                            if (error != null) {
                                throw new Exception("Error in run #" + i + ": " + error);
                            }
                            LOG.info("done #" + i + " of " + chunks);

                            // Download the slice outputs as <name>-<i> for postProcess().
                            Map<String, JSONObject> res = JSONUtils.getResults(response);

                            File of = new File(workspace().getDir(), output_file + "-" + i);
                            File zof = new File(workspace().getDir(), zone_output_file + "-" + i);

                            cl.doGET(JSONUtils.getStringParam(res, output_file, ""), of);
                            cl.doGET(JSONUtils.getStringParam(res, zone_output_file, ""), zof);
                            long timeend_i = System.currentTimeMillis();
                            cpu_time.add(timeend_i - timestart_i);
                            return null;
                        }
                    };
                }
            });
            long timeend = System.currentTimeMillis();
            total_time = Duration.ofMillis(timeend - timestart).toString();
            stats = stats(cpu_time);
        }
    }


    /**
     * Merges the per-slice outputs (only when the run was sliced), reports timing
     * statistics, and registers the output files as results.
     */
    @Override
    protected void postProcess() throws Exception {
        if (chunks > 0) {
            // assemble the files back again only if we have to reduce.
            combineFiles(new File(workspace().getDir(), output_file), chunks);
            combineTables(new File(workspace().getDir(), zone_output_file), chunks);
            results().put("chunks", chunks);
            results().put("real_time", total_time);
            results().put("min_time", stats[0]);
            results().put("max_time", stats[1]);
            results().put("avg_time", stats[2]);
            results().put("total_time", stats[3]);
        }
        results().put(new File(workspace().getDir(), output_file), "output file");
        results().put(new File(workspace().getDir(), zone_output_file), "zone output file");
    }

/////////////    

    /**
     * Allows rewriting the peer URL (e.g. to avoid leaving AWS for internal
     * calls). The config keys {@code ewsf.dk.scheme}, {@code ewsf.dk.host} and
     * {@code ewsf.dk.port} each override the corresponding URI part when set.
     *
     * @param u the original URI
     * @return the URI with any configured overrides applied
     */
    public static URI toPeerURI(URI u) {
        UriBuilder b = UriBuilder.fromUri(u);
        String s = Config.getString("ewsf.dk.scheme");
        if (s != null) {
            b = b.scheme(s);
        }
        s = Config.getString("ewsf.dk.host");
        if (s != null) {
            b = b.host(s);
        }
        s = Config.getString("ewsf.dk.port");
        if (s != null) {
            b = b.port(Integer.parseInt(s));
        }
        return b.build();
    }


    /**
     * Summarizes per-slice durations.
     *
     * @param list durations in milliseconds
     * @return ISO-8601 duration strings {min, max, average, sum}; all zero for an
     *         empty list
     */
    private String[] stats(List<Long> list) {
        if (list.isEmpty()) {
            return new String[]{
                Duration.ofMillis(0).toString(),
                Duration.ofMillis(0).toString(),
                Duration.ofMillis(0).toString(),
                Duration.ofMillis(0).toString()
            };
        }
        long max = Long.MIN_VALUE;
        long min = Long.MAX_VALUE;
        long sum = 0;
        for (long l : list) {
            sum += l;
            if (l > max) {
                max = l;
            }
            if (l < min) {
                min = l;
            }
        }
        return new String[]{
            Duration.ofMillis(min).toString(),
            Duration.ofMillis(max).toString(),
            Duration.ofMillis(sum / list.size()).toString(),
            Duration.ofMillis(sum).toString()
        };
    }


    /**
     * Splits a time-series table into chunk files {@code <table>.0},
     * {@code <table>.1}, ... Each full chunk holds {@code ndays + 1} rows. The
     * last chunk holds the remaining rows and is only written if it is non-empty.
     * Every chunk keeps the source table's header. Its {@code DATE_START} is its
     * first row's date. Its {@code DATE_END} is that date plus {@code ndays}
     * calendar days, or the last row's date for the trailing partial chunk.
     *
     * @param table the input table file
     * @param ndays slice size in days
     * @return the number of chunk files written (0 for a table without rows)
     * @throws IOException on read/write errors
     * @throws ParseException if a date in column 1 cannot be parsed
     */
    static int sliceTable(File table, int ndays) throws IOException, ParseException {
        CSTable t = DataIO.table(table);

        SimpleDateFormat df = DataIO.lookupDateFormat(t, 1);
        List<String> t_col = DataIO.columnNames(t);

        MemoryTable t1 = emptyCopyOf(t, t_col);
        int j = 0;  // position of the next row within the current chunk
        int i = 0;  // number of chunk files written so far
        String[] ro = new String[t_col.size()];
        for (String[] r : t.rows()) {
            // r carries one extra leading element beyond the named columns
            // (presumably the row number), which is dropped here.
            System.arraycopy(r, 1, ro, 0, r.length - 1);
            t1.addRow((Object[]) ro);

            if (j == 0) {
                // First row of a chunk: stamp the chunk's period.
                t1.getInfo().put(DataIO.DATE_START, ro[0]);
                Calendar s_ = toCalendar(df.parse(ro[0]));
                s_.add(Calendar.DATE, ndays);
                t1.getInfo().put(DataIO.DATE_END, df.format(s_.getTime()));
            }

            if (j++ == ndays) {
                writeTable(t1, new File(table.toString() + "." + (i++)));
                t1 = emptyCopyOf(t, t_col);
                j = 0;
            }
        }
        // Flush the trailing partial chunk. If the last row completed a full chunk
        // there is nothing left; writing anyway would create an empty slice that
        // would still be submitted as a run.
        if (j > 0) {
            Calendar s_ = toCalendar(df.parse(ro[0]));
            t1.getInfo().put(DataIO.DATE_END, df.format(s_.getTime()));
            writeTable(t1, new File(table.toString() + "." + (i++)));
        }
        return i;
    }


    /**
     * Creates an empty table with the same name, table info, columns and column
     * info as {@code t}.
     */
    private static MemoryTable emptyCopyOf(CSTable t, List<String> cols) {
        MemoryTable copy = new MemoryTable();
        copy.setName(t.getName());
        copy.getInfo().putAll(t.getInfo());
        copy.setColumns(cols.toArray(new String[0]));
        for (int c = 0; c < cols.size(); c++) {
            copy.getColumnInfo(c + 1).putAll(t.getColumnInfo(c + 1));
        }
        return copy;
    }


    /**
     * Prints {@code t} to {@code file}, always closing the writer.
     */
    private static void writeTable(MemoryTable t, File file) throws IOException {
        try (PrintWriter w = new PrintWriter(new FileWriter(file))) {
            DataIO.print(t, w);
        }
    }


    /**
     * Returns the per-slice file {@code <out>-<i>} next to {@code out}.
     */
    private static File chunkFile(File out, int i) {
        return new File(out.getParentFile(), out.getName() + "-" + i);
    }


    /**
     * Appends the raw bytes of {@code <out>-0 .. <out>-(chunks-1)} to
     * {@code out}, in order. Does nothing when {@code chunks <= 0}.
     *
     * @throws IOException if a chunk file is missing or cannot be copied
     */
    static void combineFiles(File out, int chunks) throws IOException {
        if (chunks <= 0) {
            return;
        }
        // One append-mode stream for all chunks, closed even if a copy fails.
        try (java.io.OutputStream os = FileUtils.openOutputStream(out, true)) {
            for (int i = 0; i < chunks; i++) {
                FileUtils.copyFile(chunkFile(out, i), os);
            }
        }
    }


    /**
     * Merges the tables {@code <out>-0 .. <out>-(chunks-1)} into {@code out}.
     * The header comes from chunk 0. {@code DATE_END} is taken from the last
     * chunk, and the rows are concatenated in chunk order.
     *
     * @throws IOException if a chunk cannot be read or the result cannot be written
     */
    static void combineTables(File out, int chunks) throws IOException {
        CSTable t = DataIO.table(chunkFile(out, 0));
        List<String> t_col = DataIO.columnNames(t);

        MemoryTable t1 = emptyCopyOf(t, t_col);

        String[] ro = new String[t_col.size()];
        for (int i = 0; i < chunks; i++) {
            CSTable t2 = DataIO.table(chunkFile(out, i));
            t1.getInfo().put(DataIO.DATE_END, t2.getInfo().get(DataIO.DATE_END));
            for (String[] r : t2.rows()) {
                System.arraycopy(r, 1, ro, 0, r.length - 1);
                t1.addRow((Object[]) ro);
            }
        }

        writeTable(t1, out);
    }


    /**
     * Wraps a {@link Date} in a default-timezone {@link Calendar}.
     */
    static Calendar toCalendar(Date d) {
        Calendar c = Calendar.getInstance();
        c.setTime(d);
        return c;
    }


    /**
     * Builds the {@code dk.config} key=value file contents from the current
     * parameters. Several settings (time step, coordinate system, output format,
     * ...) are fixed here rather than taken from the request.
     */
    private String createConfig() {
        return "#config generated " + new Date() + "\n"
                + "input-data-file-name=" + input_data_file + "\n"
                + "type-of-data=" + type_of_data + "\n"
                + "#Time step of data: 1=hourly; 2=daily; 3=monthly; 4=yearly\n"
                + "time-step=2\n"
                + "coord-system=4\n"
                + "elevation-grid-file-name=" + elevation_grid_file + "\n"
                + "watershed-mask-file-name=\n"
                + "zone-grid-file-name=" + zone_grid_file + "\n"
                + "output-format=1\n"
                + "beginning-day-number=\n"
                + "ending-day-number=\n"
                + "output-precision=1\n"
                + "output-file-name=" + output_file + "\n"
                + "zone-output-file-name=" + zone_output_file + "\n"
                + "regression-method=" + regression_method + "\n"
                + "station-weighting-method=1\n"
                + "timesteps-per-period=1\n"
                + "input-format-csv=true\n"
                + "print-input=false\n"
                + "print-distances=false\n"
                + "print-residuals=false\n"
                + "print-regressions=true\n"
                + "print-weights=false\n";
    }


    /**
     * Developer harness: slices a hard-coded local table into 100-day chunks.
     *
     * @param args ignored
     * @throws Exception on any failure
     */
    public static void main(String[] args) throws Exception {
        sliceTable(new File("/od/tmp/tmax_all_data.csv"), 100);
    }
}