MongoArchiveStore1.java [src/csip] Revision:   Date:
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 *
 * 2012-2022, Olaf David and others, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package csip;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import csip.utils.Binaries;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.Document;

/**
 * {@link ArchiveStore} implementation backed by MongoDB.
 *
 * <p>For every archived session a metadata document is stored in the
 * {@value #FS_COLL} collection and the archive payload is stored in GridFS;
 * both are keyed by the session id ({@code suid}).
 *
 * <p>The database handle, the GridFS bucket and the metadata collection are
 * created lazily on first use. All {@link ArchiveStore} operations are
 * {@code synchronized} on this instance.
 */
class MongoArchiveStore1 implements ArchiveStore {

  /** Name of the collection holding the archive metadata documents. */
  static final String FS_COLL = "fs";
  /** Command document sent to the server by {@link #isAvailable()}. */
  static final BsonDocument ping = new BsonDocument("ping", new BsonInt32(1));
  /** Hard upper bound (in bytes) for an archive that gets stored. */
  static final long _1GB = 1024L * 1024 * 1024;

  MongoClient mongo;
  String dbname;

  /** Upsert option used when writing the metadata document. */
  UpdateOptions opt = new UpdateOptions().upsert(true);

  private MongoDatabase mdb;
  private GridFSBucket gridFSBucket;
  private MongoCollection<Document> coll;


  /** Returns the database handle, creating it on first use. */
  private synchronized MongoDatabase getDb() {
    if (mdb == null) {
      mdb = mongo.getDatabase(dbname);
    }
    return mdb;
  }


  /** Returns the GridFS bucket of the database, creating it on first use. */
  private synchronized GridFSBucket getGridFSBucket() {
    if (gridFSBucket == null) {
      gridFSBucket = GridFSBuckets.create(getDb());
    }
    return gridFSBucket;
  }


  /** Returns the metadata collection, creating the handle on first use. */
  private synchronized MongoCollection<Document> getCollection() {
    if (coll == null) {
      coll = getDb().getCollection(FS_COLL);
    }
    return coll;
  }


  /**
   * Creates a store for the given MongoDB connection string.
   *
   * @param uri connection string of the form
   *     {@code mongodb://user:pass@host:port/db}; when it names no database,
   *     {@code "csip"} is used.
   */
  MongoArchiveStore1(String uri) {
    MongoClientURI u = new MongoClientURI(uri);
    String db = u.getDatabase();
    dbname = (db != null) ? db : "csip";
    mongo = new MongoClient(u);
    // The raw URI may embed user name and password, so only the hosts and
    // the database name are written to the log.
    Config.LOG.info("Connected to archive csip store : " + u.getHosts() + "/" + dbname);
  }


  /**
   * Checks whether the database answers a {@code ping} command.
   *
   * @return {@code true} if the command succeeded, {@code false} if it
   *     raised any exception.
   */
  @Override
  public synchronized boolean isAvailable() {
    try {
      Document d = getDb().runCommand(ping);
      Config.LOG.info(d.toString());
      return true;
    } catch (Exception e) {
      // Any failure is reported as "not available"; keep the cause in the log.
      Config.LOG.info("Archive unavailable. " + e);
      return false;
    }
  }


  /**
   * Archives a session: writes a metadata document and uploads the payload
   * to GridFS under the same id.
   *
   * <p>If a metadata document with this id already exists, a warning is
   * logged and nothing is written. If {@code file} is larger than the
   * configured maximum (capped at 1GB), the {@code ARCHIVE_TOO_BIG} text is
   * stored as a {@code text/plain} placeholder instead of the file.
   *
   * @param suid session id, used as {@code _id} for both document and file.
   * @param ma   session metadata to persist.
   * @param file the archive file to upload.
   * @throws Exception if reading the file or talking to the database fails.
   */
  @Override
  public synchronized void archiveSession(String suid, ModelArchive ma, File file) throws Exception {
    MongoCollection<Document> c = getCollection();

    // Never overwrite an existing record; bail out before touching the file.
    if (c.find(new Document("_id", suid)).first() != null) {
      Config.LOG.warning("Unable to archive record with ID: " + suid + " as record already exists!!!");
      return;
    }

    String max = Config.getString(Config.CSIP_ARCHIVE_MAX_FILE_SIZE);
    // cap it hard at 1GB, probably still too big for Mongo
    long limit = Math.min(Binaries.parseByteSize(max), _1GB);

    String filename;
    long filesize;
    String contentType;
    byte[] placeholder = null;

    if (file.length() <= limit) {
      filename = file.getName();
      filesize = file.length();
      contentType = "application/zip";
    } else {
      // Encode once so the recorded length matches the bytes actually stored.
      placeholder = ARCHIVE_TOO_BIG.getBytes();
      filename = suid + ".txt";
      filesize = placeholder.length;
      contentType = "text/plain";
    }

    Document doc = new Document();
    doc.append("_id", suid);
    doc.append("service", ma.getService());
    doc.append("status", ma.getStatus());
    doc.append("ctime", ma.getCtime());
    doc.append("etime", ma.getEtime());
    doc.append("req_ip", ma.getReqIP());
    doc.append("filename", ARCHIVE_PREFIX + filename);
    doc.append("filelength", filesize);

    // try-with-resources: the stream is closed even if a database call throws.
    try (InputStream is = (placeholder == null)
        ? new FileInputStream(file)
        : new ByteArrayInputStream(placeholder)) {
      c.replaceOne(new Document("_id", suid), doc, opt);

      GridFSUploadOptions options = new GridFSUploadOptions()
          .metadata(new Document("contentType", contentType));
      getGridFSBucket().uploadFromStream(new BsonString(suid), ARCHIVE_PREFIX + filename, is, options);
    }
  }


