V1_0.java [src/java/m/timeseries/insert] Revision: 389b3d79059255a8a1db1540cd57a287e9dbd653 Date: Fri Aug 31 10:22:24 MDT 2018
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package m.timeseries.insert;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.UpdateOptions;
import javax.ws.rs.Path;
import csip.ModelDataService;
import csip.SessionLogger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.TimeZone;
import org.bson.Document;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import utils.TimeseriesTable;
import csip.annotations.*;
/**
 * MongoDB timeseries data insertion service.
 *
 * <p>Reads timeseries rows from a CSV file in the workspace, merges them into
 * the document identified by the optional {@code id} parameter (or creates a
 * new document when no id is given) and stores the result in the requested
 * collection of the {@code csip_timeseries} database.
 *
 * @author Dave Patterson
 */
@Name("Timeseries insert")
@Description("Update a timeseries collection in mongo.")
@Author(org="CSU")
@License(License.MIT)
@Path("m/insert/1.0")
public class V1_0 extends ModelDataService {

  /** Client opened by {@link #open(String)}; {@code null} while no connection is open. */
  MongoClient mongo;
  /** Database handle obtained from {@link #mongo}; {@code null} while no connection is open. */
  MongoDatabase db;
  /** Result payload built by {@code run} and published by {@link #postProcess()}. */
  JSONObject results;

  /**
   * Opens a connection to the database named in the given client URI.
   *
   * <p>The URI is validated before the client is created so that a rejected
   * URI does not leave an open client behind.
   *
   * @param mongoclienturi MongoDB client URI; must name the
   *        {@code csip_timeseries} database
   * @throws RuntimeException if the URI names no database, or a database
   *         other than {@code csip_timeseries}
   */
  protected void open(String mongoclienturi) {
    MongoClientURI u = new MongoClientURI(mongoclienturi);
    String database = u.getDatabase();
    if (database == null) {
      throw new RuntimeException("Missing database in mongo uri: " + mongoclienturi);
    }
    if (!database.equals("csip_timeseries")) {
      throw new RuntimeException("Can only modify csip_timeseries collection");
    }
    mongo = new MongoClient(u);
    db = mongo.getDatabase(database);
  }

  /**
   * Closes the connection opened by {@link #open(String)}.
   *
   * <p>Does nothing when no client is open, so it can be called from a
   * {@code finally} block without masking an earlier failure.
   */
  protected void close() {
    if (mongo != null) {
      try {
        mongo.close();
      } finally {
        mongo = null;
        db = null;
      }
    }
  }

  /**
   * Reads the request parameters and delegates to {@code run}.
   *
   * @throws IllegalArgumentException if {@code location} has fewer than two
   *         elements
   * @throws Exception on any parameter, file, parsing or database failure
   */
  @Override
  public void doProcess() throws Exception {
    String mongo_uri = getStringParam("mongo_uri");
    String mongo_collection = getStringParam("mongo_collection");
    String doc_id = getStringParam("id", null);
    String data = getStringParam("data");
    File data_file = getWorkspaceFile(data);
    String data_col = getStringParam("data_column", null);
    JSONObject metadata = getJSONParam("metadata");
    String date_fmt = getStringParam("date_format");
    JSONArray locationArray = getJSONArrayParam("location");
    if (locationArray.length() < 2) {
      throw new IllegalArgumentException(
          "Parameter 'location' must contain two numeric coordinates, got: " + locationArray);
    }
    List<Double> location = Arrays.asList(
        Double.parseDouble(locationArray.get(0).toString()),
        Double.parseDouble(locationArray.get(1).toString()));
    run(mongo_uri, mongo_collection, doc_id, data_file, data_col, metadata, date_fmt, location, LOG);
  }

