// TimeseriesTable.java [src/java/utils] Revision: 132ffbc5bcb10b2255c3ec894f35bf6ea71b707b  Date: Thu Nov 15 10:39:12 MST 2018
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package utils;

import com.mongodb.client.MongoCollection;
import com.opencsv.CSVReader;
import csip.ServiceException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.bson.Document;

/**
 * In-memory table of timeseries rows that can be read from a CSV file, grouped
 * into chunks, and inserted into a MongoDB collection as one document per chunk.
 *
 * <p>Every row is a list whose first element is a {@link Date} and whose
 * remaining elements are the values for that date. Rows are grouped into chunks
 * according to one of the {@code CHUNK_*} strategies.
 *
 * @author dave
 */
public class TimeseriesTable {

  /** Chunk strategy: all rows go into a single chunk stored under key 0. */
  public static final String CHUNK_ALL = "CHUNK_ALL";
  /** Chunk strategy: rows are grouped by the calendar year (UTC) of their date. */
  public static final String CHUNK_YEAR = "CHUNK_YEAR";
  /** Chunk strategy: every row is its own chunk, keyed by its index in the added data. */
  public static final String CHUNK_NONE = "CHUNK_NONE";

  // Header is a list of strings (date, colname1, colname2...)
  public List<String> header = new ArrayList<>();
  // Data is a list of pairs [key, rows], where rows is an array of [dt, val1, val2...].
  // The key is the year for CHUNK_YEAR, the row index for CHUNK_NONE and 0 for CHUNK_ALL.
  public List<Pair<Integer, List<List<Object>>>> data = new ArrayList<>();
  // Orders rows by their date, which is always the first element of a row.
  Comparator<List<Object>> timeseriesCompare = (List<Object> lhs, List<Object> rhs) -> {
    Date lhsDate = (Date) lhs.get(0);
    Date rhsDate = (Date) rhs.get(0);
    return lhsDate.compareTo(rhsDate);
  };

  /** Creates an empty table with no header and no data. */
  public TimeseriesTable() {
  }

  /**
   * Creates a table from a stored timeseries document.
   *
   * <p>Only the {@code "header"} field is read. Loading the rows is not
   * implemented, so a document that has a header is rejected after the header
   * has been copied.
   *
   * @param timeseriesDoc document that may contain a {@code "header"} list
   * @throws ServiceException if the document contains a {@code "header"} field
   */
  public TimeseriesTable(Document timeseriesDoc) throws ServiceException {
    // The document API returns Object; the header is expected to be a list of strings.
    @SuppressWarnings("unchecked")
    List<String> docHeader = (List<String>) timeseriesDoc.get("header");
    if (docHeader != null) {
      header.addAll(docHeader);
      // TODO: copy the rows from the document's "data" field once chunked loading is supported.
      throw new ServiceException("This version of the constructor is not implemented");
    }
  }

  /**
   * Reads a CSV file using the {@link #CHUNK_NONE} strategy.
   *
   * @param dataFile CSV file whose first line is the header and whose first column is the date
   * @param ft format used to parse the date column
   * @param dataCol if non-null, only the column with this header name is loaded
   * @throws FileNotFoundException if the file cannot be opened
   * @throws IOException if reading the file fails
   * @throws ParseException if a date cannot be parsed with {@code ft}
   * @throws ServiceException if {@code dataCol} is non-null and not present in the header
   */
  public void readCSV(File dataFile, SimpleDateFormat ft, String dataCol)
          throws FileNotFoundException, IOException, ParseException, ServiceException {
    readCSV(dataFile, ft, dataCol, CHUNK_NONE);
  }

  /**
   * Reads a CSV file, replaces the header of this table with the file's header
   * and adds the parsed rows using the given chunk strategy.
   *
   * <p>The first line is the header. In every other line the first field is
   * parsed as a date with {@code ft} and the selected fields are parsed as
   * doubles. The table is only modified after the whole file has been read.
   *
   * @param dataFile CSV file to read
   * @param ft format used to parse the date column
   * @param dataCol If non-null, then only add data from this column (header
   *     names are trimmed before comparison; the first matching column is used)
   * @param chunkStrat one of {@link #CHUNK_ALL}, {@link #CHUNK_YEAR} or {@link #CHUNK_NONE}
   * @throws FileNotFoundException if the file cannot be opened
   * @throws IOException if reading the file fails
   * @throws ParseException if a date cannot be parsed with {@code ft}
   * @throws ServiceException if {@code dataCol} is non-null and not present in
   *     the header, or if {@code chunkStrat} is not a known strategy
   */
  public void readCSV(File dataFile, SimpleDateFormat ft, String dataCol, String chunkStrat)
          throws FileNotFoundException, IOException, ParseException, ServiceException {
    // Data is stored as array of arrays of [date, val1, val2]
    List<List<Object>> csvData = new ArrayList<>();
    List<String> csvHeader = new ArrayList<>();
    boolean keepAllColumns = (dataCol == null);
    // Index of the requested column; stays -1 until the header has been matched.
    int dataColIndex = -1;

    // try-with-resources closes the reader (and the underlying file) on every exit path.
    try (CSVReader reader = new CSVReader(new FileReader(dataFile.getAbsolutePath()))) {
      String[] nextLine;
      while ((nextLine = reader.readNext()) != null) {
        if (csvHeader.isEmpty()) {
          // First line: the date column name is kept as-is, the others are trimmed.
          csvHeader.add(nextLine[0]);
          for (int i = 1; i < nextLine.length; i++) {
            String headerItem = nextLine[i].trim();
            if (keepAllColumns) {
              csvHeader.add(headerItem);
            } else if (dataColIndex < 0 && headerItem.equals(dataCol)) {
              // Only the first match is used so header and rows stay aligned.
              dataColIndex = i;
              csvHeader.add(headerItem);
            }
          }
          if (!keepAllColumns && dataColIndex < 0) {
            // Without this check every column would be loaded under a date-only header.
            throw new ServiceException("Data column '" + dataCol
                    + "' not found in header of " + dataFile);
          }
        } else {
          List<Object> vals = new ArrayList<>();
          vals.add(ft.parse(nextLine[0]));
          for (int i = 1; i < nextLine.length; i++) {
            if (keepAllColumns || i == dataColIndex) {
              vals.add(Double.parseDouble(nextLine[i]));
            }
          }
          csvData.add(vals);
        }
      }
    }

    header = csvHeader;
    add(csvData, chunkStrat);
  }

