V1_0.java [src/java/m/timeseries/query] Revision: 5b62b922fcd6602aeb057d237fdc02e08c32db1f  Date: Thu Nov 15 16:15:23 MST 2018
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package m.timeseries.query;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import javax.ws.rs.Path;
import csip.ModelDataService;
import csip.ServiceException;
import csip.annotations.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.bson.conversions.Bson;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

/**
 * Mongodb timeseries data insertion
 *
 * @author Dave Patterson
 */
@Name("Timeseries query")
@Description("Query a timeseries collection in mongo.")
@Author(org = "CSU")
@License(License.MIT)
@Path("m/query/1.0")
public class V1_0 extends ModelDataService {

  MongoClient mongo;
  MongoDatabase db;

  JSONArray results = new JSONArray();

  protected void open(String mongoclienturi) {
    MongoClientURI u = new MongoClientURI(mongoclienturi);
    mongo = new MongoClient(u);
    if (u.getDatabase() == null) {
      throw new RuntimeException("Missing database in mongo uri: " + mongoclienturi);
    }
    db = mongo.getDatabase(u.getDatabase());
  }

  protected void close() {
    mongo.close();
  }

  @Override
  public void doProcess() throws Exception {
    String mongoUri = getStringParam("mongo_uri");
    String mongoCollection = getStringParam("mongo_collection");
    String startDate = getStringParam("start_date", null);
    String endDate = getStringParam("end_date", null);
    boolean metadataOnly = getBooleanParam("metadata_only", true);
    boolean computeStats = getBooleanParam("compute_stats", false);
    boolean computeStatsLocal = getBooleanParam("compute_stats_local", false);
    String searchId = getStringParam("search_id", null);
    JSONObject searchFeature = getJSONParam("search_feature", null);

    run(mongoUri, mongoCollection, searchFeature, searchId, startDate, endDate, metadataOnly, computeStats, computeStatsLocal);
  }

  @Override
  protected void postProcess() throws Exception {
    putResult("result", results);
  }

  void run(String mongoUri, String mongoCollection, JSONObject searchFeature, String searchId,
          String startDateStr, String endDateStr, boolean metadataOnly, boolean computeStats, boolean computeStatsLocal)
          throws JSONException, ServiceException {

    open(mongoUri);
    MongoCollection<Document> collection = db.getCollection(mongoCollection);

    SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd");
    ft.setTimeZone(TimeZone.getTimeZone("UTC"));
    Date startDate = null;
    Date endDate = null;
    if (startDateStr != null) {
      try {
        startDate = ft.parse(startDateStr);
        endDate = ft.parse(endDateStr);
      } catch (ParseException ex) {
        throw new ServiceException(ex);
      }
    }

    Document locationsOnly = new Document().append("location_id", new Document().append("$exists", false));
    if (searchFeature == null) {
      FindIterable<Document> c;
      if (searchId == null) {
        // No search and no location filter, so get all docs in collection
        c = collection.find(locationsOnly);
      } else {
        // Search by document ID.
        // Check for object ID (will be a hexidecimal string)
        boolean idTest = searchId.matches("\\p{XDigit}+");
        if (idTest) {
          ObjectId search_objid = new ObjectId(searchId);
          c = collection.find(Filters.eq("_id", search_objid));
        } else {
          // Simple search using string
          c = collection.find(Filters.eq("_id", searchId));
        }
      }
      for (Document locationDoc : c) {
        JSONObject res = getResult(collection, locationDoc, startDate, endDate, metadataOnly, computeStats, computeStatsLocal);
        results.put(res);
      }
    } else {
      JSONArray features = searchFeature.getJSONArray("features");
      for (int i = 0; i < features.length(); i++) {
        JSONObject feat = features.getJSONObject(i);
        List<Bson> aggMatch = null;
        try {
          aggMatch = getMatch(feat, searchId);
        } catch (IOException ex) {
          throw new ServiceException(ex);
        }
        AggregateIterable<Document> c = collection.aggregate(aggMatch);
        for (Document locationDoc : c) {
          JSONObject res = getResult(collection, locationDoc, startDate, endDate, metadataOnly, computeStats, computeStatsLocal);
          results.put(res);
        }
      }
    }
    close();
  }