  /**
   * Merges the CSV data into the target document and writes it to MongoDB.
   *
   * <p>When {@code doc_id} is {@code null} a new document is inserted;
   * otherwise the document with that {@code _id} is loaded (if present),
   * extended with the CSV rows and replaced, using an upsert so that a
   * missing document is created. The id of the written document is stored
   * in {@link #results} under {@code updated_document_id}.
   *
   * <p>The connection is always closed before this method returns, whether
   * it completes normally or throws.
   *
   * @param mongo_uri MongoDB client URI, see {@link #open(String)}
   * @param mongo_collection name of the collection to write to
   * @param doc_id {@code _id} of the document to update, or {@code null} to
   *        insert a new document
   * @param data_file CSV file holding the rows to append
   * @param data_col column selector passed through to the CSV reader; may be
   *        {@code null}
   * @param metadata top-level keys are copied into the document's
   *        {@code metadata} sub-document
   * @param date_fmt {@link SimpleDateFormat} pattern used to parse the CSV
   *        dates (interpreted in UTC); also stored as {@code date_fmt}
   * @param location two coordinates stored as the {@code coordinates} of a
   *        {@code Point} sub-document
   * @param LOG logger used for progress messages
   */
  void run(String mongo_uri, String mongo_collection, String doc_id, File data_file, String data_col, JSONObject metadata,
      String date_fmt, List<Double> location, SessionLogger LOG) throws FileNotFoundException, IOException, JSONException, ParseException {
    open(mongo_uri);
    try {
      MongoCollection<Document> collection = db.getCollection(mongo_collection);
      SimpleDateFormat ft = new SimpleDateFormat(date_fmt);
      ft.setTimeZone(TimeZone.getTimeZone("UTC"));

      // Create single document to store the data
      Document doc = new Document();
      if (doc_id != null) {
        // Only look for an existing document when an id was supplied; a
        // lookup with a null id could match an unrelated document.
        FindIterable<Document> matches = collection.find(new Document().append("_id", doc_id));
        Document existing = matches.first();
        if (existing != null) {
          doc = existing;
          LOG.info("Updating existing doc");
        }
      }

      // Load current timeseries data
      TimeseriesTable tbl = new TimeseriesTable(doc);
      // Append new data
      tbl.readCSV(data_file, ft, data_col);

      if (doc_id != null) {
        doc.append("_id", doc_id);
      }

      Document metadata_doc = new Document();
      Iterator<?> itr = metadata.keys();
      while (itr.hasNext()) {
        String key = (String) itr.next();
        metadata_doc.put(key, metadata.get(key));
      }

      doc.append("metadata", metadata_doc);
      doc.append("header", tbl.header);
      doc.append("data", tbl.data);
      doc.append("date_fmt", date_fmt);
      doc.append("location", new Document().append("type", "Point").append("coordinates", location));

      if (doc_id == null) {
        collection.insertOne(doc);
      } else {
        Document query = new Document().append("_id", doc_id);
        UpdateOptions options = new UpdateOptions().upsert(true);
        collection.replaceOne(query, doc, options);
      }

      results = new JSONObject();
      results.put("updated_document_id", doc.get("_id").toString());
    } finally {
      // Release the client even when reading, parsing or writing failed.
      close();
    }
  }

  /**
   * Publishes the payload built by {@code run} as the {@code result} output.
   */
  @Override
  protected void postProcess() throws Exception {
    putResult("result", results);
  }

  /**
   * Manual developer check: runs two consecutive inserts against the same
   * document id using hard-coded local CSV paths and a hard-coded server URI.
   */
  public static void main(String [] args) throws FileNotFoundException, IOException, JSONException, ParseException {
    V1_0 timeseries = new V1_0();
    JSONObject metadata = new JSONObject();
    metadata.put("title", "main test");
    metadata.put("author", "Dave");
    File data_file = new File("/home/dave/work/csip-timeseries/test/service_tests/m/insert/V1_0/insert-req/data_append.csv");
    SessionLogger LOG = new SessionLogger();
    List<Double> location = Arrays.asList(-105.0, 40.5);
    timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref", data_file, "Total Precip", metadata, "yyyy-MM-dd", location, LOG);
    // Append some data. New data start at index 942.
    data_file = new File("/home/dave/work/csip-timeseries/test/service_tests/m/insert/V1_0/insert-req/data.csv");
    timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref", data_file, "Total Precip", metadata, "yyyy-MM-dd", location, LOG);
  }
}