// V1_0.java [src/java/m/timeseries/aggregate] Revision: default Date:
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package m.timeseries.aggregate;
import com.mongodb.CommandResult;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import static com.mongodb.client.model.Aggregates.group;
import static com.mongodb.client.model.Aggregates.match;
import static com.mongodb.client.model.Aggregates.project;
import static com.mongodb.client.model.Aggregates.sort;
import static com.mongodb.client.model.Aggregates.unwind;
import com.mongodb.client.model.Filters;
import javax.ws.rs.Path;
import csip.ModelDataService;
import csip.ServiceException;
import csip.annotations.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import static java.util.Arrays.asList;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.bson.conversions.Bson;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
/**
 * MongoDB time-series data aggregation.
*
* @author Dave Patterson
*/
@Name("Timeseries aggregation")
@Description("Aggregate timeseries data in mongo.")
@Author(org = "CSU")
@License(License.MIT)
@Path("m/aggregate/1.0")
public class V1_0 extends ModelDataService {
MongoClient mongo;
MongoDatabase db;
JSONArray results = new JSONArray();
protected void open(String mongoclienturi) {
MongoClientURI u = new MongoClientURI(mongoclienturi);
mongo = new MongoClient(u);
if (u.getDatabase() == null) {
throw new RuntimeException("Missing database in mongo uri: " + mongoclienturi);
}
db = mongo.getDatabase(u.getDatabase());
}
protected void close() {
mongo.close();
}
@Override
public void doProcess() throws Exception {
String mongoUri = getStringParam("mongo_uri");
String mongoCollection = getStringParam("mongo_collection");
String startDate = getStringParam("start_date", null);
String endDate = getStringParam("end_date", null);
String searchId = getStringParam("search_id", null);
JSONObject searchFeature = getJSONParam("search_feature", null);
String aggregationLevel = getStringParam("aggregation_level", "monthly");
run(mongoUri, mongoCollection, searchFeature, searchId, startDate, endDate, aggregationLevel);
}
@Override
protected void postProcess() throws Exception {
putResult("result", results);
}
/**
*
* @param mongoUri
* @param mongoCollection
* @param searchFeature
* @param searchId
* @param startDateStr
* @param endDateStr
* @param aggregationLevel
* @throws JSONException
* @throws ServiceException
*/
void run(String mongoUri, String mongoCollection, JSONObject searchFeature, String searchId,
String startDateStr, String endDateStr, String aggregationLevel)
throws JSONException, ServiceException {
open(mongoUri);
MongoCollection<Document> collection = db.getCollection(mongoCollection);
SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd");
ft.setTimeZone(TimeZone.getTimeZone("UTC"));
Date startDate = null;
Date endDate = null;
if (startDateStr != null) {
try {
startDate = ft.parse(startDateStr);
endDate = ft.parse(endDateStr);
} catch (ParseException ex) {
throw new ServiceException(ex);
}
}
// This will find all location (top-level) documents by checking that the
// foreign-key location_id does not exist.
Document locationsOnly = new Document().append("location_id", new Document().append("$exists", false));
if (searchFeature == null) {
FindIterable<Document> c;
if (searchId == null) {
// No search and no location filter, so get all docs in collection
c = collection.find(locationsOnly);
} else {
// Search by document ID.
// Check for object ID (will be a hexidecimal string)
boolean idTest = searchId.matches("\\p{XDigit}+");
if (idTest) {
ObjectId search_objid = new ObjectId(searchId);
c = collection.find(Filters.eq("_id", search_objid));
} else {
// Simple search using string
c = collection.find(Filters.eq("_id", searchId));
}
}
for (Document locationDoc : c) {
JSONObject res = getResult(collection, locationDoc, startDate, endDate, aggregationLevel);
results.put(res);
}
} else {
JSONArray features = searchFeature.getJSONArray("features");
for (int i = 0; i < features.length(); i++) {
JSONObject feat = features.getJSONObject(i);
List<Bson> aggMatch = null;
try {
aggMatch = getMatch(feat, searchId);
} catch (IOException ex) {
throw new ServiceException(ex);
}
AggregateIterable<Document> c = collection.aggregate(aggMatch);
for (Document locationDoc : c) {
JSONObject res = getResult(collection, locationDoc, startDate, endDate, aggregationLevel);
results.put(res);
}
}
}
close();
}
private JSONObject getResult(MongoCollection<Document> c, Document locationDoc, Date startDate, Date endDate,
String aggregationLevel) throws JSONException, ServiceException {
String testJS = "db.getCollection(\"bigdata test series\").aggregate([" +
" { \"$match\": { \"location_id\": \"big_yearlychunks\" } }," +
" { \"$unwind\": \"$data\" }," +
" { \"$project\": {" +
" \"tmin\": { \"$arrayElemAt\": [\"$data\", 1] }," +
" \"month\": { $month: { \"$arrayElemAt\": [\"$data\", 0] } }," +
" \"year\": { $year: { \"$arrayElemAt\": [\"$data\", 0] } }," +
" } }," +
" { \"$group\": {" +
" \"_id\": { month: \"$month\", year: \"$year\" }," +
" \"tmin_avg\": { \"$avg\": \"$tmin\" }," +
" \"tmin_min\": { \"$min\": \"$tmin\" }," +
" \"tmin_max\": { \"$max\": \"$tmin\" }," +
" \"month\": { \"$first\": \"$month\" }," +
" \"year\": { \"$first\": \"$year\" }," +
" } }," +
" { \"$sort\": { \"year\": 1, \"month\": 1 } }," +
"])";
List<String> header = (ArrayList) locationDoc.get("header");
List<List<Object>> filtered = new ArrayList();
List<Bson> aggList = asList(
match(new Document("location_id", locationDoc.getString("_id"))),
unwind("$data"),
match(new Document("data.0", filterData(startDate, endDate))),
project(projectData(header)),
// I can't use Aggregates.group because $first in the _id field causes an error.
new Document("$group", groupData(header)),
sort(new Document("year", 1).append("month", 1))
);
AggregateIterable<Document> iter = c.aggregate(aggList).allowDiskUse(true);
for (Document d : iter) {
List<Object> row = new ArrayList<>();
// Fix date formatting
row.add(d.get("year") + "-" + d.get("month") + "-01");
for (int i = 1; i < header.size(); i++) {
String p = header.get(i);
row.add(d.get(p + "_avg"));
}
filtered.add(row);
}
JSONObject res = new JSONObject(locationDoc.toJson());
res.put("header", header);
res.put("data", filtered);
return res;
}
private Document filterData(Date startDate, Date endDate) {
Document datesQuery = new Document();
if ((startDate != null || endDate != null)) {
if (startDate != null) {
datesQuery.append("$gte", startDate);
}
if (endDate != null) {
datesQuery.append("$lte", endDate);
}
}
return datesQuery;
}
private Document projectData(List<String> header) {
Document project = new Document();
for (int i = 1; i < header.size(); i++) {
project.append(header.get(i), new Document("$arrayElemAt", asList("$data", i)));
}
project.append("month", new Document("$month", new Document("$arrayElemAt", asList("$data", 0))));
project.append("year", new Document("$year", new Document("$arrayElemAt", asList("$data", 0))));
return project;
}
private Document groupData(List<String> header) {
Document group = new Document("_id", new Document("month", "$month").append("year", "$year"));
for (int i = 1; i < header.size(); i++) {
group.append(header.get(i) + "_avg", new Document("$avg", "$" + header.get(i)));
}
group.append("month", new Document("$first", "$month"));
group.append("year", new Document("$first", "$year"));
return group;
}
public List<Bson> getMatch(JSONObject feat, String search_id) throws JSONException, IOException {
String geom = feat.getJSONObject("geometry").toString();
Document match_arg = new Document("location",
new Document("$geoWithin",
new Document("$geometry", Document.parse(geom))
)
);
if (search_id != null) {
// Check for object ID (will be a hexidecimal string)
boolean id_test = search_id.matches("\\p{XDigit}+");
if (id_test) {
ObjectId search_objid = new ObjectId(search_id);
match_arg.put("_id", search_objid);
} else {
match_arg.put("_id", search_id);
}
}
Document new_match = new Document("$match", match_arg);
List<Bson> agg_match = new ArrayList<>();
agg_match.add(new_match);
return agg_match;
}
public static void main(String[] args) throws FileNotFoundException, IOException, JSONException, ParseException, ServiceException {
V1_0 timeseries = new V1_0();
JSONObject searchFeature = new JSONObject("{\n"
+ " \"type\": \"FeatureCollection\",\n"
+ " \"features\": [\n"
+ " {\n"
+ " \"geometry\": {\n"
+ " \"type\": \"MultiPolygon\",\n"
+ " \"coordinates\": [\n"
+ " [\n"
+ " [\n"
+ " [\n"
+ " -105.09928580027,\n"
+ " 40.701406829918\n"
+ " ],\n"
+ " [\n"
+ " -105.09447928172,\n"
+ " 40.365324016903\n"
+ " ],\n"
+ " [\n"
+ " -104.72849722611,\n"
+ " 40.358522384503\n"
+ " ],\n"
+ " [\n"
+ " -104.72437735306,\n"
+ " 40.701927386425\n"
+ " ],\n"
+ " [\n"
+ " -105.09928580027,\n"
+ " 40.701406829918\n"
+ " ]\n"
+ " ]\n"
+ " ]\n"
+ " ]\n"
+ " },\n"
+ " \"type\": \"Feature\",\n"
+ " \"properties\": {\n"
+ " \"gid\": 1,\n"
+ " \"name\": \"gid 1\"\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ " }");
String search_id = null;
timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", searchFeature, search_id, "2014-04-04", "2014-05-08", "monthly");
}
}