MongoArchive.java [tools/MetaModelTools/src/archives] Revision: ec5f4cade4553a8341e1cd241b111bfdb77a87a8  Date: Fri Jan 10 10:59:55 MST 2020
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API, and application suite.
 *
 * 2012-2019, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package archives;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import com.mongodb.client.model.UpdateOptions;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import models.WEPSModelArchive;
import org.apache.commons.io.IOUtils;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.codehaus.jettison.json.JSONObject;
import models.ModelArchive;
import models.ModelArchiveFactory;

/**
 *
 * @author <a href="mailto:shaun.case@colostate.edu">Shaun Case</a>
 */
public class MongoArchive implements ServiceArchive {

  static String ARCHIVE_TOO_BIG = "the archive was too big to store.";
  MongoClient mongo;
  String dbname;
  static final String FS_COLL = "fs";

  UpdateOptions opt = new UpdateOptions();

  public MongoArchive(String uri) throws Exception {
    // "mongodb://user:pass@host:port/db"
    if (uri.contains("mongodb://")) {
      MongoClientURI u = new MongoClientURI(uri);
      dbname = u.getDatabase();
      if (dbname == null) {
        dbname = "csip";
      }

      mongo = new MongoClient(u);
      opt.upsert(true);
    } else {
      throw new Exception("No mongodb location specified");
    }
  } // "mongodb://user:pass@host:port/db"// "mongodb://user:pass@host:port/db"

  @Override
  public synchronized ArrayList<ModelArchive> getArchiveByReqIP(String ip, String service) throws Exception {
    ArrayList<ModelArchive> ret_val = new ArrayList<>();
    MongoDatabase db = mongo.getDatabase(dbname);
    FindIterable<Document> r = db.getCollection(FS_COLL).find(eq("req_ip", ip));

    r.sort(new Document("ctime", 1));
    for (Document doc : r) {
      if (doc != null) {
        if (doc.getString("service").equalsIgnoreCase(service)) {
          ret_val.add(new ModelArchive(
              doc.getString("_id"),
              doc.getString("ctime"),
              doc.getString("etime"),
              doc.getString("service"),
              doc.getString("status"),
              doc.getString("req_ip"),
              doc.getString("filename")
          )
          );
        }
      }
    }
    return ret_val;
  }

  @Override
  public synchronized ModelArchive getArchiveBySUID(String suid) throws Exception {
    MongoDatabase db = mongo.getDatabase(dbname);
    FindIterable<Document> r = db.getCollection(FS_COLL).find(new Document("_id", suid));
    Document doc = r.first();
    if (doc != null) {
      return new ModelArchive(
          doc.getString("_id"),
          doc.getString("ctime"),
          doc.getString("etime"),
          doc.getString("service"),
          doc.getString("status"),
          doc.getString("req_ip"),
          doc.getString("filename")
      );
    }
    return null;
  }

  private synchronized byte[] getFile(String suid) throws Exception {
    MongoDatabase db = mongo.getDatabase(dbname);
    GridFSBucket gridFSBucket = GridFSBuckets.create(db);
    try (GridFSDownloadStream stream = gridFSBucket.openDownloadStream(new BsonString(suid))) {
      long fileLength = stream.getGridFSFile().getLength();
      // this should not happen, since it is capped at 1GB.
      if (fileLength > Integer.MAX_VALUE) {
        return ARCHIVE_TOO_BIG.getBytes();
      }
      return IOUtils.toByteArray(stream, fileLength);
    }
  }

  @Override
  public synchronized JSONObject getServiceRequest(String suid) throws Exception {
    JSONObject requestData = null;

    byte[] zFile = getFile(suid);

    try (ZipInputStream zin = new ZipInputStream(new ByteArrayInputStream(zFile))) {
      ZipEntry entry;

      while ((entry = zin.getNextEntry()) != null) {
        if (entry.getName().contains(".request")) {
          BufferedReader bReader = new BufferedReader(new InputStreamReader(zin));
          StringBuilder fileContent = new StringBuilder();
          String inputStr;
          while ((inputStr = bReader.readLine()) != null) {
            fileContent.append(inputStr).append(System.lineSeparator());
          }
          requestData = new JSONObject(fileContent.toString());
          break;
        }
      }
    }

    return requestData;
  }

  @Override
  public synchronized JSONObject getServiceResponse(String suid) throws Exception {
    JSONObject requestData = null;

    byte[] zFile = getFile(suid);

    try (ZipInputStream zin = new ZipInputStream(new ByteArrayInputStream(zFile))) {
      ZipEntry entry;

      while ((entry = zin.getNextEntry()) != null) {
        if (entry.getName().contains(".response")) {
          BufferedReader bReader = new BufferedReader(new InputStreamReader(zin));
          StringBuilder fileContent = new StringBuilder();
          String inputStr;
          while ((inputStr = bReader.readLine()) != null) {
            fileContent.append(inputStr).append(System.lineSeparator());
          }
          requestData = new JSONObject(fileContent.toString());
          break;
        }
      }
    }

    return requestData;
  }

