TimeseriesTable.java [src/java/utils] Revision: default Date:
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package utils;
import com.mongodb.client.MongoCollection;
import com.opencsv.CSVReader;
import csip.ServiceException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.bson.Document;
/**
 * In-memory table of timeseries rows that can be loaded from a CSV file and
 * written to a MongoDB collection.
 *
 * Rows are grouped into "chunks"; each chunk becomes one MongoDB document when
 * {@link #insertData} is called. How rows are grouped is selected with one of
 * the {@code CHUNK_*} strategy constants.
 *
 * @author dave
 */
public class TimeseriesTable {

    /** All timeseries data will be stored in a single document in the data array. */
    public static final String CHUNK_ALL = "CHUNK_ALL";
    /** Timeseries data will be broken up into documents representing a year of data. */
    public static final String CHUNK_YEAR = "CHUNK_YEAR";
    /** Data will be stored in individual documents. */
    public static final String CHUNK_NONE = "CHUNK_NONE";

    /** Header is a list of strings (date, colname1, colname2...). */
    public List<String> header = new ArrayList<>();

    /**
     * Chunks of data as pairs of [key, rows], where rows is an array of
     * [dt, val1, val2...]. The key is the calendar year for CHUNK_YEAR, always 0
     * for CHUNK_ALL, and the row index within the added batch for CHUNK_NONE.
     */
    public List<Pair<Integer, List<List<Object>>>> data = new ArrayList<>();

    /** Orders rows by their first element, which must be a {@link Date}. */
    Comparator<List<Object>> timeseriesCompare = (lhs, rhs) -> {
        Date lhsDate = (Date) lhs.get(0);
        Date rhsDate = (Date) rhs.get(0);
        return lhsDate.compareTo(rhsDate);
    };

    /** Creates an empty table with no header and no data. */
    public TimeseriesTable() {
    }

    /**
     * Creates a table from a stored document. Only the "header" entry is read;
     * loading the rows is not implemented, so this constructor throws whenever
     * the document actually contains a header.
     *
     * @param timeseriesDoc document whose "header" entry is a list of column names
     * @throws ServiceException if the document has a non-null "header" entry
     */
    public TimeseriesTable(Document timeseriesDoc) throws ServiceException {
        @SuppressWarnings("unchecked") // "header" is expected to hold a list of strings
        List<String> docHeader = (List<String>) timeseriesDoc.get("header");
        if (docHeader != null) {
            header.addAll(docHeader);
            //data.addAll((List<List<Object>>)timeseriesDoc.get("data"));
            throw new ServiceException("This version of the constructor is not implemented");
        }
    }

    /**
     * Reads a CSV file using the {@link #CHUNK_NONE} strategy, i.e. one chunk
     * per row.
     *
     * @param dataFile CSV file whose first line is the header
     * @param ft format used to parse the first column of every data line
     * @param dataCol If non-null, then only add data from this column
     * @throws FileNotFoundException if the file cannot be opened
     * @throws IOException if reading the file fails
     * @throws ParseException if a date cannot be parsed with {@code ft}
     * @throws ServiceException if {@code dataCol} is not a column of the file
     */
    public void readCSV(File dataFile, SimpleDateFormat ft, String dataCol)
            throws FileNotFoundException, IOException, ParseException, ServiceException {
        readCSV(dataFile, ft, dataCol, CHUNK_NONE);
    }

    /**
     * Reads a CSV file, replaces {@link #header} with the file's header and
     * appends the parsed rows to {@link #data} using the given chunk strategy.
     *
     * The first line is the header. On every following line the first field is
     * parsed as a date with {@code ft} and the selected remaining fields are
     * parsed as doubles.
     *
     * @param dataFile CSV file whose first line is the header
     * @param ft format used to parse the first column of every data line
     * @param dataCol If non-null, then only add data from this column
     * @param chunkStrat one of {@link #CHUNK_ALL}, {@link #CHUNK_YEAR}, {@link #CHUNK_NONE}
     * @throws FileNotFoundException if the file cannot be opened
     * @throws IOException if reading the file fails
     * @throws ParseException if a date cannot be parsed with {@code ft}
     * @throws ServiceException if {@code dataCol} is not a column of the file or
     *         the chunk strategy is invalid
     */
    public void readCSV(File dataFile, SimpleDateFormat ft, String dataCol, String chunkStrat)
            throws FileNotFoundException, IOException, ParseException, ServiceException {
        // Data is stored as array of arrays of [date, val1, val2]
        List<List<Object>> csvData = new ArrayList<>();
        List<String> csvHeader = new ArrayList<>();
        // Index of the single selected value column; 0 means "take every column".
        int idatacol = 0;

        // try-with-resources so the underlying file handle is released even when
        // parsing fails part-way through the file.
        try (CSVReader reader = new CSVReader(new FileReader(dataFile.getAbsolutePath()))) {
            String[] nextLine;
            while ((nextLine = reader.readNext()) != null) {
                if (csvHeader.isEmpty()) {
                    // First line: the date column is always kept as-is.
                    csvHeader.add(nextLine[0]);
                    for (int i = 1; i < nextLine.length; i++) {
                        String headerItem = nextLine[i].trim();
                        if (dataCol == null) {
                            csvHeader.add(headerItem);
                        } else if (headerItem.equals(dataCol)) {
                            idatacol = i;
                            csvHeader.add(headerItem);
                        }
                    }
                    if (dataCol != null && idatacol == 0) {
                        // Without this check the 0 sentinel would select every
                        // column while the header only lists the date column.
                        throw new ServiceException("Data column '" + dataCol
                                + "' not found in " + dataFile.getName());
                    }
                } else {
                    List<Object> vals = new ArrayList<>();
                    vals.add(ft.parse(nextLine[0]));
                    for (int i = 1; i < nextLine.length; i++) {
                        if (i == idatacol || idatacol == 0) {
                            vals.add(Double.parseDouble(nextLine[i]));
                        }
                    }
                    csvData.add(vals);
                }
            }
        }

        header = csvHeader;
        add(csvData, chunkStrat);
    }

