V1_0.java [src/java/m/timeseries/query] Revision: 7d3c722583da4acd033e8f0582f4a3f3fc0bf348 Date: Thu Nov 01 15:30:27 MDT 2018
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package m.timeseries.query;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import javax.ws.rs.Path;
import oms3.annotations.*;
import csip.ModelDataService;
import csip.ServiceException;
import csip.SessionLogger;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.TimeZone;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.bson.conversions.Bson;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * MongoDB time series query service.
 *
 * <p>Looks up documents in a collection either by {@code _id}, by a GeoJSON
 * feature collection (matched with {@code $geoWithin} against each document's
 * {@code location} field), or both, and returns for every hit its header,
 * location and metadata and, optionally, its date-filtered data rows and
 * per-column statistics.
 *
 * @author Dave Patterson
 */
@Name("Timeseries query")
@Description("Query a timeseries collection in mongo.")
@Path("m/query/1.0")
public class V1_0 extends ModelDataService {

    /** Client created by {@link #open(String)}; {@code null} while no connection is open. */
    MongoClient mongo;
    /** Database named in the connection URI; set by {@link #open(String)}. */
    MongoDatabase db;
    /** Accumulates one JSON object per matched document; published by {@link #postProcess()}. */
    JSONArray results = new JSONArray();

    /**
     * Opens a client for the given connection string and selects its database.
     *
     * @param mongoclienturi MongoDB connection URI; must name a database
     * @throws RuntimeException if the URI does not contain a database name
     */
    protected void open(String mongoclienturi) {
        MongoClientURI u = new MongoClientURI(mongoclienturi);
        // Validate before creating the client so a rejected URI does not leave
        // an unreachable, never-closed client behind.
        if (u.getDatabase() == null) {
            throw new RuntimeException("Missing database in mongo uri: " + mongoclienturi);
        }
        mongo = new MongoClient(u);
        db = mongo.getDatabase(u.getDatabase());
    }

    /**
     * Closes the client opened by {@link #open(String)}. Safe to call when no
     * client is open and safe to call more than once.
     */
    protected void close() {
        if (mongo != null) {
            mongo.close();
            mongo = null;
            db = null;
        }
    }

    /**
     * Reads the request parameters and delegates to {@link #run}.
     *
     * <p>Parameters: {@code mongo_uri}, {@code mongo_collection} (required);
     * {@code start_date}, {@code end_date}, {@code search_id},
     * {@code search_feature} (optional, default {@code null});
     * {@code metadata_only} (default {@code true});
     * {@code compute_stats} (default {@code false}).
     */
    @Override
    public void doProcess() throws Exception {
        String mongo_uri = getStringParam("mongo_uri");
        String mongo_collection = getStringParam("mongo_collection");
        String start_date = getStringParam("start_date", null);
        String end_date = getStringParam("end_date", null);
        boolean metadata_only = getBooleanParam("metadata_only", true);
        boolean compute_stats = getBooleanParam("compute_stats", false);
        String search_id = getStringParam("search_id", null);
        JSONObject search_feature;
        try {
            search_feature = getJSONParam("search_feature", null);
        } catch (ServiceException ex) {
            throw new RuntimeException(ex);
        }
        run(mongo_uri, mongo_collection, search_feature, search_id, start_date, end_date, metadata_only, compute_stats, LOG);
    }

