MongoAccess.java [src/java/utils] Revision: default  Date:
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package utils;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import csip.Config;
import csip.ServiceException;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import m.ann.training.scale.DataSetIndices;
import org.apache.commons.lang.SerializationUtils;

import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.encog.neural.neat.NEATNetwork;

import utils.MongoUtils.Sorting;
import utils.MongoUtils.ServiceFunction;

/*
Examples for mongo shell, or robomongo

find one:
  db.raw.findOne({"name": "erosion"})

size of a bson:
  Object.bsonsize(db.raw.findOne({"name": "erosion"}))

get the erosion values as JSON Array:
  db.raw.findOne({ name:"erosion" }).values

get the erosion values as String Array: 
  db.raw.findOne({name:"erosion"}).values.toString()

get the erosion values as JSON, one line:
  tojsononeline(db.raw.findOne({name:"erosion"}).values)

 */
/**
 *
 * @author od
 */
public class MongoAccess {

  public static final String VALUES_COUNT = "count";
  public static final String NAME = "name";
  public static final String TYPE = "type";
  public static final String MIN = "min";
  public static final String MAX = "max";
  public static final String MIN_INDEX = "min_index";
  public static final String MAX_INDEX = "max_index";
  public static final String NORM = "norm";
  public static final String NORM_MIN = "norm_min";
  public static final String NORM_MAX = "norm_max";
  public static final String VAL_ID = "values_id";
  public static final String IN = "in";
  public static final String OUT = "out";
  public static final String VALUES = "values";
  public static final String METADATA = "metadata";
  public static final String TIMESTAMP = "timestamp";
  public static final String FILES = "files";
  public static final String SUID = "suid";

  // ANN properties
  public static final String ANN_ID = "_id";
  public static final String POPULATION = "population";
  public static final String EPOCHS = "epochs";
  public static final String RECOVERY_EPOCHS = "recovery_epochs";
  public static final String MAX_EPOCHS = "max_epochs";
  public static final String SCORE = "score";
  public static final String SCORES = "scores";
  public static final String SCALE_MECHANISM = "scale_mechanism";
  public static final String EXIT_STRATEGY = "exit_strategy";
  public static final String CONNECTION_DENSITY = "connection_density";
  public static final String VARIABLES = "variables";
  public static final String TRAINING_PERC = "training_perc";
  public static final String TRAINING_ERROR = "training_error";
  public static final String BEST_NET_STRUCTURE = "best_net_structure";
  public static final String STRUCTURE_ID = "network_structure_id";
  public static final String LINK_NUM = "link_number";
  public static final String IN_NODE_NUM = "input_nodes";
  public static final String OUT_NODE_NUM = "output_nodes";
  public static final String HIDE_NODE_NUM = "hidden_nodes";
  public static final String STRUCTURE = "structure";

  // DB documents
  //public static final String MODEL_STAT = "model_stat";
  public static final String PERFORMANCE = "performance";
  public static final String HISTORY = "history";
  public static final String HYPERPARAMS = "hyper_params";

  //Collections
  public static final String RAW = "raw";
  public static final String VALIDATION = "validation";
  public static final String VALIDNORMALIZED = "validNormalized";
  public static final String NORMALIZED = "normalized";
  public static final String TRAINED = "trained";
  public static final String SELECTED = "selected";

  static final String METADATA__VAL_ID = MongoUtils.nestedDocuments(METADATA, VAL_ID);
  static final String METADATA__NAME = MongoUtils.nestedDocuments(METADATA, NAME);
  static final String TRAINED__FILES = MongoUtils.nestedDocuments(TRAINED, FILES);
  static final String METADATA__SERVICE = MongoUtils.nestedDocuments(METADATA, "service");

  private static MongoClient mongo;


  synchronized static MongoClient getMongo() {
    // "mongodb://user:pass@host:port/db"
    if (mongo == null) {
      String s = Config.getString("ann.db.uri", "mongodb://localhost:27017");
      MongoClientURI u = new MongoClientURI(s);
      String dbname = u.getDatabase();
      if (dbname != null) {
        throw new IllegalArgumentException("remove database in config!");
      }
      mongo = new MongoClient(u);
    }
    return mongo;
  }