  /**
   * Appends rows to this table, grouped into chunks by the given strategy.
   *
   * <ul>
   * <li>{@link #CHUNK_ALL}: all rows form one chunk with key 0.</li>
   * <li>{@link #CHUNK_NONE}: every row is its own chunk, keyed by its index in
   *     {@code newData}.</li>
   * <li>{@link #CHUNK_YEAR}: consecutive rows with the same year form a chunk
   *     keyed by that year.</li>
   * </ul>
   *
   * @param newData rows of [date, val1, val2...]
   * @param chunkStrat one of the {@code CHUNK_*} constants
   * @throws csip.ServiceException if {@code chunkStrat} is null or not a known strategy
   */
  public void add(List<List<Object>> newData, String chunkStrat) throws ServiceException {
    // Constant-first comparison so a null strategy reaches the error branch below.
    if (CHUNK_ALL.equals(chunkStrat)) {
      // Single document mode
      addYearData(0, newData);
    } else if (CHUNK_NONE.equals(chunkStrat)) {
      // Every timeseries item is a document. Index is from 0..n-1
      for (int i = 0; i < newData.size(); i++) {
        List<List<Object>> rows = new ArrayList<>();
        rows.add(newData.get(i));
        addYearData(i, rows);
      }
    } else if (CHUNK_YEAR.equals(chunkStrat)) {
      addGroupedByYear(newData);
    } else {
      throw new ServiceException("Invalid chunk strategy " + chunkStrat);
    }
  }

  /**
   * Groups consecutive rows that share a year into one chunk per run. The year
   * is taken from the row's date interpreted in UTC.
   *
   * <p>NOTE(review): rows are grouped in the order given, so input that is not
   * sorted by date produces several chunks for the same year. Confirm callers
   * always pass sorted data.
   */
  private void addGroupedByYear(List<List<Object>> newData) {
    int year = 0;
    // Start with an empty list rather than null so a first row in year 0 cannot cause an NPE.
    List<List<Object>> yearRows = new ArrayList<>();
    for (List<Object> row : newData) {
      Date date = (Date) row.get(0);
      LocalDate localDate = date.toInstant().atZone(ZoneId.of("UTC")).toLocalDate();
      int thisYear = localDate.getYear();
      if (thisYear != year) {
        // Year changed, add the pair of year and current list of rows (ignored when empty).
        addYearData(year, yearRows);
        year = thisYear;
        yearRows = new ArrayList<>();
      }
      yearRows.add(row);
    }
    // Add last group of rows
    addYearData(year, yearRows);
  }

  /**
   * Appends one chunk to {@link #data}. A null or empty row list is ignored.
   *
   * @param year key of the chunk (a year, a row index or 0, depending on the strategy)
   * @param rows rows of the chunk; the list is stored as-is, not copied
   */
  public void addYearData(Integer year, List<List<Object>> rows) {
    if (rows == null || rows.isEmpty()) {
      return;
    }
    data.add(new MutablePair<>(year, rows));
  }

  /**
   * Insert the data into mongodb, one document per chunk with the fields
   * {@code location_id}, {@code year} and {@code data}. Nothing is inserted
   * when the table holds no chunks.
   *
   * @param collection collection that receives the chunk documents
   * @param locationDoc The organizing document with location info; its
   *     {@code _id} is stored as {@code location_id} in every chunk document
   */
  // The raw MongoCollection parameter is kept so existing callers compile unchanged.
  @SuppressWarnings({"rawtypes", "unchecked"})
  public void insertData(MongoCollection collection, Document locationDoc) {
    if (data.isEmpty()) {
      // The driver rejects an empty document list, so there is nothing to do.
      return;
    }
    // Create a document with each chunk
    List<Document> docs = new ArrayList<>(data.size());
    for (Pair<Integer, List<List<Object>>> chunk : data) {
      int year = chunk.getLeft();
      List<List<Object>> rows = chunk.getRight();
      Document chunkDoc = new Document()
              .append("location_id", locationDoc.get("_id"))
              .append("year", year)
              .append("data", rows);
      docs.add(chunkDoc);
    }
    collection.insertMany(docs);
  }
}