  /**
   * Loads the metadata of an archived session.
   *
   * @param suid session id.
   * @return the archive metadata, or {@code null} if no record has this id.
   * @throws Exception if the database lookup fails.
   */
  @Override
  public synchronized ModelArchive getArchive(String suid) throws Exception {
    Document doc = getCollection().find(new Document("_id", suid)).first();
    if (doc == null) {
      return null;
    }
    return new ModelArchive(
        doc.getString("ctime"),
        doc.getString("etime"),
        doc.getString("service"),
        doc.getString("status"),
        doc.getString("req_ip"));
  }


  /**
   * Removes the metadata document and then the GridFS file of a session.
   *
   * @param suid session id.
   * @throws RuntimeException if no metadata document was deleted; the
   *     GridFS file is not touched in that case.
   */
  @Override
  public synchronized void removeArchive(String suid) {
    DeleteResult r = getCollection().deleteOne(new Document("_id", suid));
    if (r.getDeletedCount() != 1) {
      throw new RuntimeException("Not deleted: " + suid);
    }
    getGridFSBucket().delete(new BsonString(suid));
  }


  /**
   * Reads the stored payload of a session into memory.
   *
   * @param suid session id.
   * @return the file content, or the {@code ARCHIVE_TOO_BIG} message bytes
   *     if the stored file does not fit into a byte array.
   * @throws Exception if the file cannot be opened or read.
   */
  @Override
  public synchronized byte[] getFile(String suid) throws Exception {
    GridFSBucket gfs = getGridFSBucket();
    try (GridFSDownloadStream stream = gfs.openDownloadStream(new BsonString(suid))) {
      long fileLength = stream.getGridFSFile().getLength();
      // this should not happen, since it is capped at 1GB.
      if (fileLength > Integer.MAX_VALUE) {
        return ARCHIVE_TOO_BIG.getBytes();
      }
      return IOUtils.toByteArray(stream, fileLength);
    }
  }


  /** Closes the underlying MongoDB client. */
  @Override
  public synchronized void close() throws Exception {
    mongo.close();
  }


  /** Returns the number of metadata documents in the collection. */
  @Override
  public synchronized long getCount() {
    return getCollection().count();
  }


  /**
   * Lists session ids, optionally sorted and paged.
   *
   * @param skip    number of records to skip.
   * @param limit   maximum number of ids to return; together with
   *     {@code skip == 0}, {@link Integer#MAX_VALUE} means "no paging".
   * @param sortby  metadata field to sort by, or {@code null} for no sorting.
   * @param sortAsc {@code true} for ascending, {@code false} for descending
   *     order; ignored when {@code sortby} is {@code null}.
   * @return the ids in the order delivered by the query.
   */
  public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
    // Only the id is needed, so do not transfer the rest of each document.
    FindIterable<Document> query = getCollection().find().projection(new Document("_id", 1));

    if (sortby != null) {
      query = query.sort(new Document(sortby, sortAsc ? 1 : -1));
    }
    if (skip != 0 || limit != Integer.MAX_VALUE) {
      query = query.skip(skip).limit(limit);
    }

    Set<String> ids = new LinkedHashSet<>();
    for (Document doc : query) {
      ids.add(doc.getString("_id"));
    }
    return ids;
  }


  /**
   * Checks whether a metadata document exists for a session.
   *
   * @param suid session id.
   * @return {@code true} if exactly one document has this id.
   * @throws Exception if the database lookup fails.
   */
  @Override
  public synchronized boolean hasArchive(String suid) throws Exception {
    return getCollection().count(new Document("_id", suid)) == 1;
  }


  /**
   * Prints the first stored id (if any) followed by the set of all stored
   * ids to standard output.
   */
  public void f() {
    MongoCollection<Document> c = getCollection();
    List<Document> l = c.find().projection(new Document("_id", 1)).into(new ArrayList<>());
    Set<String> s = l.stream().map(m -> m.getString("_id")).collect(Collectors.toSet());
    // An empty collection has no first element to print.
    if (!l.isEmpty()) {
      System.out.println(l.get(0).getString("_id"));
    }
    System.out.println(s);
  }

}