  public synchronized static void closeMongo() {
    if (mongo != null) {
      mongo.close();
      mongo = null;
    }
  }


  public static String getCollectionHash(String annName, String collection) {
    checkAnnExists(annName);
    MongoDatabase db = getMongo().getDatabase(annName);
    Document collStatsResults = db.runCommand(new Document("dbHash", 1).append("collections", collection));
    Document coll = (Document) collStatsResults.get("collections");
    String h = coll.getString(collection);
    return h;
  }


  private static MongoDatabase getDatabase(String annName) {
    checkAnnExists(annName);
    return getMongo().getDatabase(annName);
  }


  private static MongoCollection<Document> getCollection(String annName, String collection) {
    MongoCollection<Document> mongoCollection = getDatabase(annName).getCollection(collection);
    if (mongoCollection == null) {
      String msg = "Null collection " + collection + " from database " + annName;
      throw new NullPointerException(msg);
    }
    return mongoCollection;
  }


  private static void checkAnnExists(String annName) {
    if (!getMongo().listDatabaseNames().into(new ArrayList<>()).contains(annName)) {
      throw new IllegalArgumentException("No such ann: " + annName);
    }
  }


  /**
   * Get Anns.
   *
   * @return The list of ANNs
   */
  public static List<String> getAnns() {
    List<String> l = new ArrayList<>();
    MongoIterable<String> dbNames = getMongo().listDatabaseNames();
    for (String dbName : dbNames) {
      if (getMongo().getDatabase(dbName).listCollectionNames().into(new ArrayList<>()).contains(RAW)) {
        l.add(dbName);
      }
    }
    return l;
  }


  private static FindIterable<Document> findDocumentsInCollection(String annName, String collection) {
    return findDocumentsInCollection(annName, collection, null);
  }


  private static FindIterable<Document> findDocumentsInCollection(String annName, String collection, Bson filter) {
    FindIterable<Document> iterableDocument = (filter == null) ? getCollection(annName, collection).find()
        : getCollection(annName, collection).find(filter);
    if (iterableDocument == null) {
      String msg = "Null iterable document for collection " + collection + " from database " + annName;
      throw new NullPointerException(msg);
    }
    return iterableDocument;
  }


  private static Document findFirstDocumentInCollection(String annName, String collection) {
    return findFirstDocumentInCollection(annName, collection, null);
  }


  private static Document findFirstDocumentInCollection(String annName, String collection, Bson filter) {
    Document d = findDocumentsInCollection(annName, collection, filter).first();
    if (d == null) {
      String msg = "Null document for collection " + TRAINED__FILES + " from database " + annName;
      throw new NullPointerException(msg);
    }
    return d;
  }


  public static String getTrainingStats(String ann, String suid) {
    Bson filter = Filters.eq(METADATA__SERVICE, suid);
    Document d = findFirstDocumentInCollection(ann, TRAINED__FILES, filter);
    return d.toJson();
  }


  /**
   * get the values.
   *
   * @param ann
   * @param phase
   * @param var
   * @return
   */
  static List<Number> getValues(String ann, String phase, String var) throws ServiceException {
    Bson filter = Filters.eq(NAME, var);
    Document d = findFirstDocumentInCollection(ann, phase, filter);
    if (d == null) {
      throw new ServiceException("No such variable in " + ann + "/" + phase + ": " + var);
    }
    if (!d.containsKey(VALUES)) {
      throw new ServiceException("No values in " + ann + "/" + phase + ": " + var);
    }
    List<Number> values = d.get(VALUES, List.class);
    return values;
  }


  public static Iterable<Document> getSortedNormalizedData(String ann, String normCollection,
      String field, Sorting sort) throws ServiceException {
    return ((FindIterable< Document>) getNormalizedData(ann, normCollection))
        .sort(new Document(MongoUtils.nestedDocuments(METADATA, field), sort.getOrder()));
  }