  private JSONObject getResult(MongoCollection<Document> c, Document locationDoc, Date startDate, Date endDate,
          boolean metadataOnly, boolean computeStats, boolean computeStatsLocal) throws JSONException {
    String dateFormat = (String) locationDoc.get("date_fmt");
    if (dateFormat == null) {
      dateFormat = "yyyy-MM-dd";
    }
    SimpleDateFormat ft = new SimpleDateFormat(dateFormat);
    ft.setTimeZone(TimeZone.getTimeZone("UTC"));

    List<String> header = (ArrayList) locationDoc.get("header");
    List<List<Object>> data = (ArrayList) locationDoc.get("data");
    List<List<Object>> filtered = new ArrayList();
    JSONObject statResult = new JSONObject();
    JSONObject statResultLocal = new JSONObject();

    if (!metadataOnly || computeStats) {
      Document datesQuery = new Document();
      if ((startDate != null || endDate != null)) {
        datesQuery = new Document();
        if (startDate != null) {
          datesQuery.append("$gte", startDate);
        }
        if (endDate != null) {
          datesQuery.append("$lte", endDate);
        }
        datesQuery = new Document("data.0", datesQuery);
      }

      Document project = new Document(
              "date", new Document("$arrayElemAt", Arrays.asList("$data", 0))
      );
      for (int i = 1; i < header.size(); i++) {
        project.append(header.get(i), new Document("$arrayElemAt", Arrays.asList("$data", i)));
      }
      project.append("month", new Document("$arrayElemAt", Arrays.asList("$data", 0)));
      project.append("year", new Document("$arrayElemAt", Arrays.asList("$data", 0)));

      Document group = new Document("_id", new Document("month", "$month").append("year", "$year"));
      for (int i = 1; i < header.size(); i++) {
        group.append(header.get(i) + "_avg", new Document("$avg", "$" + header.get(i)));
      }

      // Export for robomongo
      List<Bson> aggList = Arrays.asList(
              Aggregates.match(new Document("location_id", locationDoc.getString("_id"))),
              Aggregates.unwind("$data"),
              Aggregates.match(datesQuery),
              Aggregates.project(project),
              new Document("$group", group),
              new Document("$sort", new Document("year", 1).append("month", 1))
      );

      AggregateIterable<Document> iter = c.aggregate(aggList).allowDiskUse(true);
      for (Document d : iter) {
        List<Object> row = (List<Object>) d.get("data");
        // Fix date formatting
        Date dt = (Date) row.get(0);
        row.set(0, ft.format(dt));
        filtered.add(row);
      }

      if (computeStats || computeStatsLocal) {
        Document statsDoc = new Document("_id", null);
        List<String> stats = Arrays.asList("min", "max", "avg", "stdDevPop");
        for (String stat : stats) {
          for (int i = 1; i < header.size(); i++) {
            statsDoc.append(header.get(i) + "_" + stat, new Document("$" + stat, "$" + header.get(i)));
          }
        }

        aggList = Arrays.asList(
                Aggregates.match(new Document("location_id", locationDoc.getString("_id"))),
                Aggregates.unwind("$data"),
                Aggregates.match(datesQuery),
                Aggregates.project(project),
                new Document("$group", statsDoc)
        );
        if (computeStats) {
          iter = c.aggregate(aggList).allowDiskUse(true);
          for (Document d : iter) {
            for (String stat : stats) {
              for (int i = 1; i < header.size(); i++) {
                String key = header.get(i) + "_" + stat;
                statResult.put(key, d.get(key));
              }
            }
          }
        }

        // Add locally computed stats
        if (computeStatsLocal) {
          for (List<Object> objList : filtered) {
            for (int i = 1; i < header.size(); i++) {
              double[] vals = new double[filtered.size()];
              for (int j = 0; j < filtered.size(); j++) {
                vals[j] = (double) filtered.get(j).get(i);
              }
              for (String stat : stats) {
                String key = header.get(i) + "_" + stat;
                if ("min".equals(stat)) {
                  statResultLocal.put(key, oms3.util.Statistics.min(vals));
                }
                if ("max".equals(stat)) {
                  statResultLocal.put(key, oms3.util.Statistics.max(vals));
                }
                if ("avg".equals(stat)) {
                  statResultLocal.put(key, oms3.util.Statistics.mean(vals));
                }
                if ("stdDevPop".equals(stat)) {
                  statResultLocal.put(key, oms3.util.Statistics.stddev(vals));
                }
              }
            }
          }
        }
      }
    }

    JSONObject res = new JSONObject();
    res.put("_id", locationDoc.get("_id"));
    Document location = (Document) locationDoc.get("location");
    if (location != null) {
      List<Double> coords = (List<Double>) location.get("coordinates");
      res.put("location", coords);
    }
    res.put("header", header);
    if (!metadataOnly) {
      res.put("data", filtered);
    }
    Document metadata = (Document) locationDoc.get("metadata");
    res.put("metadata", new JSONObject(metadata.toJson()));

    if (computeStats) {
      res.put("statistics", statResult);
      res.put("statistics_local", statResultLocal);
    }

    return res;
  }

