V1_0.java [src/java/m/timeseries/insert] Revision: default Date:
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package m.timeseries.insert;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.UpdateOptions;
import javax.ws.rs.Path;
import csip.ModelDataService;
import csip.ServiceException;
import csip.SessionLogger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.TimeZone;
import org.bson.Document;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import utils.TimeseriesTable;
import csip.annotations.*;
/**
 * MongoDB timeseries data insertion service.
 *
 * Reads a CSV file from the service workspace into a {@link TimeseriesTable},
 * writes (or replaces) a document carrying the metadata, header, date format
 * and optional point location for the series, and then hands the table to
 * {@link TimeseriesTable#insertData} together with that document.
 *
 * The instance keeps the Mongo client and database handle in fields for the
 * duration of a single {@link #run} call; they are opened at the start of the
 * call and always released at the end.
 *
 * @author Dave Patterson
 */
@Name("Timeseries insert")
@Description("Update a timeseries collection in mongo.")
@Author(org = "CSU")
@License(License.MIT)
@Path("m/insert/1.0")
public class V1_0 extends ModelDataService {

  // Mongo client for the current run; null when no connection is open.
  MongoClient mongo;
  // Database handle obtained from the client for the current run; null when closed.
  MongoDatabase db;
  // Result payload published by postProcess(); populated by run().
  JSONObject results;


  /**
   * Validate the Mongo URI and open a client connection to its database.
   *
   * The URI is checked before the client is constructed so that a rejected
   * URI never leaves an unclosed client behind.
   *
   * @param mongoclienturi Mongo connection URI; it must name the
   * csip_timeseries database.
   * @throws RuntimeException if the URI names no database, or a database
   * other than csip_timeseries.
   */
  protected void open(String mongoclienturi) {
    MongoClientURI u = new MongoClientURI(mongoclienturi);
    String database = u.getDatabase();
    if (database == null) {
      throw new RuntimeException("Missing database in mongo uri: " + mongoclienturi);
    }
    // The check is on the database name, although the message says "collection".
    if (!database.equals("csip_timeseries")) {
      throw new RuntimeException("Can only modify csip_timeseries collection");
    }
    mongo = new MongoClient(u);
    db = mongo.getDatabase(database);
  }


  /**
   * Close the Mongo client if one is open. Safe to call when no client was
   * opened or when it has already been closed.
   */
  protected void close() {
    if (mongo != null) {
      try {
        mongo.close();
      } finally {
        // Drop the handles so a stale client/database is never reused.
        mongo = null;
        db = null;
      }
    }
  }


  /**
   * Service entry point: collect the request parameters and delegate to
   * {@link #run}.
   *
   * Parameters read: mongo_uri, mongo_collection, id (optional), data,
   * data_column (optional), metadata, date_format, location (optional) and
   * chunk_strategy (defaults to {@link TimeseriesTable#CHUNK_YEAR}).
   */
  @Override
  public void doProcess() throws ServiceException, JSONException, IOException, FileNotFoundException, ParseException {
    String mongoUri = getStringParam("mongo_uri");
    String mongoCollection = getStringParam("mongo_collection");
    String docId = getStringParam("id", null);
    String data = getStringParam("data");
    File dataFile = getWorkspaceFile(data);
    String dataCol = getStringParam("data_column", null);
    JSONObject metadata = getJSONParam("metadata");
    String dataFmt = getStringParam("date_format");
    JSONArray locationArray = getJSONArrayParam("location", null);

    List<Double> location = null;
    if (locationArray != null) {
      // Only the first two entries are used; they are parsed from their string form.
      location = Arrays.asList(
          Double.parseDouble(locationArray.get(0).toString()),
          Double.parseDouble(locationArray.get(1).toString()));
    }

    String chunkStrategy = getStringParam("chunk_strategy", TimeseriesTable.CHUNK_YEAR);
    run(mongoUri, mongoCollection, docId, dataFile, dataCol, metadata, dataFmt, location, chunkStrategy, LOG);
  }