  /**
   * Get the normalized data for the ANN
   *
   * @param ann
   */
  static Iterable<Document> getNormalizedData(String ann, String normCollection) throws ServiceException {
    return findDocumentsInCollection(ann, normCollection);
  }


  public static String getValidValuesId(String ann) {
    return findFirstDocumentInCollection(ann, RAW).get(METADATA, Document.class).getString(VAL_ID);
  }


  public static List<Bson> extractMetadata(Iterable<Document> d) {
    List<Bson> metadata = new ArrayList<>();
    for (Document doc : d) {
      Bson m = doc.get(METADATA, Document.class);
      if (m == null) {
        throw new NullPointerException("Metadata not available for object " + d.toString());
      }
      metadata.add(doc.get(METADATA, Document.class));
    }
    return metadata;
  }


  public static Iterable<Document> retrieveSortedANNsDocuments(String annName, String field, Sorting sort) {
//    Bson filter = Filters.ne(MongoUtils.nestedDocuments(METADATA, "exit_strategy"), "rec");
    Bson sorting = new Document(MongoUtils.nestedDocuments(METADATA, field), sort.getOrder());
//    Iterable<Document> iterable = findDocumentsInCollection(annName, TRAINED__FILES, filter).sort(sorting);
    Iterable<Document> iterable = findDocumentsInCollection(annName, TRAINED__FILES).sort(sorting);
    if (iterable == null) {
      throw new NullPointerException("No documents returned from files collection of " + annName);
    }
    return iterable;
  }


  public static double[] retrieveANNsNonRec(String annName, String varName, String field) {
    Bson filter = Filters.ne(MongoUtils.nestedDocuments(METADATA, "exit_strategy"), "rec");
    Iterable<Document> iterable = findDocumentsInCollection(annName, TRAINED__FILES, filter);
    return getErrors(iterable, varName, field);
  }


  public static double[] retrieveANNsErrors(String annName, String varName, String field) {
    Iterable<Document> iterable = findDocumentsInCollection(annName, TRAINED__FILES);
    return getErrors(iterable, varName, field);
  }


  private static double[] getErrors(Iterable<Document> iterable, String varName, String field) {
    List<Double> errors = new ArrayList<>();
    for (Document d : iterable) {
      errors.add(d.get(METADATA, Document.class).get(PERFORMANCE, Document.class)
          .get(varName, Document.class).getDouble(field));
    }
    return errors.stream().mapToDouble(Double::doubleValue).toArray();
  }


  private static List<Object> getMetadata(Iterable<Document> iterable, String metadata) {
    List<Object> errors = new ArrayList<>();
    for (Document d : iterable) {
      errors.add(d.get(METADATA, Document.class).get(metadata));
    }
    return errors;
  }


  public static List<Object> retrieveANNsNonRecInfo(String annName, String metadata) {
    Bson filter = Filters.ne(MongoUtils.nestedDocuments(METADATA, "exit_strategy"), "rec");
    Iterable<Document> iterable = findDocumentsInCollection(annName, TRAINED__FILES, filter);
    return getMetadata(iterable, metadata);
  }


  public static Iterable<Document> retrieveANNsDocuments(String annName) {
    return findDocumentsInCollection(annName, TRAINED__FILES);
  }


  // why select the first?
  public static double retrieveMinMaxPerc(String annName) {
    Bson sorting = new Document(TIMESTAMP, Sorting.DESCENDING.getOrder());
    FindIterable<Document> iterable = findDocumentsInCollection(annName, SELECTED).sort(sorting);
    Document d = iterable.first();
    return d.getDouble("percentage btw min-max");
  }


  public static double retrieveQuartPerc(String annName) {
    Bson sorting = new Document(TIMESTAMP, Sorting.DESCENDING.getOrder());
    FindIterable<Document> iterable = findDocumentsInCollection(annName, SELECTED).sort(sorting);
    Document d = iterable.first();
    return d.getDouble("percentage btw quartiles");
  }


