Displaying differences for changeset

src/java/m/timeseries/insert/V1_0.java

@@ -7,12 +7,12 @@
 
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
-import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.UpdateOptions;
 import javax.ws.rs.Path;
 import csip.ModelDataService;
+import csip.ServiceException;
 import csip.SessionLogger;
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -66,83 +66,99 @@
 
 
     @Override
-    public void doProcess() throws Exception {
-        String mongo_uri = getStringParam("mongo_uri");
-        String mongo_collection = getStringParam("mongo_collection");
-        String doc_id = getStringParam("id", null);
+    public void doProcess() throws ServiceException, JSONException, IOException, FileNotFoundException, ParseException {
+        String mongoUri = getStringParam("mongo_uri");
+        String mongoCollection = getStringParam("mongo_collection");
+        String docId = getStringParam("id", null);
         String data = getStringParam("data");
-        File data_file = getWorkspaceFile(data);
-        String data_col = getStringParam("data_column", null);
+        File dataFile = getWorkspaceFile(data);
+        String dataCol = getStringParam("data_column", null);
         JSONObject metadata = getJSONParam("metadata");
-        String date_fmt = getStringParam("date_format");
+        String dateFmt = getStringParam("date_format");
         JSONArray locationArray = getJSONArrayParam("location", null);
         List<Double> location = null;
         if (locationArray != null) {
             location = Arrays.asList(Double.parseDouble(locationArray.get(0).toString()), Double.parseDouble(locationArray.get(1).toString()));
         }
-        run(mongo_uri, mongo_collection, doc_id, data_file, data_col, metadata, date_fmt, location, LOG);
+        String chunkStrategy = getStringParam("chunk_strategy", TimeseriesTable.CHUNK_YEAR);
+        run(mongoUri, mongoCollection, docId, dataFile, dataCol, metadata, dateFmt, location, chunkStrategy, LOG);
     }
     
-    void run(String mongo_uri, String mongo_collection, String doc_id, File data_file, String data_col, JSONObject metadata,
-            String date_fmt, List<Double> location, SessionLogger LOG) throws FileNotFoundException, IOException, JSONException, ParseException {
+    /**
+     * @param mongoUri mongodb connection URI
+     * @param mongoCollection target collection name
+     * @param docId document id, or null to insert a new document
+     * @param dataFile CSV file holding the timeseries data
+     * @param dataCol if non-null, only ingest this column
+     * @param metadata metadata to attach to the location document
+     * @param dateFmt Java SimpleDateFormat pattern for the date column
+     * @param location [longitude, latitude], or null
+     * @param chunkStrategy one of TimeseriesTable.CHUNK_ALL/CHUNK_NONE/CHUNK_YEAR
+     * @param LOG session logger
+     * @throws FileNotFoundException
+     * @throws IOException
+     * @throws JSONException
+     * @throws ParseException
+     * @throws ServiceException
+     */
+    void run(String mongoUri, String mongoCollection, String docId, File dataFile, String dataCol, JSONObject metadata,
+            String dateFmt, List<Double> location, String chunkStrategy, SessionLogger LOG)
+            throws FileNotFoundException, IOException, JSONException, ParseException, ServiceException {
 
-        open(mongo_uri);
-        MongoCollection<Document> collection = db.getCollection(mongo_collection);
+        open(mongoUri);
+        MongoCollection<Document> collection = db.getCollection(mongoCollection);
 
-        SimpleDateFormat ft = new SimpleDateFormat(date_fmt);
+        SimpleDateFormat ft = new SimpleDateFormat(dateFmt);
         ft.setTimeZone(TimeZone.getTimeZone("UTC"));
 
-        // Create single document to store the data
-        Document doc = new Document();
-
-        Document find = new Document().append("_id", doc_id);
-        FindIterable<Document> c = collection.find(find);
-        for (Document finddoc: c) {
-            doc = finddoc;
-            LOG.info("Updating existing doc");
-        }
+        TimeseriesTable tbl = new TimeseriesTable();
         
-        // Load current timeseries data
-        TimeseriesTable tbl = new TimeseriesTable(doc);           
-        // Append new data
-        tbl.readCSV(data_file, ft, data_col);
+        // Read the CSV and chunk the rows according to the strategy
+        tbl.readCSV(dataFile, ft, dataCol, chunkStrategy);
             
-        if (doc_id != null) {
-            doc.append("_id", doc_id);
+        // Create document with location metadata
+        Document locationDoc = new Document();
+        if (docId != null) {
+            locationDoc.append("_id", docId);
         }
 
-        Document metadata_doc = new Document();
+        // Use a subdocument for the metadata.
+        Document metadataDoc = new Document();
         Iterator itr = metadata.keys();
         while (itr.hasNext()) {
             String key = (String)itr.next();
-            metadata_doc.put(key, metadata.get(key));
+            metadataDoc.put(key, metadata.get(key));
         }
-        doc.append("metadata", metadata_doc);
-        doc.append("header", tbl.header);
-        doc.append("data", tbl.data);
-        doc.append("date_fmt", date_fmt);
-        doc.append("location", new Document().append("type", "Point").append("coordinates", location));
+        locationDoc.append("metadata", metadataDoc);
+        locationDoc.append("header", tbl.header);
+        locationDoc.append("date_fmt", dataFmt);
+        if (location != null) {
+            locationDoc.append("location", new Document().append("type", "Point").append("coordinates", location));
+        }
         
-        if (doc_id == null) {
-            collection.insertOne(doc);
+        if (docId == null) {
+            collection.insertOne(locationDoc);
         }
         else {
-            Document query = new Document().append("_id", doc_id);
+            Document query = new Document().append("_id", docId);
             UpdateOptions options = new UpdateOptions().upsert(true);
-            collection.replaceOne(query, doc, options);
+            collection.replaceOne(query, locationDoc, options);
         }
         results = new JSONObject();
-        results.put("updated_document_id", doc.get("_id").toString());
+        results.put("updated_document_id", locationDoc.get("_id").toString());
+        
+        tbl.insertData(collection, locationDoc);
+        
         close();
     }
 
 
     @Override
-    protected void postProcess() throws Exception {
+    protected void postProcess() {
         putResult("result", results);
     }
     
-    public static void main(String [] args) throws FileNotFoundException, IOException, JSONException, ParseException {
+    public static void main(String [] args) throws FileNotFoundException, IOException, JSONException, ParseException, ServiceException {
         V1_0 timeseries = new V1_0();
         JSONObject metadata = new JSONObject();
         metadata.put("title", "main test");
@@ -150,10 +166,12 @@
         File data_file = new File("/home/dave/work/csip-timeseries/test/service_tests/m/insert/V1_0/insert-req/data_append.csv");
         SessionLogger LOG = new SessionLogger();
         List<Double> location = Arrays.asList(-105.0, 40.5);
-        timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref", data_file, "Total Precip", metadata, "yyyy-MM-dd", location, LOG);
+        timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref",
+                data_file, "Total Precip", metadata, "yyyy-MM-dd", location, TimeseriesTable.CHUNK_YEAR, LOG);
         
         // Append some data. New data start at index 942.
         data_file = new File("/home/dave/work/csip-timeseries/test/service_tests/m/insert/V1_0/insert-req/data.csv");
-        timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref", data_file, "Total Precip", metadata, "yyyy-MM-dd", location, LOG);   
+        timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref",
+                data_file, "Total Precip", metadata, "yyyy-MM-dd", location, TimeseriesTable.CHUNK_YEAR, LOG);   
     }
 }
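
With the new layout an insert writes one location document plus one data
document per chunk, linked by location_id. A minimal sketch (not part of the
changeset; illustrative values, field names taken from the code above):

    import java.util.Arrays;
    import java.util.List;
    import org.bson.Document;

    // Location document written by run():
    Document locationDoc = new Document("_id", "test_ref")
            .append("metadata", new Document("title", "main test"))
            .append("header", Arrays.asList("date", "Total Precip"))
            .append("date_fmt", "yyyy-MM-dd")
            .append("location", new Document("type", "Point")
                    .append("coordinates", Arrays.asList(-105.0, 40.5)));

    // One data document per chunk, written by tbl.insertData(); with
    // CHUNK_YEAR that means one document per calendar year.
    List<List<Object>> rows = Arrays.asList();  // rows of [Date, value], empty here for brevity
    Document chunkDoc = new Document("location_id", "test_ref")
            .append("year", 2015)
            .append("data", rows);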

src/java/m/timeseries/insert/V1_0.json

@@ -33,6 +33,10 @@
             "value": "yyyy-MM-dd",
             "description": "Java SimpleDateFormatter format."
         }, {
+            "name": "chunk_strategy",
+            "value": "CHUNK_YEAR",
+            "description": "One of [CHUNK_ALL, CHUNK_NONE, CHUNK_YEAR]"
+        }, {
             "name": "data",
             "value": "data.csv",
             "description": "Standard csv format. First line is the header. First column must be date. Successive columns are floating point data."

src/java/m/timeseries/query/V1_0.java

@@ -107,22 +107,26 @@
             }
         }
 
+        Document locationsOnly = new Document().append("location_id", new Document().append("$exists", false));
         if (search_feature == null) {
             FindIterable<Document> c;
             if (search_id == null) {
-                c = collection.find();
+                // No search criteria, so return every location document (data chunk docs are excluded by the filter above)
+                c = collection.find(locationsOnly);
             } else {
+                // Search by document ID.
                 // Check for object ID (will be a hexadecimal string)
                 boolean id_test = search_id.matches("\\p{XDigit}+");
                 if (id_test) {
                     ObjectId search_objid = new ObjectId(search_id);
                     c = collection.find(Filters.eq("_id", search_objid));
                 } else {
+                    // Simple search using string
                     c = collection.find(Filters.eq("_id", search_id));
                 }
             }
-            for (Document doc : c) {
-                JSONObject res = getResult(collection, doc, start_date, end_date, metadata_only, compute_stats, compute_stats_local);
+            for (Document locationDoc : c) {
+                JSONObject res = getResult(collection, locationDoc, start_date, end_date, metadata_only, compute_stats, compute_stats_local);
                 results.put(res);
             }
         } else {
@@ -136,8 +140,8 @@
                     throw new ServiceException(ex);
                 }
                 AggregateIterable<Document> c = collection.aggregate(agg_match);
-                for (Document doc : c) {
-                    JSONObject res = getResult(collection, doc, start_date, end_date, metadata_only, compute_stats, compute_stats_local);
+                for (Document locationDoc : c) {
+                    JSONObject res = getResult(collection, locationDoc, start_date, end_date, metadata_only, compute_stats, compute_stats_local);
                     results.put(res);
                 }
             }
@@ -146,15 +150,17 @@
     }
 
 
-    private JSONObject getResult(MongoCollection<Document> c, Document doc, Date start_date, Date end_date,
+    private JSONObject getResult(MongoCollection<Document> c, Document locationDoc, Date start_date, Date end_date,
             boolean metadata_only, boolean compute_stats, boolean compute_stats_local) throws JSONException {
-        String date_fmt = (String) doc.get("date_fmt");
+        String date_fmt = (String) locationDoc.get("date_fmt");
+        if (date_fmt == null) {
+            date_fmt = "yyyy-MM-dd";
+        }
         SimpleDateFormat ft = new SimpleDateFormat(date_fmt);
         ft.setTimeZone(TimeZone.getTimeZone("UTC"));
 
-        List<String> header = (ArrayList) doc.get("header");
-        List<List<Object>> data = (ArrayList) doc.get("data");
-        JSONArray output = new JSONArray();
+        List<String> header = (ArrayList) locationDoc.get("header");
+        List<List<Object>> data = (ArrayList) locationDoc.get("data");
         List<List<Object>> filtered = new ArrayList();
         JSONObject statResult = new JSONObject();
         JSONObject statResultLocal = new JSONObject();
@@ -183,7 +189,7 @@
 
             // Export for robomongo
             List<Bson> aggList = Arrays.asList(
-                Aggregates.match(new Document("_id", doc.getString("_id"))),
+                Aggregates.match(new Document("location_id", locationDoc.getString("_id"))),
                 Aggregates.unwind("$data"),
                 //Aggregates.project(project)
                 Aggregates.match(datesQuery),
@@ -211,7 +217,7 @@
                 }
 
                 aggList = Arrays.asList(
-                    Aggregates.match(new Document("_id", doc.getString("_id"))),
+                    Aggregates.match(new Document("_id", locationDoc.getString("_id"))),
                     Aggregates.unwind("$data"),
                     Aggregates.match(datesQuery),
                     Aggregates.project(project),
@@ -251,15 +257,17 @@
         }
 
         JSONObject res = new JSONObject();
-        res.put("_id", doc.get("_id"));
-        Document location = (Document) doc.get("location");
-        List<Double> coords = (List<Double>) location.get("coordinates");
-        res.put("location", coords);
+        res.put("_id", locationDoc.get("_id"));
+        Document location = (Document) locationDoc.get("location");
+        if (location != null) {
+            List<Double> coords = (List<Double>) location.get("coordinates");
+            res.put("location", coords);
+        }
         res.put("header", header);
         if (!metadata_only) {
             res.put("data", filtered);
         }
-        Document metadata = (Document) doc.get("metadata");
+        Document metadata = (Document) locationDoc.get("metadata");
         res.put("metadata", new JSONObject(metadata.toJson()));
         
         if (compute_stats) {
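
Since the data rows now live in chunk documents keyed by location_id rather
than on the location document itself, retrieving a location's rows takes a
second query. A sketch of that lookup (an assumption for illustration, not
code from this changeset):

    import com.mongodb.client.FindIterable;
    import com.mongodb.client.model.Filters;

    FindIterable<Document> chunks = collection.find(
            Filters.eq("location_id", locationDoc.get("_id")));
    for (Document chunk : chunks) {
        // Each chunk carries its "year" plus rows of [Date, val1, val2, ...]
        List<List<Object>> rows = (List<List<Object>>) chunk.get("data");
    }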

src/java/utils/TimeseriesTable.java

@@ -6,91 +6,92 @@
 
 package utils;
 
+import com.mongodb.client.MongoCollection;
 import com.opencsv.CSVReader;
+import csip.ServiceException;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.time.ZoneId;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
-import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.TreeSet;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.bson.Document;
 
 /**
  *
  * @author dave
  */
+
+
 public class TimeseriesTable {
+    public static final String CHUNK_ALL = "CHUNK_ALL";
+    public static final String CHUNK_YEAR = "CHUNK_YEAR";
+    public static final String CHUNK_NONE = "CHUNK_NONE";
+
+    // Header is a list of strings (date, colname1, colname2, ...)
     public List<String> header = new ArrayList();
-    public List<List<Object>> data = new ArrayList();
-    Comparator timeseries_compare = (Comparator<List<Object>>) (List<Object> lhs, List<Object> rhs) -> {
-        Date lhs_date = (Date)lhs.get(0);
-        Date rhs_date = (Date)rhs.get(0);
-        return lhs_date.compareTo(rhs_date);
+    // Data is a list of (year, rows) pairs, where each row is [date, val1, val2, ...]
+    public List<Pair<Integer, List<List<Object>>>> data = new ArrayList<>();
+    Comparator<List<Object>> timeseriesCompare = (List<Object> lhs, List<Object> rhs) -> {
+        Date lhsDate = (Date)lhs.get(0);
+        Date rhsDate = (Date)rhs.get(0);
+        return lhsDate.compareTo(rhsDate);
     };
     
-    public TimeseriesTable(Document timeseries_doc) {
-        List<String> doc_header = (List<String>)timeseries_doc.get("header");
+    public TimeseriesTable() {
+    }
+    
+    public TimeseriesTable(Document timeseriesDoc) throws ServiceException {
+        List<String> doc_header = (List<String>)timeseriesDoc.get("header");
         if (doc_header != null) {
             header.addAll(doc_header);
-            data.addAll((List<List<Object>>)timeseries_doc.get("data"));
+            //data.addAll((List<List<Object>>)timeseriesDoc.get("data"));
+            throw new ServiceException("This version of the constructor is not implemented");
         }
     }
     
+    public void readCSV(File dataFile, SimpleDateFormat ft, String dataCol)
+            throws FileNotFoundException, IOException, ParseException, ServiceException {
+        readCSV(dataFile, ft, dataCol, CHUNK_NONE);
+    }
+    
     /**
      * 
-     * @param new_header
-     * @param new_data
-     * @param data_col 
-     */
-    public void add(List<String> new_header, List<List<Object>> new_data, String data_col) {
-        header = new_header;
-        // Use LinkedHashSet to removed duplocate
-        TreeSet<List<Object>> hashed_data = new TreeSet<>(timeseries_compare);
-        // Add original data
-        hashed_data.addAll(data);
-        // Remove entries with same date as new_data
-        hashed_data.removeAll(new_data);
-        // Add new_data. Hashed_data should have only updated versions of duplicates.
-        hashed_data.addAll(new_data);
-        data = new ArrayList<List<Object>>(hashed_data);
-        //sort();
-    }
-    
-    /**
-     * 
-     * @param data_file
+     * @param dataFile
      * @param ft
-     * @param data_col If non-null, then only add data from this column 
+     * @param dataCol If non-null, then only add data from this column 
+     * @param chunkStrat
      * @throws FileNotFoundException
      * @throws IOException
      * @throws ParseException 
      */
-    public void readCSV(File data_file, SimpleDateFormat ft, String data_col) throws FileNotFoundException, IOException, ParseException {
-        CSVReader reader = new CSVReader(new FileReader(data_file.getAbsolutePath()));
+    public void readCSV(File dataFile, SimpleDateFormat ft, String dataCol, String chunkStrat)
+            throws FileNotFoundException, IOException, ParseException, ServiceException {
+        CSVReader reader = new CSVReader(new FileReader(dataFile.getAbsolutePath()));
         String[] nextLine;
         
         // Data is stored as array of arrays of [date, val1, val2]
-        List<List<Object>> csv_data = new ArrayList();
-        List<String> csv_header = new ArrayList();
+        List<List<Object>> csvData = new ArrayList();
+        List<String> csvHeader = new ArrayList();
         int idatacol = 0;
         while ((nextLine = reader.readNext()) != null) {
-            if (csv_header.isEmpty()) {
-                csv_header.add(nextLine[0]);
+            if (csvHeader.isEmpty()) {
+                csvHeader.add(nextLine[0]);
                 for (int i=1; i<nextLine.length; i++) {
                     String headerItem = nextLine[i].trim();
-                    if (data_col == null) {
-                        csv_header.add(headerItem);
-                    } else if (headerItem.equals(data_col)) {
+                    if (dataCol == null) {
+                        csvHeader.add(headerItem);
+                    } else if (headerItem.equals(dataCol)) {
                         idatacol = i;
-                        csv_header.add(headerItem);
+                        csvHeader.add(headerItem);
                     }
                 }
             }
@@ -103,14 +104,76 @@
                         vals.add(Double.parseDouble(nextLine[i]));
                     }
                 }
-                csv_data.add(vals);
+                csvData.add(vals);
             }
         }
         
-        add(csv_header, csv_data, data_col);
+        header = csvHeader;
+        add(csvData, chunkStrat);
     }
 
-    public void sort() {
-        Collections.sort(data, timeseries_compare);
+    /**
+     * Group the new rows into chunks according to the given strategy.
+     * @param newData rows of [date, val1, val2, ...]
+     * @param chunkStrat one of CHUNK_ALL, CHUNK_NONE, CHUNK_YEAR
+     * @throws csip.ServiceException if chunkStrat is not recognized
+     */
+    public void add(List<List<Object>> newData, String chunkStrat) throws ServiceException {
+        if (chunkStrat.equals(CHUNK_ALL)) {
+            // Single document mode
+            addYearData(0, newData);
+        } else if (chunkStrat.equals(CHUNK_NONE)) {
+            // Every row becomes its own chunk; the index 0..n-1 stands in for the year
+            for (int i=0; i<newData.size(); i++) {
+                List<List<Object>> rows = new ArrayList<>();
+                rows.add(newData.get(i));
+                addYearData(i, rows);
+            }
+        } else if (chunkStrat.equals(CHUNK_YEAR)) {
+            // Group data array into years
+            int year = 0;
+            List<List<Object>> yearRows = null;
+            for (List<Object> row : newData) {
+                Date date = (Date)row.get(0);
+                LocalDate localDate = date.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
+                int thisYear = localDate.getYear();
+                if (thisYear != year) {
+                    // Year changed: flush the rows collected for the previous year.
+                    addYearData(year, yearRows);
+                    year = thisYear;
+                    yearRows = new ArrayList<>();
+                }
+                yearRows.add(row);
+            }
+            // Flush the rows collected for the final year
+            addYearData(year, yearRows);
+        } else {
+            throw new ServiceException("Invalid chunk strategy " + chunkStrat);
+        }
+    }
+    
+    public void addYearData(Integer year, List<List<Object>> rows) {
+        if (rows != null && !rows.isEmpty()) {
+            MutablePair<Integer, List<List<Object>>> pair = new MutablePair<>(year, rows);
+            data.add(pair);
+        }
+    }
+    
+    /**
+     * Insert the chunked data into mongodb, one document per chunk.
+     * @param collection the target collection
+     * @param locationDoc The organizing document with location info
+     */
+    public void insertData(MongoCollection<Document> collection, Document locationDoc) {
+        // Create a document for each chunk
+        for (Pair<Integer, List<List<Object>>> chunk : data) {
+            int year = chunk.getLeft();
+            List<List<Object>> rows = chunk.getRight();
+            Document chunkDoc = new Document()
+                    .append("location_id", locationDoc.get("_id"))
+                    .append("year", year)
+                    .append("data", rows);
+            collection.insertOne(chunkDoc);
+        }
     }
 }
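
End to end, the table is filled from CSV and then flushed to MongoDB one
document per chunk. A minimal usage sketch (illustrative file and column
names; collection and locationDoc as created by the insert service above):

    TimeseriesTable tbl = new TimeseriesTable();
    SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd");
    ft.setTimeZone(TimeZone.getTimeZone("UTC"));
    // CHUNK_YEAR groups the rows into one (year, rows) pair per calendar
    // year; CHUNK_ALL would yield a single pair, CHUNK_NONE one per row.
    tbl.readCSV(new File("data.csv"), ft, "Total Precip", TimeseriesTable.CHUNK_YEAR);
    tbl.insertData(collection, locationDoc);  // one insertOne() per chunk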

test/service_tests/m/insert/V1_0/insert-no-location-req.json

@@ -21,6 +21,10 @@
             },
             "description": "The metadata will be added to the document."
         }, {
+            "name": "chunk_strategy",
+            "value": "CHUNK_ALL",
+            "description": "One of [CHUNK_ALL, CHUNK_NONE, CHUNK_YEAR]"
+        }, {
             "name": "date_format",
             "value": "yyyy-MM-dd",
             "description": "Java SimpleDateFormatter format."

test/service_tests/m/insert/V1_0/insert-req.json

@@ -25,6 +25,10 @@
             "value": [-105, 40.5],
             "description": "[longitude, latitude]"
         }, {
+            "name": "chunk_strategy",
+            "value": "CHUNK_NONE",
+            "description": "One of [CHUNK_ALL, CHUNK_NONE, CHUNK_YEAR]"
+        }, {
             "name": "date_format",
             "value": "yyyy-MM-dd",
             "description": "Java SimpleDateFormatter format."

test/service_tests/m/insert/V1_0/service.properties

@@ -1,5 +1,6 @@
 # service endpoint, required
 url=http://localhost:8080/csip-timeseries/m/insert/1.0
+#url=http://csip.engr.colostate.edu:8087/csip-timeseries/m/insert/1.0
 
 # number of concurrent tests, optional, defaults to 1
 concurrency=1

test/service_tests/m/query/V1_0/service.properties

@@ -1,5 +1,6 @@
 # service endpoint, required
 url=http://localhost:8080/csip-timeseries/m/query/1.0
+#url=http://csip.engr.colostate.edu:8087/csip-timeseries/m/query/1.0
 
 # number of concurrent tests, optional, defaults to 1
 concurrency=1