@@ -7,12 +7,12 @@ |
|
import com.mongodb.MongoClient; |
import com.mongodb.MongoClientURI; |
-import com.mongodb.client.FindIterable; |
import com.mongodb.client.MongoCollection; |
import com.mongodb.client.MongoDatabase; |
import com.mongodb.client.model.UpdateOptions; |
import javax.ws.rs.Path; |
import csip.ModelDataService; |
+import csip.ServiceException; |
import csip.SessionLogger; |
import java.io.File; |
import java.io.FileNotFoundException; |
@@ -66,83 +66,99 @@ |
|
|
@Override |
- public void doProcess() throws Exception { |
- String mongo_uri = getStringParam("mongo_uri"); |
- String mongo_collection = getStringParam("mongo_collection"); |
- String doc_id = getStringParam("id", null); |
+ public void doProcess() throws ServiceException, JSONException, IOException, FileNotFoundException, ParseException { |
+ String mongoUri = getStringParam("mongo_uri"); |
+ String mongoCollection = getStringParam("mongo_collection"); |
+ String docId = getStringParam("id", null); |
String data = getStringParam("data"); |
- File data_file = getWorkspaceFile(data); |
- String data_col = getStringParam("data_column", null); |
+ File dataFile = getWorkspaceFile(data); |
+ String dataCol = getStringParam("data_column", null); |
JSONObject metadata = getJSONParam("metadata"); |
- String date_fmt = getStringParam("date_format"); |
+ String dateFmt = getStringParam("date_format"); |
JSONArray locationArray = getJSONArrayParam("location", null); |
List<Double> location = null; |
if (locationArray != null) { |
location = Arrays.asList(Double.parseDouble(locationArray.get(0).toString()), Double.parseDouble(locationArray.get(1).toString())); |
} |
- run(mongo_uri, mongo_collection, doc_id, data_file, data_col, metadata, date_fmt, location, LOG); |
+ String chunkStrategy = getStringParam("chunk_strategy", TimeseriesTable.CHUNK_YEAR); |
+ run(mongoUri, mongoCollection, docId, dataFile, dataCol, metadata, dateFmt, location, chunkStrategy, LOG); |
} |
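+ |
+ // The request parameters read by doProcess above, with illustrative values |
+ // (hostnames and file names here are examples, mirroring main() below): |
+ // mongo_uri: "mongodb://host:27017/csip_timeseries", mongo_collection: "test_coll", |
+ // id: "test_ref", data: "data.csv", data_column: "Total Precip", |
+ // metadata: {"title": "..."}, date_format: "yyyy-MM-dd", |
+ // location: [-105.0, 40.5], chunk_strategy: "CHUNK_YEAR" |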
|
- void run(String mongo_uri, String mongo_collection, String doc_id, File data_file, String data_col, JSONObject metadata, |
- String date_fmt, List<Double> location, SessionLogger LOG) throws FileNotFoundException, IOException, JSONException, ParseException { |
+ /** |
+ * Read timeseries data from a CSV file and store it in MongoDB. |
+ * |
+ * @param mongoUri MongoDB connection URI |
+ * @param mongoCollection name of the target collection |
+ * @param docId document ID to update, or null to insert a new document |
+ * @param dataFile CSV file containing the timeseries data |
+ * @param dataCol if non-null, only load data from this column |
+ * @param metadata metadata to attach to the location document |
+ * @param dateFmt date format pattern for parsing the CSV date column |
+ * @param location [longitude, latitude] pair, or null |
+ * @param chunkStrategy one of TimeseriesTable.CHUNK_ALL, CHUNK_YEAR, or CHUNK_NONE |
+ * @param LOG session logger |
+ * @throws FileNotFoundException |
+ * @throws IOException |
+ * @throws JSONException |
+ * @throws ParseException |
+ * @throws ServiceException |
+ */ |
+ void run(String mongoUri, String mongoCollection, String docId, File dataFile, String dataCol, JSONObject metadata, |
+ String dateFmt, List<Double> location, String chunkStrategy, SessionLogger LOG) |
+ throws FileNotFoundException, IOException, JSONException, ParseException, ServiceException { |
|
- open(mongo_uri); |
- MongoCollection<Document> collection = db.getCollection(mongo_collection); |
+ open(mongoUri); |
+ MongoCollection<Document> collection = db.getCollection(mongoCollection); |
|
- SimpleDateFormat ft = new SimpleDateFormat(date_fmt); |
+ SimpleDateFormat ft = new SimpleDateFormat(dateFmt); |
ft.setTimeZone(TimeZone.getTimeZone("UTC")); |
|
- // Create single document to store the data |
- Document doc = new Document(); |
- |
- Document find = new Document().append("_id", doc_id); |
- FindIterable<Document> c = collection.find(find); |
- for (Document finddoc: c) { |
- doc = finddoc; |
- LOG.info("Updating existing doc"); |
- } |
+ TimeseriesTable tbl = new TimeseriesTable(); |
|
- // Load current timeseries data |
- TimeseriesTable tbl = new TimeseriesTable(doc); |
- // Append new data |
- tbl.readCSV(data_file, ft, data_col); |
+ // Read the CSV data into the table, chunked per the strategy |
+ tbl.readCSV(dataFile, ft, dataCol, chunkStrategy); |
|
- if (doc_id != null) { |
- doc.append("_id", doc_id); |
+ // Create document with location metadata |
+ Document locationDoc = new Document(); |
+ if (docId != null) { |
+ locationDoc.append("_id", docId); |
} |
|
- Document metadata_doc = new Document(); |
+ // Use a subdocument for the metadata. |
+ Document metadataDoc = new Document(); |
Iterator itr = metadata.keys(); |
while (itr.hasNext()) { |
String key = (String)itr.next(); |
- metadata_doc.put(key, metadata.get(key)); |
+ metadataDoc.put(key, metadata.get(key)); |
} |
- doc.append("metadata", metadata_doc); |
- doc.append("header", tbl.header); |
- doc.append("data", tbl.data); |
- doc.append("date_fmt", date_fmt); |
- doc.append("location", new Document().append("type", "Point").append("coordinates", location)); |
+ locationDoc.append("metadata", metadataDoc); |
+ locationDoc.append("header", tbl.header); |
+ locationDoc.append("date_fmt", dataFmt); |
+ if (location != null) { |
+ locationDoc.append("location", new Document().append("type", "Point").append("coordinates", location)); |
+ } |
|
- if (doc_id == null) { |
- collection.insertOne(doc); |
+ if (docId == null) { |
+ collection.insertOne(locationDoc); |
} |
else { |
- Document query = new Document().append("_id", doc_id); |
+ Document query = new Document().append("_id", docId); |
UpdateOptions options = new UpdateOptions().upsert(true); |
- collection.replaceOne(query, doc, options); |
+ collection.replaceOne(query, locationDoc, options); |
} |
results = new JSONObject(); |
- results.put("updated_document_id", doc.get("_id").toString()); |
+ results.put("updated_document_id", locationDoc.get("_id").toString()); |
+ |
+ // Insert the chunked timeseries data as separate documents that reference the location doc |
+ tbl.insertData(collection, locationDoc); |
+ |
close(); |
} |
|
|
@Override |
- protected void postProcess() throws Exception { |
+ protected void postProcess() { |
putResult("result", results); |
} |
|
- public static void main(String [] args) throws FileNotFoundException, IOException, JSONException, ParseException { |
+ public static void main(String [] args) throws FileNotFoundException, IOException, JSONException, ParseException, ServiceException { |
V1_0 timeseries = new V1_0(); |
JSONObject metadata = new JSONObject(); |
metadata.put("title", "main test"); |
@@ -150,10 +166,12 @@ |
File data_file = new File("/home/dave/work/csip-timeseries/test/service_tests/m/insert/V1_0/insert-req/data_append.csv"); |
SessionLogger LOG = new SessionLogger(); |
List<Double> location = Arrays.asList(-105.0, 40.5); |
- timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref", data_file, "Total Precip", metadata, "yyyy-MM-dd", location, LOG); |
+ timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref", |
+ data_file, "Total Precip", metadata, "yyyy-MM-dd", location, TimeseriesTable.CHUNK_YEAR, LOG); |
|
// Append some data. New data start at index 942. |
data_file = new File("/home/dave/work/csip-timeseries/test/service_tests/m/insert/V1_0/insert-req/data.csv"); |
- timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref", data_file, "Total Precip", metadata, "yyyy-MM-dd", location, LOG); |
+ timeseries.run("mongodb://eds0.engr.colostate.edu:27017/csip_timeseries", "test_coll", "test_ref", |
+ data_file, "Total Precip", metadata, "yyyy-MM-dd", location, TimeseriesTable.CHUNK_YEAR, LOG); |
} |
} |
@@ -107,22 +107,26 @@ |
} |
} |
|
+ // Location docs lack a location_id field; data-chunk docs have one. |
+ Document locationsOnly = new Document().append("location_id", new Document().append("$exists", false)); |
if (search_feature == null) { |
FindIterable<Document> c; |
if (search_id == null) { |
- c = collection.find(); |
+ // No search criteria, so return all location docs (the filter excludes data chunks) |
+ c = collection.find(locationsOnly); |
} else { |
+ // Search by document ID. |
// Check for object ID (will be a hexadecimal string) |
boolean id_test = search_id.matches("\\p{XDigit}+"); |
if (id_test) { |
ObjectId search_objid = new ObjectId(search_id); |
c = collection.find(Filters.eq("_id", search_objid)); |
} else { |
+ // Otherwise search by the plain string ID |
c = collection.find(Filters.eq("_id", search_id)); |
} |
} |
- for (Document doc : c) { |
- JSONObject res = getResult(collection, doc, start_date, end_date, metadata_only, compute_stats, compute_stats_local); |
+ for (Document locationDoc : c) { |
+ JSONObject res = getResult(collection, locationDoc, start_date, end_date, metadata_only, compute_stats, compute_stats_local); |
results.put(res); |
} |
} else { |
@@ -136,8 +140,8 @@ |
throw new ServiceException(ex); |
} |
AggregateIterable<Document> c = collection.aggregate(agg_match); |
- for (Document doc : c) { |
- JSONObject res = getResult(collection, doc, start_date, end_date, metadata_only, compute_stats, compute_stats_local); |
+ for (Document locationDoc : c) { |
+ JSONObject res = getResult(collection, locationDoc, start_date, end_date, metadata_only, compute_stats, compute_stats_local); |
results.put(res); |
} |
} |
@@ -146,15 +150,17 @@ |
} |
|
|
- private JSONObject getResult(MongoCollection<Document> c, Document doc, Date start_date, Date end_date, |
+ private JSONObject getResult(MongoCollection<Document> c, Document locationDoc, Date start_date, Date end_date, |
boolean metadata_only, boolean compute_stats, boolean compute_stats_local) throws JSONException { |
- String date_fmt = (String) doc.get("date_fmt"); |
+ String date_fmt = (String) locationDoc.get("date_fmt"); |
+ if (date_fmt == null) { |
+ // Fall back for docs stored without a date_fmt field |
+ date_fmt = "yyyy-MM-dd"; |
+ } |
SimpleDateFormat ft = new SimpleDateFormat(date_fmt); |
ft.setTimeZone(TimeZone.getTimeZone("UTC")); |
|
- List<String> header = (ArrayList) doc.get("header"); |
- List<List<Object>> data = (ArrayList) doc.get("data"); |
- JSONArray output = new JSONArray(); |
+ List<String> header = (ArrayList) locationDoc.get("header"); |
+ List<List<Object>> data = (ArrayList) locationDoc.get("data"); |
List<List<Object>> filtered = new ArrayList(); |
JSONObject statResult = new JSONObject(); |
JSONObject statResultLocal = new JSONObject(); |
@@ -183,7 +189,7 @@ |
|
// Export for robomongo |
List<Bson> aggList = Arrays.asList( |
- Aggregates.match(new Document("_id", doc.getString("_id"))), |
+ Aggregates.match(new Document("location_id", locationDoc.getString("_id"))), |
Aggregates.unwind("$data"), |
//Aggregates.project(project) |
Aggregates.match(datesQuery), |
@@ -211,7 +217,7 @@ |
} |
|
aggList = Arrays.asList( |
- Aggregates.match(new Document("_id", doc.getString("_id"))), |
+ Aggregates.match(new Document("_id", locationDoc.getString("_id"))), |
Aggregates.unwind("$data"), |
Aggregates.match(datesQuery), |
Aggregates.project(project), |
@@ -251,15 +257,17 @@ |
} |
|
JSONObject res = new JSONObject(); |
- res.put("_id", doc.get("_id")); |
- Document location = (Document) doc.get("location"); |
- List<Double> coords = (List<Double>) location.get("coordinates"); |
- res.put("location", coords); |
+ res.put("_id", locationDoc.get("_id")); |
+ Document location = (Document) locationDoc.get("location"); |
+ if (location != null) { |
+ List<Double> coords = (List<Double>) location.get("coordinates"); |
+ res.put("location", coords); |
+ } |
res.put("header", header); |
if (!metadata_only) { |
res.put("data", filtered); |
} |
- Document metadata = (Document) doc.get("metadata"); |
+ Document metadata = (Document) locationDoc.get("metadata"); |
res.put("metadata", new JSONObject(metadata.toJson())); |
|
if (compute_stats) { |
@@ -6,91 +6,92 @@ |
|
package utils; |
|
+import com.mongodb.client.MongoCollection; |
import com.opencsv.CSVReader; |
+import csip.ServiceException; |
import java.io.File; |
import java.io.FileNotFoundException; |
import java.io.FileReader; |
import java.io.IOException; |
import java.text.ParseException; |
import java.text.SimpleDateFormat; |
+import java.time.LocalDate; |
+import java.time.ZoneId; |
import java.util.ArrayList; |
-import java.util.Arrays; |
-import java.util.Collections; |
import java.util.Comparator; |
import java.util.Date; |
-import java.util.LinkedHashSet; |
import java.util.List; |
-import java.util.TreeSet; |
+import org.apache.commons.lang3.tuple.MutablePair; |
+import org.apache.commons.lang3.tuple.Pair; |
import org.bson.Document; |
|
/** |
* |
* @author dave |
*/ |
public class TimeseriesTable { |
+ public static final String CHUNK_ALL = "CHUNK_ALL"; |
+ public static final String CHUNK_YEAR = "CHUNK_YEAR"; |
+ public static final String CHUNK_NONE = "CHUNK_NONE"; |
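+ |
+ // The chunk strategy controls how timeseries rows are grouped into MongoDB |
+ // documents: CHUNK_ALL stores all rows in a single chunk document, CHUNK_YEAR |
+ // creates one chunk per calendar year, and CHUNK_NONE one chunk per row. |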
+ |
+ // Header is a list of column names: date, colname1, colname2, ... |
public List<String> header = new ArrayList(); |
- public List<List<Object>> data = new ArrayList(); |
- Comparator timeseries_compare = (Comparator<List<Object>>) (List<Object> lhs, List<Object> rhs) -> { |
- Date lhs_date = (Date)lhs.get(0); |
- Date rhs_date = (Date)rhs.get(0); |
- return lhs_date.compareTo(rhs_date); |
+ // Data is a list of [year, rows] pairs, where rows is a list of [date, val1, val2...] rows |
+ public List<Pair<Integer, List<List<Object>>>> data = new ArrayList<>(); |
+ Comparator<List<Object>> timeseriesCompare = (List<Object> lhs, List<Object> rhs) -> { |
+ Date lhsDate = (Date)lhs.get(0); |
+ Date rhsDate = (Date)rhs.get(0); |
+ return lhsDate.compareTo(rhsDate); |
}; |
|
- public TimeseriesTable(Document timeseries_doc) { |
- List<String> doc_header = (List<String>)timeseries_doc.get("header"); |
+ public TimeseriesTable() { |
+ } |
+ |
+ public TimeseriesTable(Document timeseriesDoc) throws ServiceException { |
+ List<String> doc_header = (List<String>)timeseriesDoc.get("header"); |
if (doc_header != null) { |
header.addAll(doc_header); |
- data.addAll((List<List<Object>>)timeseries_doc.get("data")); |
+ //data.addAll((List<List<Object>>)timeseriesDoc.get("data")); |
+ throw new ServiceException("This version of the constructor is not implemented"); |
} |
} |
|
+ public void readCSV(File dataFile, SimpleDateFormat ft, String dataCol) |
+ throws FileNotFoundException, IOException, ParseException, ServiceException { |
+ readCSV(dataFile, ft, dataCol, CHUNK_NONE); |
+ } |
+ |
/** |
* |
- * @param new_header |
- * @param new_data |
- * @param data_col |
- */ |
- public void add(List<String> new_header, List<List<Object>> new_data, String data_col) { |
- header = new_header; |
- // Use LinkedHashSet to removed duplocate |
- TreeSet<List<Object>> hashed_data = new TreeSet<>(timeseries_compare); |
- // Add original data |
- hashed_data.addAll(data); |
- // Remove entries with same date as new_data |
- hashed_data.removeAll(new_data); |
- // Add new_data. Hashed_data should have only updated versions of duplicates. |
- hashed_data.addAll(new_data); |
- data = new ArrayList<List<Object>>(hashed_data); |
- //sort(); |
- } |
- |
- /** |
- * |
- * @param data_file |
+ * @param dataFile |
* @param ft |
- * @param data_col If non-null, then only add data from this column |
+ * @param dataCol If non-null, then only add data from this column |
+ * @param chunkStrat one of CHUNK_ALL, CHUNK_YEAR, or CHUNK_NONE |
* @throws FileNotFoundException |
* @throws IOException |
* @throws ParseException |
*/ |
- public void readCSV(File data_file, SimpleDateFormat ft, String data_col) throws FileNotFoundException, IOException, ParseException { |
- CSVReader reader = new CSVReader(new FileReader(data_file.getAbsolutePath())); |
+ public void readCSV(File dataFile, SimpleDateFormat ft, String dataCol, String chunkStrat) |
+ throws FileNotFoundException, IOException, ParseException, ServiceException { |
+ CSVReader reader = new CSVReader(new FileReader(dataFile.getAbsolutePath())); |
String[] nextLine; |
|
// Data is stored as array of arrays of [date, val1, val2] |
- List<List<Object>> csv_data = new ArrayList(); |
- List<String> csv_header = new ArrayList(); |
+ List<List<Object>> csvData = new ArrayList<>(); |
+ List<String> csvHeader = new ArrayList<>(); |
int idatacol = 0; |
while ((nextLine = reader.readNext()) != null) { |
- if (csv_header.isEmpty()) { |
- csv_header.add(nextLine[0]); |
+ if (csvHeader.isEmpty()) { |
+ csvHeader.add(nextLine[0]); |
for (int i=1; i<nextLine.length; i++) { |
String headerItem = nextLine[i].trim(); |
- if (data_col == null) { |
- csv_header.add(headerItem); |
- } else if (headerItem.equals(data_col)) { |
+ if (dataCol == null) { |
+ csvHeader.add(headerItem); |
+ } else if (headerItem.equals(dataCol)) { |
idatacol = i; |
- csv_header.add(headerItem); |
+ csvHeader.add(headerItem); |
} |
} |
} |
@@ -103,14 +104,76 @@ |
vals.add(Double.parseDouble(nextLine[i])); |
} |
} |
- csv_data.add(vals); |
+ csvData.add(vals); |
} |
} |
|
- add(csv_header, csv_data, data_col); |
+ header = csvHeader; |
+ add(csvData, chunkStrat); |
} |
|
- public void sort() { |
- Collections.sort(data, timeseries_compare); |
+ /** |
+ * Add rows to the table, grouping them into chunks per the strategy. |
+ * |
+ * @param newData rows of [date, val1, val2...] to add |
+ * @param chunkStrat one of CHUNK_ALL, CHUNK_YEAR, or CHUNK_NONE |
+ * @throws csip.ServiceException if the chunk strategy is not recognized |
+ */ |
+ public void add(List<List<Object>> newData, String chunkStrat) throws ServiceException { |
+ if (chunkStrat.equals(CHUNK_ALL)) { |
+ // Single document mode |
+ addYearData(0, newData); |
+ } else if (chunkStrat.equals(CHUNK_NONE)) { |
+ // Every timeseries item is a document. Index is from 0..n-1 |
+ for (int i=0; i<newData.size(); i++) { |
+ List<List<Object>> rows = new ArrayList<>(); |
+ rows.add(newData.get(i)); |
+ addYearData(i, rows); |
+ } |
+ } else if (chunkStrat.equals(CHUNK_YEAR)) { |
+ // Group data array into years |
+ int year = 0; |
+ List<List<Object>> yearRows = null; |
+ for (List<Object> row : newData) { |
+ Date date = (Date)row.get(0); |
+ // Dates were parsed as UTC, so extract the year in UTC as well |
+ LocalDate localDate = date.toInstant().atZone(ZoneId.of("UTC")).toLocalDate(); |
+ int thisYear = localDate.getYear(); |
+ if (thisYear != year) { |
+ // Year changed, add the pair of year and current list of rows. |
+ addYearData(year, yearRows); |
+ year = thisYear; |
+ yearRows = new ArrayList<>(); |
+ } |
+ yearRows.add(row); |
+ } |
+ // Add the final year's rows |
+ addYearData(year, yearRows); |
+ } else { |
+ throw new ServiceException("Invalid chunk strategy " + chunkStrat); |
+ } |
+ } |
+ |
+ public void addYearData(Integer year, List<List<Object>> rows) { |
+ if (rows != null && !rows.isEmpty()) { |
+ MutablePair<Integer, List<List<Object>>> pair = new MutablePair<>(year, rows); |
+ data.add(pair); |
+ } |
+ } |
+ |
+ /** |
+ * Insert the chunked data into MongoDB, one document per chunk. |
+ * @param collection The collection that holds the chunk documents |
+ * @param locationDoc The organizing document with location info |
+ */ |
+ public void insertData(MongoCollection<Document> collection, Document locationDoc) { |
+ // Create a document for each chunk |
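+ // Each chunk document references its location doc, e.g. (hypothetical values): |
+ // { location_id: "test_ref", year: 2007, data: [[ISODate("2007-01-01"), 0.25], ...] } |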
+ for (Pair<Integer, List<List<Object>>> chunk : data) { |
+ int year = chunk.getLeft(); |
+ List<List<Object>> rows = chunk.getRight(); |
+ Document chunkDoc = new Document() |
+ .append("location_id", locationDoc.get("_id")) |
+ .append("year", year) |
+ .append("data", rows); |
+ collection.insertOne(chunkDoc); |
+ } |
} |
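+ |
+ // Typical usage (sketch, mirroring V1_0.run above): |
+ // TimeseriesTable tbl = new TimeseriesTable(); |
+ // tbl.readCSV(csvFile, dateFormat, "Total Precip", TimeseriesTable.CHUNK_YEAR); |
+ // tbl.insertData(collection, locationDoc); |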
} |