  public static List<NEATNetwork> retrieveANNs(String annName) {
    Bson sorting = new Document(TIMESTAMP, Sorting.DESCENDING.getOrder());
    FindIterable<Document> iterable = findDocumentsInCollection(annName, SELECTED).sort(sorting);
    Document d = iterable.first();
    List<ObjectId> ids = d.get("selected_id", List.class);
    List<NEATNetwork> nn = new ArrayList<>();
    ids.forEach((id) -> {
      nn.add(retrieveANN(annName, id));
    });
    return nn;
  }


  public static NEATNetwork retrieveANN(String annName, ObjectId id) {
    MongoDatabase db = getDatabase(annName);
    GridFSBucket gridFSBucket = GridFSBuckets.create(db, TRAINED);
    byte[] file;
    try (GridFSDownloadStream stream = gridFSBucket.openDownloadStream(id)) {
      long fileLength = stream.getGridFSFile().getLength();
      file = new byte[(int) fileLength];
      stream.read(file);
    }
    NEATNetwork nn = (NEATNetwork) SerializationUtils.deserialize(file);
    if (nn == null) {
      throw new NullPointerException("Network not found in ann '" + annName + "': " + id);
    }
    return nn;
  }


  /**
   * Store ANN as file with metadata
   *
   *
   * @param ann_in
   * @param ann_out
   * @param network
   * @param meta
   * @throws IOException
   */
  // get the metadata: mongo r2 --quiet --eval  "db.trained.files.findOne({'filename':'r2-hello.ann'}).metadata" get the
  // get the file:     mongofiles -d r2 --prefix=trained get r2-hello.ann
  public static void storeANN(String ann_in, byte[] network, String ann_out,
      Document meta, String nn_id) throws IOException {

    GridFSBucket gridFSBucket = GridFSBuckets.create(getDatabase(ann_out), TRAINED);
    Iterable<Document> d = findDocumentsInCollection(ann_out, TRAINED__FILES);
    for (Document dd : d) {
      if (dd.get(MongoAccess.METADATA, Document.class).getString("nn_id").equals(nn_id)) {
        ObjectId objid = dd.getObjectId(MongoAccess.ANN_ID);
        gridFSBucket.delete(objid);
      }
    }

    // 1.check nn with id is stored already
    // 2.if is stored, get ObjectId and delete it gridFSBucket.delete(objectId)
    // 3.save it
    GridFSUploadOptions options = new GridFSUploadOptions().metadata(meta);
    InputStream is = new ByteArrayInputStream(network);
    gridFSBucket.uploadFromStream(ann_in + "-" + ann_out, is, options);
    is.close();
  }


  public static boolean collectionExist(String annName, String collection) {
    MongoIterable<String> col = getDatabase(annName).listCollectionNames();
    for (String coll : col) {
      if (coll.toLowerCase().equals(collection.toLowerCase())) {
        return true;
      }
    }
    return false;
  }


  /**
   * Normalize the raw data db-side
   *
   * @param annName
   */
  public static void normalize(String annName) {
    checkAnnExists(annName);
    // if a normalized database exists
    MongoCollection<Document> col = getCollection(annName, RAW);
    List<Bson> list = MongoAggregations.normalize(MongoAccess.NORMALIZED);
    AggregateIterable<Document> output = col.aggregate(list);
    output.toCollection();
  }


  public static void pushSelectedANNs(String ann, List<ObjectId> id, ErrorEstimate ee) {
    MongoCollection<Document> col = getCollection(ann, SELECTED);
    Document d = new Document("timestamp", new Date())
        .append("percentage btw quartiles", ee.getQuartilesPerc())
        .append("percentage btw min-max", ee.getMinMaxPerc())
        .append("selected_id", id);
    col.insertOne(d);
  }