    /**
     * Executes the query and appends one result object per matched document to
     * {@link #results}. The MongoDB client is opened at the start and always
     * closed before this method returns.
     *
     * @param mongo_uri        MongoDB connection URI including the database name
     * @param mongo_collection name of the collection to query
     * @param search_feature   GeoJSON object with a {@code features} array, or
     *                         {@code null} to search by id only
     * @param search_id        document {@code _id} to match, or {@code null} for no id filter
     * @param start_date_str   inclusive lower date bound as {@code yyyy-MM-dd} (UTC), or {@code null}
     * @param end_date_str     inclusive upper date bound as {@code yyyy-MM-dd} (UTC), or {@code null}
     * @param metadata_only    when {@code true}, data rows and statistics are not fetched
     * @param compute_stats    when {@code true} (and data is fetched), per-column statistics are computed
     * @param LOG              session logger; not used by this method
     * @throws JSONException    if the feature collection or a result cannot be handled as JSON
     * @throws ServiceException if a date cannot be parsed or a match stage cannot be built
     */
    public void run(String mongo_uri, String mongo_collection, JSONObject search_feature, String search_id,
            String start_date_str, String end_date_str, boolean metadata_only, boolean compute_stats, SessionLogger LOG) throws JSONException, ServiceException {
        SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd");
        ft.setTimeZone(TimeZone.getTimeZone("UTC"));
        Date start_date;
        Date end_date;
        try {
            // Both bounds are optional; a missing bound stays null and is
            // simply left out of the date filter in getResult().
            start_date = (start_date_str == null) ? null : ft.parse(start_date_str);
            end_date = (end_date_str == null) ? null : ft.parse(end_date_str);
        } catch (ParseException ex) {
            throw new ServiceException(ex);
        }

        open(mongo_uri);
        try {
            MongoCollection<Document> collection = db.getCollection(mongo_collection);
            if (search_feature == null) {
                FindIterable<Document> c;
                if (search_id == null) {
                    c = collection.find();
                } else {
                    c = collection.find(Filters.eq("_id", toIdValue(search_id)));
                }
                for (Document doc : c) {
                    results.put(getResult(collection, doc, start_date, end_date, metadata_only, compute_stats));
                }
            } else {
                JSONArray features = search_feature.getJSONArray("features");
                for (int i = 0; i < features.length(); i++) {
                    JSONObject feat = features.getJSONObject(i);
                    List<Bson> agg_match;
                    try {
                        agg_match = getMatch(feat, search_id);
                    } catch (IOException ex) {
                        // Fail the request instead of continuing with a null pipeline.
                        throw new ServiceException(ex);
                    }
                    AggregateIterable<Document> c = collection.aggregate(agg_match);
                    for (Document doc : c) {
                        results.put(getResult(collection, doc, start_date, end_date, metadata_only, compute_stats));
                    }
                }
            }
        } finally {
            // All cursors have been fully consumed above, so the client can go.
            close();
        }
    }

    /**
     * Converts a user-supplied id into the value to compare against {@code _id}:
     * an {@link ObjectId} when the string is a valid ObjectId hex representation,
     * otherwise the string itself.
     *
     * @param search_id non-null id string
     * @return an {@code ObjectId} or the unchanged string
     */
    private static Object toIdValue(String search_id) {
        return ObjectId.isValid(search_id) ? new ObjectId(search_id) : search_id;
    }

    /**
     * Builds the JSON result for one matched document.
     *
     * <p>The result always carries {@code _id} and {@code header}, plus
     * {@code location} (the coordinates) and {@code metadata} when the document
     * has them. Unless {@code metadata_only} is set it also carries
     * {@code data}: the rows whose first element (the date) lies within the
     * given bounds, with that date rendered using the document's
     * {@code date_fmt} pattern in UTC. When {@code compute_stats} is set a
     * {@code statistics} object is added; it is only populated when data is
     * fetched, i.e. when {@code metadata_only} is {@code false}.
     *
     * @param c             collection the document came from
     * @param doc           matched document
     * @param start_date    inclusive lower bound, or {@code null}
     * @param end_date      inclusive upper bound, or {@code null}
     * @param metadata_only skip data rows and statistics when {@code true}
     * @param compute_stats add min/max/avg/stdDevPop per header column
     * @return the JSON object describing the document
     * @throws JSONException if a value cannot be stored in the result object
     */
    @SuppressWarnings("unchecked") // document fields are untyped; their shapes are fixed by the collection layout
    private JSONObject getResult(MongoCollection<Document> c, Document doc, Date start_date, Date end_date,
            boolean metadata_only, boolean compute_stats) throws JSONException {
        List<String> header = (List<String>) doc.get("header");
        List<List<Object>> filtered = new ArrayList<>();
        JSONObject statResult = new JSONObject();

        if (!metadata_only) {
            SimpleDateFormat ft = new SimpleDateFormat((String) doc.get("date_fmt"));
            ft.setTimeZone(TimeZone.getTimeZone("UTC"));

            // Use the raw _id value: it may be an ObjectId, not only a String.
            Object id = doc.get("_id");

            // After $unwind each "data" value is a single row, so "data.0" is its date.
            // An empty filter document matches every row.
            Document datesQuery = new Document();
            if (start_date != null || end_date != null) {
                Document range = new Document();
                if (start_date != null) {
                    range.append("$gte", start_date);
                }
                if (end_date != null) {
                    range.append("$lte", end_date);
                }
                datesQuery = new Document("data.0", range);
            }

            Document group = new Document("_id", "big").append("data", new Document("$push", "$data"));
            List<Bson> aggList = Arrays.asList(
                    Aggregates.match(new Document("_id", id)),
                    Aggregates.unwind("$data"),
                    Aggregates.match(datesQuery),
                    // Wind array back
                    new Document("$group", group)
            );
            AggregateIterable<Document> iter = c.aggregate(aggList).allowDiskUse(true);
            for (Document d : iter) {
                filtered = (List<List<Object>>) d.get("data");
                // Fix date formatting
                for (List<Object> row : filtered) {
                    Date dt = (Date) row.get(0);
                    row.set(0, ft.format(dt));
                }
            }

            if (compute_stats) {
                // Give each row element the name of its header column so the
                // accumulators below can address columns by name.
                Document project = new Document(
                        "date", new Document("$arrayElemAt", Arrays.asList("$data", 0))
                );
                for (int i = 1; i < header.size(); i++) {
                    project.append(header.get(i), new Document("$arrayElemAt", Arrays.asList("$data", i)));
                }

                Document statsDoc = new Document("_id", null);
                List<String> stats = Arrays.asList("min", "max", "avg", "stdDevPop");
                for (String stat : stats) {
                    for (int i = 1; i < header.size(); i++) {
                        statsDoc.append(header.get(i) + "_" + stat, new Document("$" + stat, "$" + header.get(i)));
                    }
                }
                aggList = Arrays.asList(
                        Aggregates.match(new Document("_id", id)),
                        Aggregates.unwind("$data"),
                        Aggregates.match(datesQuery),
                        Aggregates.project(project),
                        new Document("$group", statsDoc)
                );
                iter = c.aggregate(aggList).allowDiskUse(true);
                for (Document d : iter) {
                    for (String stat : stats) {
                        for (int i = 1; i < header.size(); i++) {
                            String key = header.get(i) + "_" + stat;
                            statResult.put(key, d.get(key));
                        }
                    }
                }
            }
        }

        JSONObject res = new JSONObject();
        res.put("_id", doc.get("_id"));
        Document location = (Document) doc.get("location");
        if (location != null) {
            List<Double> coords = (List<Double>) location.get("coordinates");
            res.put("location", coords);
        }
        res.put("header", header);
        if (!metadata_only) {
            res.put("data", filtered);
        }
        Document metadata = (Document) doc.get("metadata");
        if (metadata != null) {
            res.put("metadata", new JSONObject(metadata.toJson()));
        }
        if (compute_stats) {
            res.put("statistics", statResult);
        }
        return res;
    }