  @Override
  public synchronized void shutdown() throws Exception {
    mongo.close();
  }

  @Override
  public synchronized long getCount() {
    MongoDatabase db = mongo.getDatabase(dbname);
    return db.getCollection(FS_COLL).count();
  }

  // new keys implementation.
  @Override
  public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
    if (sortby == null) {
      sortby = "ctime";
      sortAsc = false;
    }
    Document sort = new Document(sortby, sortAsc ? 1 : -1);
    Set<String> l = new LinkedHashSet<>();
    MongoDatabase db = mongo.getDatabase(dbname);
    MongoCollection<Document> c = db.getCollection(FS_COLL);

    for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
      l.add(doc.get("_id", String.class
      ));
    }
    return l;
  }

  @Override
  public synchronized boolean hasArchive(String suid) throws Exception {
    MongoDatabase db = mongo.getDatabase(dbname);
    return db.getCollection(FS_COLL).count(new Document("_id", suid)) == 1;

  }

  @Override
  public ArrayList<ModelArchive> getArchivesByFilters(Iterable<Bson> filters, int limit) throws Exception {
    ArrayList<ModelArchive> ret_val = new ArrayList<>();
    MongoDatabase db = mongo.getDatabase(dbname);
    FindIterable<Document> c = db.getCollection(FS_COLL).find(and(filters)).sort(new Document("ctime", -1));

    MongoCursor<Document> cursor = c.iterator();

    int count = 0;
    while (cursor.hasNext()) {
      Document doc = cursor.next();
      if (null != doc) {
        ModelArchive model = null;
        byte[] fileData = getFile(doc.getString("_id"));

        if (doc.getString("service").contains("weps/5.0")) {
          model = new WEPSModelArchive(doc.getString("_id"),
              doc.getString("ctime"),
              doc.getString("etime"),
              doc.getString("service"),
              doc.getString("status"),
              doc.getString("req_ip"),
              doc.getString("filename"),
              fileData
          );
          model.saveFileData(fileData);
        }

        fileData = null;  //  Allow garbage collection of the byte array.

        ret_val.add(model);
        count++;

        //  Reached our limit yet?
        if ((-1 != limit) && (count >= limit)) {
          break;
        }
      }
    }

    System.gc();  //  We've read a lot of data and dynamically allocated a lot of space, some of which was returned...force some garbage collection.
    return ret_val;
  }

  @Override
  public ArrayList<ModelArchive> getArchivesByFilter(String key, String value, int limit, boolean basicArchiveFunctionalaity) throws Exception {
    ArrayList<ModelArchive> ret_val = new ArrayList<>();
    MongoDatabase db = mongo.getDatabase(dbname);
    FindIterable<Document> c = db.getCollection(FS_COLL).find(and(eq(key, value), eq("status", "Finished"))).sort(new Document("ctime", -1));

    MongoCursor<Document> cursor = c.iterator();

    int count = 0;
    while (cursor.hasNext()) {
      Document doc = cursor.next();
      if ((null != doc) && (doc.getString(key).equalsIgnoreCase(value))) {
        ModelArchive model = null;
        byte[] fileData = null;
        String serviceName = doc.getString("service");

        model = ModelArchiveFactory.getModelArchive(serviceName, doc);
        if (null != model) {
          fileData = getFile(doc.getString("_id"));
          model.setFileDataEx(fileData);
          model.saveFileData(fileData);
          fileData = null;  //  Allow garbage collection of the byte array.

          if (null != model) {
            ret_val.add(model);
            count++;
          }
        }
        //  Reached our limit yet?
        if ((-1 != limit) && (count >= limit)) {
          break;
        }
      }
    }

    System.gc();  //  We've read a lot of data and dynamically allocated a lot of space, some of which was returned...force some garbage collection.
    return ret_val;
  }

  @Override
  public ArrayList<String> getSUIDsByFilter(String key, String value
  ) {
    ArrayList<String> ret_val = new ArrayList<>();
    MongoDatabase db = mongo.getDatabase(dbname);
    FindIterable<Document> c = db.getCollection(FS_COLL).find(eq(key, value)).sort(new Document("ctime", 1));

    MongoCursor<Document> cursor = c.iterator();

    while (cursor.hasNext()) {
      Document doc = cursor.next();
      if (null != doc) {
        ret_val.add(doc.getString("_id"));
      }
    }
    return ret_val;
  }
}