  public static void validatePipeline(String ann, String currentService) {
    String validValuesID = getValidValuesId(ann);
    String currentID = findFirstDocumentInCollection(ann, currentService).get(METADATA, Document.class).getString(VAL_ID);

    // maybe create a FeNS Exception?
    if (!currentID.equals(validValuesID)) {
      String msg = "BROKEN PIPELINE: valid ID " + validValuesID
          + " differ from current ID " + currentID;
      throw new RuntimeException(msg);
    }
  }


  /**
   * Collect raw data.
   *
   * @param ann The name of the ann-dedicated database
   * @param paramNames List of parameters names
   * @param getVal List of parameters values
   * @param getDescr List of parameters metadata
   * @throws ServiceException
   */
  public static void collectRawData(String ann,
      Set<String> paramNames,
      ServiceFunction<String, Number> getVal,
      ServiceFunction<String, String> getDescr)
      throws ServiceException {

    MongoCollection<Document> col = getMongo().getDatabase(ann).getCollection(RAW);
    String val_id = ObjectId.get().toString();
    Date date = new Date();
    for (String name : paramNames) {
      String type = null;

      // normalization default range
      double norm_min = 0;
      double norm_max = 1;

      try {
        type = getDescr.apply(name);
      } catch (ServiceException E) {
        continue;
      }
      if (type != null && (type.contains(IN) || type.contains(OUT))) {
        Boolean norm = Boolean.FALSE;
        if (type.contains(NORM)) {
          norm = Boolean.TRUE;
        }
        if (MongoUtils.checkForNormRange(type)) {
          String norm_range = type.substring(type.indexOf("[") + 1, type.indexOf("]"));
          norm_range = norm_range.replaceAll("\\s+", "");
          String[] minmax = norm_range.split(",");
          norm_min = Double.parseDouble(minmax[0]);
          norm_max = Double.parseDouble(minmax[1]);
          // make sure min, max are different
          if (norm_min == norm_max) {
            throw new IllegalArgumentException("Min max for norm range are identical");
          }
          // make sure min, max are not reversed
          if (norm_min > norm_max) {
            double tmp_min = norm_max;
            norm_max = norm_min;
            norm_min = tmp_min;
          }
        }
        type = type.contains(IN) ? IN : OUT;
        collect(col, val_id, name, getVal.apply(name), norm, norm_min, norm_max, type, date);
      } // add exception if IN and OUT are not available
    }
  }


  /**
   * Collect an entire csv of data column by column and push it to database
   *
   * @param annName The name of the ann-dedicated database
   * @param paramNames
   * @param getDescr Metadata of each parameter
   * @param file csv file
   *
   * @throws IOException
   * @throws ServiceException
   */
  public static void collectFromFile(String annName, Set<String> paramNames,
      ServiceFunction<String, String> getDescr, File file,
      int blockSize) throws IOException, ServiceException {

    // 32 K buffer
    try (BufferedReader r = new BufferedReader(new FileReader(file), 8192 * 4)) {
      Pattern p = Pattern.compile("\\s*,\\s*");

      String[] names = p.split(r.readLine());
      String[] types = new String[names.length];
      Boolean[] norm = new Boolean[names.length];
      double[] norm_min = new double[names.length];
      double[] norm_max = new double[names.length];
      for (int i = 0; i < names.length; i++) {
        norm_max[i] = 1.0;
      }
      List<Integer> indices = new ArrayList<>();

      for (int i = 0; i < names.length; i++) {
        if (paramNames.contains(names[i])) {
          types[i] = getDescr.apply(names[i]);
          if (types[i] != null && (types[i].contains(IN) || types[i].contains(OUT))) {
            indices.add(i);
            norm[i] = Boolean.FALSE;
            if (types[i].contains(NORM)) {
              norm[i] = Boolean.TRUE;
            }
            if (MongoUtils.checkForNormRange(types[i])) {
              String norm_range = types[i].substring(types[i].indexOf("[") + 1, types[i].indexOf("]"));
              norm_range = norm_range.replaceAll("\\s+", "");
              String[] minmax = norm_range.split(",");
              norm_min[i] = Double.parseDouble(minmax[0]);
              norm_max[i] = Double.parseDouble(minmax[1]);

              // make sure min, max are different
              if (norm_min == norm_max) {
                throw new IllegalArgumentException("Min max for norm range are identical");
              }
              // make sure min, max are not reversed
              if (norm_min[i] > norm_max[i]) {
                double tmp_min = norm_max[i];
                norm_max[i] = norm_min[i];
                norm_min[i] = tmp_min;
              }
            }
            types[i] = types[i].contains(IN) ? IN : OUT;
          }
        }
      }

      getMongo().dropDatabase(annName);
      MongoCollection<Document> col = getMongo().getDatabase(annName).getCollection(RAW);

      String val_id = ObjectId.get().toString();
      Date date = new Date();

      List<Object>[] block = new ArrayList[names.length];
      for (int i = 0; i < block.length; i++) {
        block[i] = new ArrayList<>();
      }

      int rows = 0;
      String line = r.readLine();
      while (line != null) {
        String[] row = p.split(line);
        for (int i : indices) {
          block[i].add(new Double(row[i]));
        }
        line = r.readLine();
        if ((++rows % blockSize == 0) || line == null) {
          for (int i : indices) {
            collectInBlocks(col, val_id, names[i], block[i], norm[i], norm_min[i], norm_max[i], types[i], date);
            block[i].clear();
          }
        }
      }
      List<Bson> list = MongoAggregations.computeMinMax(RAW);
      AggregateIterable<Document> output = col.aggregate(list);
      output.toCollection();
    }
  }