  /**
   * Read a CSV timeseries file and store it, together with a describing
   * document, in a Mongo collection.
   *
   * The Mongo connection is opened at the start and is closed again whether
   * the call completes normally or throws.
   *
   * @param mongoUri Mongo connection URI (must name the csip_timeseries database)
   * @param mongoCollection Name of the collection to write to
   * @param docId Mongo collection ID of the document with metadata about the location;
   * when null a new document is inserted, otherwise the document with this ID
   * is replaced (or created if it does not exist).
   * @param dataFile The CSV file to insert
   * @param dataCol The particular column of data to insert if more than one is present.
   * Leave null to insert all columns.
   * @param metadata Additional information about the location; null is treated
   * as empty metadata.
   * @param dataFmt Date format of csv date column.
   * @param location Point coordinates [longitude, latitude]; may be null
   * @param chunkStrategy Chunking strategy passed through to
   * {@link TimeseriesTable#readCSV}, e.g. {@link TimeseriesTable#CHUNK_YEAR}
   * @param LOG optional logger
   * @throws FileNotFoundException
   * @throws IOException
   * @throws JSONException
   * @throws ParseException
   * @throws ServiceException
   */
  void run(String mongoUri, String mongoCollection, String docId, File dataFile, String dataCol, JSONObject metadata,
      String dataFmt, List<Double> location, String chunkStrategy, SessionLogger LOG)
      throws FileNotFoundException, IOException, JSONException, ParseException, ServiceException {
    open(mongoUri);
    try {
      MongoCollection<Document> collection = db.getCollection(mongoCollection);

      // Dates in the CSV are interpreted as UTC.
      SimpleDateFormat ft = new SimpleDateFormat(dataFmt);
      ft.setTimeZone(TimeZone.getTimeZone("UTC"));

      // Read the CSV file into the table.
      TimeseriesTable tbl = new TimeseriesTable();
      tbl.readCSV(dataFile, ft, dataCol, chunkStrategy);

      // Create document with location metadata
      Document locationDoc = new Document();
      if (docId != null) {
        locationDoc.append("_id", docId);
      }

      // Use a subdocument for the metadata.
      Document metadataDoc = new Document();
      if (metadata != null) {
        Iterator<?> itr = metadata.keys();
        while (itr.hasNext()) {
          String key = (String) itr.next();
          metadataDoc.put(key, metadata.get(key));
        }
      }
      locationDoc.append("metadata", metadataDoc);
      locationDoc.append("header", tbl.header);
      locationDoc.append("date_fmt", dataFmt);
      if (location != null) {
        locationDoc.append("location", new Document().append("type", "Point").append("coordinates", location));
      }

      if (docId == null) {
        // NOTE(review): the "_id" lookup below relies on insertOne having
        // populated "_id" on locationDoc - confirm against the driver version in use.
        collection.insertOne(locationDoc);
      } else {
        Document query = new Document().append("_id", docId);
        UpdateOptions options = new UpdateOptions().upsert(true);
        collection.replaceOne(query, locationDoc, options);
      }

      results = new JSONObject();
      results.put("updated_document_id", locationDoc.get("_id").toString());

      // Store the timeseries values themselves.
      tbl.insertData(collection, locationDoc);
    } finally {
      // Always release the connection, including on failure.
      close();
    }
  }


  /**
   * Publish the result object built by {@link #run} under the key "result".
   */
  @Override
  protected void postProcess() {
    putResult("result", results);
  }


  /**
   * Manual test driver: inserts one CSV file and then a second one under the
   * same document ID. Uses hard-coded local paths and a hard-coded Mongo host.
   */
  public static void main(String[] args) throws FileNotFoundException, IOException, JSONException, ParseException, ServiceException {
    V1_0 timeseries = new V1_0();
    JSONObject metadata = new JSONObject();
    metadata.put("title", "main test");
    metadata.put("author", "Dave");
    File data_file = new File("/home/dave/work/csip-timeseries/test/service_tests/m/insert/V1_0/insert-req/data_append.csv");
    SessionLogger LOG = new SessionLogger();
    List<Double> location = Arrays.asList(-105.0, 40.5);
    timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref",
        data_file, "Total Precip", metadata, "yyyy-MM-dd", location, TimeseriesTable.CHUNK_YEAR, LOG);

    // Append some data. New data start at index 942.
    data_file = new File("/home/dave/work/csip-timeseries/test/service_tests/m/insert/V1_0/insert-req/data.csv");
    timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref",
        data_file, "Total Precip", metadata, "yyyy-MM-dd", location, TimeseriesTable.CHUNK_YEAR, LOG);
  }
}