V1_0.java [src/java/m/timeseries/insert] Revision: 4693200b9bab653b6a49775f228129a573bb3438  Date: Thu Nov 15 09:40:53 MST 2018
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package m.timeseries.insert;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.UpdateOptions;
import javax.ws.rs.Path;
import csip.ModelDataService;
import csip.ServiceException;
import csip.SessionLogger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.TimeZone;
import org.bson.Document;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import utils.TimeseriesTable;
import csip.annotations.*;
        
/**
 * Mongodb timeseries data insertion.
 *
 * <p>Reads a CSV file from the service workspace and stores it, together with
 * a descriptive "location" document (metadata, header, date format and an
 * optional GeoJSON point), in a collection of the {@code csip_timeseries}
 * database.
 *
 * @author Dave Patterson
 */
@Name("Timeseries insert")
@Description("Update a timeseries collection in mongo.")
@Author(org="CSU")
@License(License.MIT)

@Path("m/insert/1.0")
public class V1_0 extends ModelDataService {

    /** Client for the current request; {@code null} when no connection is open. */
    MongoClient mongo;
    /** Database handle obtained from {@link #mongo}; {@code null} when closed. */
    MongoDatabase db;
    /** Payload reported by {@link #postProcess()}; populated by {@code run}. */
    JSONObject results;

    /**
     * Opens a client for the given URI and selects its database.
     *
     * <p>The URI is validated <em>before</em> the client is created so that a
     * rejected URI never leaves an unclosed client behind.
     *
     * @param mongoclienturi Mongo connection string; it must name the
     *        {@code csip_timeseries} database
     * @throws RuntimeException if the URI names no database, or names a
     *         database other than {@code csip_timeseries}
     */
    protected void open(String mongoclienturi) {
        MongoClientURI u = new MongoClientURI(mongoclienturi);
        String database = u.getDatabase();
        if (database == null) {
            throw new RuntimeException("Missing database in mongo uri: " + mongoclienturi);
        }
        if (!database.equals("csip_timeseries")) {
            throw new RuntimeException("Can only modify csip_timeseries collection");
        }
        mongo = new MongoClient(u);
        db = mongo.getDatabase(database);
    }


    /**
     * Closes the client opened by {@link #open(String)}, if any. Calling this
     * when nothing is open is a no-op.
     */
    protected void close() {
        if (mongo != null) {
            try {
                mongo.close();
            } finally {
                // Drop the stale handles so a closed client is never reused.
                mongo = null;
                db = null;
            }
        }
    }


    /**
     * Reads the request parameters and delegates to {@code run}.
     *
     * <p>Required parameters: {@code mongo_uri}, {@code mongo_collection},
     * {@code data}, {@code metadata}, {@code date_format}. Optional
     * parameters: {@code id}, {@code data_column}, {@code location} (the
     * first two array entries are used as the point coordinates) and
     * {@code chunk_strategy} (defaults to {@link TimeseriesTable#CHUNK_YEAR}).
     */
    @Override
    public void doProcess() throws ServiceException, JSONException, IOException, FileNotFoundException, ParseException {
        String mongoUri = getStringParam("mongo_uri");
        String mongoCollection = getStringParam("mongo_collection");
        String docId = getStringParam("id", null);
        String data = getStringParam("data");
        File dataFile = getWorkspaceFile(data);
        String dataCol = getStringParam("data_column", null);
        JSONObject metadata = getJSONParam("metadata");
        String dataFmt = getStringParam("date_format");
        JSONArray locationArray = getJSONArrayParam("location", null);
        List<Double> location = null;
        if (locationArray != null) {
            double first = Double.parseDouble(locationArray.get(0).toString());
            double second = Double.parseDouble(locationArray.get(1).toString());
            location = Arrays.asList(first, second);
        }
        String chunkStrategy = getStringParam("chunk_strategy", TimeseriesTable.CHUNK_YEAR);
        run(mongoUri, mongoCollection, docId, dataFile, dataCol, metadata, dataFmt, location, chunkStrategy, LOG);
    }