    /**
     * Appends rows to {@link #data}, grouped according to the chunk strategy.
     *
     * For {@link #CHUNK_YEAR} a new chunk is started every time the year of a
     * row differs from the year of the previous row, so rows are expected to be
     * ordered by date; unordered input yields several chunks for the same year.
     *
     * @param newData rows of [dt, val1, val2...]; for CHUNK_YEAR the first
     *        element of every row must be a {@link Date}
     * @param chunkStrat one of {@link #CHUNK_ALL}, {@link #CHUNK_YEAR}, {@link #CHUNK_NONE}
     * @throws csip.ServiceException if the chunk strategy is null or unknown
     */
    public void add(List<List<Object>> newData, String chunkStrat) throws ServiceException {
        // Constant-first equals so a null strategy is reported as invalid
        // instead of failing with a NullPointerException.
        if (CHUNK_ALL.equals(chunkStrat)) {
            // Single document mode
            addYearData(0, newData);
        } else if (CHUNK_NONE.equals(chunkStrat)) {
            // Every timeseries item is a document. Index is from 0..n-1
            for (int i = 0; i < newData.size(); i++) {
                List<List<Object>> rows = new ArrayList<>();
                rows.add(newData.get(i));
                addYearData(i, rows);
            }
        } else if (CHUNK_YEAR.equals(chunkStrat)) {
            // Group data array into years.
            // NOTE(review): the year is taken in UTC, while readCSV parses dates
            // with the caller's SimpleDateFormat whose time zone is not visible
            // here - confirm the two agree, otherwise rows at the very start of
            // a year may be assigned to the previous year.
            ZoneId utc = ZoneId.of("UTC");
            int year = 0;
            List<List<Object>> yearRows = null;
            for (List<Object> row : newData) {
                Date date = (Date) row.get(0);
                LocalDate localDate = date.toInstant().atZone(utc).toLocalDate();
                int thisYear = localDate.getYear();
                // yearRows == null marks "no group started yet"; relying on
                // year == 0 alone would break for a first row dated in year 0.
                if (yearRows == null || thisYear != year) {
                    // Year changed, add the pair of year and current list of rows.
                    addYearData(year, yearRows);
                    year = thisYear;
                    yearRows = new ArrayList<>();
                }
                yearRows.add(row);
            }
            // Add last group of years
            addYearData(year, yearRows);
        } else {
            throw new ServiceException("Invalid chunk strategy " + chunkStrat);
        }
    }

    /**
     * Appends one chunk to {@link #data}. Null or empty row lists are ignored.
     * The row list is stored by reference, not copied.
     *
     * @param year key of the chunk (see {@link #data} for its meaning per strategy)
     * @param rows rows of the chunk; may be null
     */
    public void addYearData(Integer year, List<List<Object>> rows) {
        if (rows != null && !rows.isEmpty()) {
            data.add(new MutablePair<>(year, rows));
        }
    }

    /**
     * Insert the data into mongodb. One document is written per chunk with the
     * fields "location_id", "year" and "data". Nothing is written when the
     * table holds no chunks.
     *
     * @param collection The collection receiving the chunk documents
     * @param locationDoc The organizing document with location info; its "_id"
     *        is stored in every chunk document as "location_id"
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw parameter kept for existing callers
    public void insertData(MongoCollection collection, Document locationDoc) {
        if (data.isEmpty()) {
            // insertMany rejects an empty list, so there is nothing to do.
            return;
        }
        // Create a document with each chunk
        List<Document> docs = new ArrayList<>(data.size());
        for (Pair<Integer, List<List<Object>>> chunk : data) {
            int year = chunk.getLeft();
            List<List<Object>> rows = chunk.getRight();
            Document chunkDoc = new Document()
                    .append("location_id", locationDoc.get("_id"))
                    .append("year", year)
                    .append("data", rows);
            docs.add(chunkDoc);
        }
        collection.insertMany(docs);
    }
}