  public List<Bson> getMatch(JSONObject feat, String search_id) throws JSONException, IOException {
    String geom = feat.getJSONObject("geometry").toString();
    Document match_arg = new Document("location",
            new Document("$geoWithin",
                    new Document("$geometry", Document.parse(geom))
            )
    );
    if (search_id != null) {
      // Check for object ID (will be a hexidecimal string)
      boolean id_test = search_id.matches("\\p{XDigit}+");
      if (id_test) {
        ObjectId search_objid = new ObjectId(search_id);
        match_arg.put("_id", search_objid);
      } else {
        match_arg.put("_id", search_id);
      }
    }
    Document new_match = new Document("$match", match_arg);
    List<Bson> agg_match = new ArrayList<>();
    agg_match.add(new_match);
    return agg_match;
  }

  public static void main(String[] args) throws FileNotFoundException, IOException, JSONException, ParseException, ServiceException {
    V1_0 timeseries = new V1_0();
    JSONObject searchFeature = new JSONObject("{\n"
            + "                \"type\": \"FeatureCollection\",\n"
            + "                \"features\": [\n"
            + "                  {\n"
            + "                    \"geometry\": {\n"
            + "                      \"type\": \"MultiPolygon\",\n"
            + "                      \"coordinates\": [\n"
            + "                        [\n"
            + "                          [\n"
            + "                            [\n"
            + "                              -105.09928580027,\n"
            + "                              40.701406829918\n"
            + "                            ],\n"
            + "                            [\n"
            + "                              -105.09447928172,\n"
            + "                              40.365324016903\n"
            + "                            ],\n"
            + "                            [\n"
            + "                              -104.72849722611,\n"
            + "                              40.358522384503\n"
            + "                            ],\n"
            + "                            [\n"
            + "                              -104.72437735306,\n"
            + "                              40.701927386425\n"
            + "                            ],\n"
            + "                            [\n"
            + "                              -105.09928580027,\n"
            + "                              40.701406829918\n"
            + "                            ]\n"
            + "                          ]\n"
            + "                        ]\n"
            + "                      ]\n"
            + "                    },\n"
            + "                    \"type\": \"Feature\",\n"
            + "                    \"properties\": {\n"
            + "                      \"gid\": 1,\n"
            + "                      \"name\": \"gid 1\"\n"
            + "                    }\n"
            + "                  }\n"
            + "                ]\n"
            + "            }");
    String search_id = null;
    timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", searchFeature, search_id, "2014-04-04", "2014-05-08", false, true, false);
  }
}