V1_0.java [src/java/m/timeseries/insert] Revision: 16640ffd9a769fa1571ae755f818e68e0c1260f6  Date: Wed Nov 14 09:39:09 MST 2018
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package m.timeseries.insert;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.UpdateOptions;
import javax.ws.rs.Path;
import csip.ModelDataService;
import csip.SessionLogger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.TimeZone;
import org.bson.Document;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import utils.TimeseriesTable;
import csip.annotations.*;
        
/**
 * Mongodb timeseries data insertion
 *
 * @author Dave Patterson
 */
@Name("Timeseries insert")
@Description("Update a timeseries collection in mongo.")
@Author(org="CSU")
@License(License.MIT)

@Path("m/insert/1.0")
public class V1_0 extends ModelDataService {

    MongoClient mongo;
    MongoDatabase db;
    JSONObject results;

    protected void open(String mongoclienturi) {
        MongoClientURI u = new MongoClientURI(mongoclienturi);
        mongo = new MongoClient(u);
        if (u.getDatabase() == null) {
            throw new RuntimeException("Missing database in mongo uri: " + mongoclienturi);
        }
        if (!u.getDatabase().equals("csip_timeseries")) {
            throw new RuntimeException("Can only modify csip_timeseries collection");
        }
        db = mongo.getDatabase(u.getDatabase());
    }


    protected void close() {
        mongo.close();
    }


    @Override
    public void doProcess() throws Exception {
        String mongo_uri = getStringParam("mongo_uri");
        String mongo_collection = getStringParam("mongo_collection");
        String doc_id = getStringParam("id", null);
        String data = getStringParam("data");
        File data_file = getWorkspaceFile(data);
        String data_col = getStringParam("data_column", null);
        JSONObject metadata = getJSONParam("metadata");
        String date_fmt = getStringParam("date_format");
        JSONArray locationArray = getJSONArrayParam("location", null);
        List<Double> location = null;
        if (locationArray != null) {
            location = Arrays.asList(Double.parseDouble(locationArray.get(0).toString()), Double.parseDouble(locationArray.get(1).toString()));
        }
        run(mongo_uri, mongo_collection, doc_id, data_file, data_col, metadata, date_fmt, location, LOG);
    }
    
    void run(String mongo_uri, String mongo_collection, String doc_id, File data_file, String data_col, JSONObject metadata,
            String date_fmt, List<Double> location, SessionLogger LOG) throws FileNotFoundException, IOException, JSONException, ParseException {

        open(mongo_uri);
        MongoCollection<Document> collection = db.getCollection(mongo_collection);

        SimpleDateFormat ft = new SimpleDateFormat(date_fmt);
        ft.setTimeZone(TimeZone.getTimeZone("UTC"));

        // Create single document to store the data
        Document doc = new Document();

        Document find = new Document().append("_id", doc_id);
        FindIterable<Document> c = collection.find(find);
        for (Document finddoc: c) {
            doc = finddoc;
            LOG.info("Updating existing doc");
        }
        
        // Load current timeseries data
        TimeseriesTable tbl = new TimeseriesTable(doc);           
        // Append new data
        tbl.readCSV(data_file, ft, data_col);
            
        if (doc_id != null) {
            doc.append("_id", doc_id);
        }

        Document metadata_doc = new Document();
        Iterator itr = metadata.keys();
        while (itr.hasNext()) {
            String key = (String)itr.next();
            metadata_doc.put(key, metadata.get(key));
        }
        doc.append("metadata", metadata_doc);
        doc.append("header", tbl.header);
        doc.append("data", tbl.data);
        doc.append("date_fmt", date_fmt);
        doc.append("location", new Document().append("type", "Point").append("coordinates", location));
        
        if (doc_id == null) {
            collection.insertOne(doc);
        }
        else {
            Document query = new Document().append("_id", doc_id);
            UpdateOptions options = new UpdateOptions().upsert(true);
            collection.replaceOne(query, doc, options);
        }
        results = new JSONObject();
        results.put("updated_document_id", doc.get("_id").toString());
        close();
    }


    @Override
    protected void postProcess() throws Exception {
        putResult("result", results);
    }
    
    public static void main(String [] args) throws FileNotFoundException, IOException, JSONException, ParseException {
        V1_0 timeseries = new V1_0();
        JSONObject metadata = new JSONObject();
        metadata.put("title", "main test");
        metadata.put("author", "Dave");
        File data_file = new File("/home/dave/work/csip-timeseries/test/service_tests/m/insert/V1_0/insert-req/data_append.csv");
        SessionLogger LOG = new SessionLogger();
        List<Double> location = Arrays.asList(-105.0, 40.5);
        timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref", data_file, "Total Precip", metadata, "yyyy-MM-dd", location, LOG);
        
        // Append some data. New data start at index 942.
        data_file = new File("/home/dave/work/csip-timeseries/test/service_tests/m/insert/V1_0/insert-req/data.csv");
        timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref", data_file, "Total Precip", metadata, "yyyy-MM-dd", location, LOG);   
    }
}