    /** Publishes the accumulated results under the key {@code result}. */
    @Override
    protected void postProcess() throws Exception {
        putResult("result", results);
    }

    /**
     * Builds a single-stage aggregation pipeline that matches documents whose
     * {@code location} lies within the feature's geometry and, when
     * {@code search_id} is given, whose {@code _id} equals it.
     *
     * @param feat      GeoJSON feature with a {@code geometry} member
     * @param search_id id to match, or {@code null} for no id filter
     * @return a mutable list holding the {@code $match} stage
     * @throws JSONException if the feature has no {@code geometry} object
     * @throws IOException   declared for callers; not thrown by the current implementation
     */
    public List<Bson> getMatch(JSONObject feat, String search_id) throws JSONException, IOException {
        String geom = feat.getJSONObject("geometry").toString();
        Document match_arg = new Document("location",
                new Document("$geoWithin",
                        new Document("$geometry", Document.parse(geom))
                )
        );
        if (search_id != null) {
            match_arg.put("_id", toIdValue(search_id));
        }
        List<Bson> agg_match = new ArrayList<>();
        agg_match.add(new Document("$match", match_arg));
        return agg_match;
    }

    /**
     * Manual smoke test: runs a polygon query with data and statistics against
     * a fixed test collection. The results are accumulated but not printed.
     */
    public static void main(String [] args) throws FileNotFoundException, IOException, JSONException, ParseException, ServiceException {
        V1_0 timeseries = new V1_0();
        JSONObject searchFeature = new JSONObject("{\n" +
                " \"type\": \"FeatureCollection\",\n" +
                " \"features\": [\n" +
                " {\n" +
                " \"geometry\": {\n" +
                " \"type\": \"MultiPolygon\",\n" +
                " \"coordinates\": [\n" +
                " [\n" +
                " [\n" +
                " [\n" +
                " -105.09928580027,\n" +
                " 40.701406829918\n" +
                " ],\n" +
                " [\n" +
                " -105.09447928172,\n" +
                " 40.365324016903\n" +
                " ],\n" +
                " [\n" +
                " -104.72849722611,\n" +
                " 40.358522384503\n" +
                " ],\n" +
                " [\n" +
                " -104.72437735306,\n" +
                " 40.701927386425\n" +
                " ],\n" +
                " [\n" +
                " -105.09928580027,\n" +
                " 40.701406829918\n" +
                " ]\n" +
                " ]\n" +
                " ]\n" +
                " ]\n" +
                " },\n" +
                " \"type\": \"Feature\",\n" +
                " \"properties\": {\n" +
                " \"gid\": 1,\n" +
                " \"name\": \"gid 1\"\n" +
                " }\n" +
                " }\n" +
                " ]\n" +
                " }");
        SessionLogger LOG = new SessionLogger();
        String search_id = null;
        timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", searchFeature, search_id, "2014-04-04", "2014-05-08", false, true, LOG);
    }
}