    /**
     * Loads a CSV file and writes it, with its descriptive document, to Mongo.
     *
     * <p>When {@code docId} is {@code null} a new document is inserted;
     * otherwise the document with that {@code _id} is replaced (or created if
     * absent). The connection is always closed before this method returns,
     * including when it fails with an exception.
     *
     * @param mongoUri Mongo connection string, see {@link #open(String)}
     * @param mongoCollection name of the target collection
     * @param docId {@code _id} of the document to replace, or {@code null} to
     *        insert a new document
     * @param dataFile CSV file passed to {@code TimeseriesTable.readCSV}
     * @param dataCol data column passed to {@code TimeseriesTable.readCSV};
     *        may be {@code null}
     * @param metadata key/value pairs copied into the {@code metadata}
     *        subdocument
     * @param dataFmt {@link SimpleDateFormat} pattern for the dates; parsed
     *        in UTC and also stored as {@code date_fmt}
     * @param location two coordinates stored as a GeoJSON {@code Point}, or
     *        {@code null} to omit the location
     * @param chunkStrategy chunking strategy passed to
     *        {@code TimeseriesTable.readCSV}
     * @param LOG session logger (currently unused by this method)
     * @throws FileNotFoundException declared for the CSV read
     * @throws IOException declared for the CSV read
     * @throws JSONException if the metadata or result JSON cannot be accessed
     * @throws ParseException declared for date parsing
     * @throws ServiceException declared for service-level failures
     */
    void run(String mongoUri, String mongoCollection, String docId, File dataFile, String dataCol, JSONObject metadata,
            String dataFmt, List<Double> location, String chunkStrategy, SessionLogger LOG)
            throws FileNotFoundException, IOException, JSONException, ParseException, ServiceException {

        try {
            open(mongoUri);
            MongoCollection<Document> collection = db.getCollection(mongoCollection);

            SimpleDateFormat ft = new SimpleDateFormat(dataFmt);
            ft.setTimeZone(TimeZone.getTimeZone("UTC"));

            // Read the CSV into the table; nothing is written to Mongo yet.
            TimeseriesTable tbl = new TimeseriesTable();
            tbl.readCSV(dataFile, ft, dataCol, chunkStrategy);

            // Create document with location metadata
            Document locationDoc = new Document();
            if (docId != null) {
                locationDoc.append("_id", docId);
            }

            // Use a subdocument for the metadata.
            Document metadataDoc = new Document();
            Iterator<?> itr = metadata.keys();
            while (itr.hasNext()) {
                String key = (String) itr.next();
                metadataDoc.put(key, metadata.get(key));
            }
            locationDoc.append("metadata", metadataDoc);
            locationDoc.append("header", tbl.header);
            locationDoc.append("date_fmt", dataFmt);
            if (location != null) {
                locationDoc.append("location", new Document().append("type", "Point").append("coordinates", location));
            }

            if (docId == null) {
                collection.insertOne(locationDoc);
            }
            else {
                Document query = new Document().append("_id", docId);
                UpdateOptions options = new UpdateOptions().upsert(true);
                collection.replaceOne(query, locationDoc, options);
            }
            results = new JSONObject();
            results.put("updated_document_id", locationDoc.get("_id").toString());

            // Hand the parsed table to TimeseriesTable for storage.
            tbl.insertData(collection, locationDoc);
        } finally {
            // Always release the client, even when reading or writing failed.
            close();
        }
    }


    /** Publishes the JSON built by {@code run} under the {@code result} key. */
    @Override
    protected void postProcess() {
        putResult("result", results);
    }

    /**
     * Manual smoke test: writes one CSV file and then a second one to the
     * same document id. Paths and host are developer-specific.
     */
    public static void main(String [] args) throws FileNotFoundException, IOException, JSONException, ParseException, ServiceException {
        V1_0 timeseries = new V1_0();
        JSONObject metadata = new JSONObject();
        metadata.put("title", "main test");
        metadata.put("author", "Dave");
        File data_file = new File("/home/dave/work/csip-timeseries/test/service_tests/m/insert/V1_0/insert-req/data_append.csv");
        SessionLogger LOG = new SessionLogger();
        List<Double> location = Arrays.asList(-105.0, 40.5);
        timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref",
                data_file, "Total Precip", metadata, "yyyy-MM-dd", location, TimeseriesTable.CHUNK_YEAR, LOG);

        // Append some data. New data start at index 942.
        data_file = new File("/home/dave/work/csip-timeseries/test/service_tests/m/insert/V1_0/insert-req/data.csv");
        timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref",
                data_file, "Total Precip", metadata, "yyyy-MM-dd", location, TimeseriesTable.CHUNK_YEAR, LOG);
    }
}