  /**
   * Collect an entire csv of data column by column and push it to database
   *
   * @param paramNames
   * @param getDescr Metadata of each parameter
   * @param file csv file
   *
   * @throws IOException
   * @throws ServiceException
   */
  public static Data collectFromFile(Set<String> paramNames,
      ServiceFunction<String, String> getDescr, File file) throws IOException, ServiceException {

    try (BufferedReader r = new BufferedReader(new FileReader(file))) {
      Pattern p = Pattern.compile("\\s*,\\s*");

      String[] names = p.split(r.readLine());
      String[] types = new String[names.length];
      Boolean[] norm = new Boolean[names.length];
      double[] norm_min = new double[names.length];
      double[] norm_max = new double[names.length];
      for (int i = 0; i < names.length; i++) {
        norm_max[i] = 1.0;
      }

      for (int i = 0; i < names.length; i++) {
        if (paramNames.contains(names[i])) {
          types[i] = getDescr.apply(names[i]);
          if (types[i] != null && (types[i].contains(IN) || types[i].contains(OUT))) {
            norm[i] = Boolean.FALSE;
            if (types[i].contains(NORM)) {
              norm[i] = Boolean.TRUE;
            }
            if (MongoUtils.checkForNormRange(types[i])) {
              String norm_range = types[i].substring(types[i].indexOf("[") + 1, types[i].indexOf("]"));
              norm_range = norm_range.replaceAll("\\s+", "");
              String[] minmax = norm_range.split(",");
              norm_min[i] = Double.parseDouble(minmax[0]);
              norm_max[i] = Double.parseDouble(minmax[1]);

              // make sure min, max are different
              if (norm_min == norm_max) {
                throw new IllegalArgumentException("Min max for norm range are identical");
              }
              // make sure min, max are not reversed
              if (norm_min[i] > norm_max[i]) {
                double tmp_min = norm_max[i];
                norm_max[i] = norm_min[i];
                norm_min[i] = tmp_min;
              }
            }
            types[i] = types[i].contains(IN) ? IN : OUT;
          }
        }
      }
      Data data = new Data(names, norm, norm_min, norm_max, types);
      String line = null;
      int index = 0;
      while ((line = r.readLine()) != null) {
        String[] row = p.split(line);
        double[] tmpData = Arrays.stream(row).mapToDouble(Double::valueOf).toArray();
        data.put(index, tmpData);
        index++;
      }
      return data;
    }
  }


  public static void pushToDB(String annName, Data data, DataSetIndices dataIndices) {
    MongoCollection<Document> rawCol = getMongo().getDatabase(annName).getCollection(RAW);
    MongoCollection<Document> validationCol = getMongo().getDatabase(annName).getCollection(VALIDATION);
    String val_id = ObjectId.get().toString();
    Date date = new Date();

    for (int rowIndex : dataIndices.getTraining()) {
      double[] rowData = data.getDataPerRow(rowIndex);
      dataPush(data, rowData, val_id, date, rawCol);
    }
    for (int rowIndex : dataIndices.getValidation()) {
      double[] rowData = data.getDataPerRow(rowIndex);
      dataPush(data, rowData, val_id, date, validationCol);
    }
  }


  private static void dataPush(Data data, double[] rowData, String val_id, Date date, MongoCollection<Document> col) {
    for (int colIndex = 0; colIndex < rowData.length; colIndex++) {
      String name = data.getName(colIndex);
      boolean norm = data.getNorm(colIndex);
      double norm_min = data.getNormMin(colIndex);
      double norm_max = data.getNormMax(colIndex);
      double min = data.getMin(colIndex);
      double max = data.getMax(colIndex);
      String type = data.getType(colIndex);
      collect(col, val_id, name, rowData[colIndex], norm, norm_min, norm_max, min, max, type, date);
    }
  }


  private static void collect(MongoCollection<Document> col, String val_id, String name, Object val,
      Boolean norm, Double norm_min, Double norm_max, Double min, Double max, String type, Date tstamp) {
    Bson filter = Filters.eq(METADATA__NAME, name);
    if (col.find(filter).first() == null) {
      Document d = new Document(NAME, name);
      d.append(TYPE, type);
      d.append(NORM, norm);
      d.append(NORM_MIN, norm_min);
      d.append(NORM_MAX, norm_max);
      d.append(MIN, min);
      d.append(MAX, max);
      Document app = new Document(TIMESTAMP, new Date())
          .append(METADATA, d);
      col.insertOne(app);
    }
    col.updateOne(filter, Updates.combine(Updates.push(VALUES, val),
        Updates.set(METADATA__VAL_ID, val_id), Updates.set(TIMESTAMP, tstamp)));
  }


  private static void collectInBlocks(MongoCollection<Document> col,
      String val_id, String name, List<Object> val,
      Boolean norm, Double norm_min, Double norm_max, String type, Date tstamp) {
    Bson filter = Filters.eq(METADATA__NAME, name);
    if (col.find(filter).first() == null) {
      Document d = new Document(NAME, name);
      d.append(TYPE, type);
      d.append(NORM, norm);
      d.append(NORM_MIN, norm_min);
      d.append(NORM_MAX, norm_max);
      Document app = new Document(TIMESTAMP, new Date())
          .append(METADATA, d);
      col.insertOne(app);
    }
    col.updateOne(filter, Updates.combine(Updates.pushEach(VALUES, val),
        Updates.set(METADATA__VAL_ID, val_id), Updates.set(TIMESTAMP, tstamp)));
  }


  private static void collect(MongoCollection<Document> col, String val_id,
      String name, Object val, Boolean norm, Double norm_min, Double norm_max,
      String type, Date tstamp) {
    Bson filter = Filters.eq(METADATA__NAME, name);
    if (col.find(filter).first() == null) {
      Document d = new Document(NAME, name);
      d.append(TYPE, type);
      d.append(NORM, norm);
      d.append(NORM_MIN, norm_min);
      d.append(NORM_MAX, norm_max);
      Document app = new Document(TIMESTAMP, new Date())
          .append(METADATA, d);
      col.insertOne(app);
    }
    col.updateOne(filter, Updates.combine(Updates.push(VALUES, val),
        Updates.set(METADATA__VAL_ID, val_id), Updates.set(TIMESTAMP, tstamp)));
  }

}