MongoArchiveStore1.java [src/csip] Revision: beaf35d680e39fda49d78cf7e170b55dc5710c27  Date: Tue Apr 25 16:47:08 MDT 2017
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 *
 * 2012-2017, Olaf David and others, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package csip;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import csip.utils.Binaries;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.LinkedHashSet;
import java.util.Set;
import org.bson.BsonString;
import org.bson.Document;

/**
 *
 */
class MongoArchiveStore1 implements ArchiveStore {

    MongoClient mongo;
    String dbname;
    static final String fsColl = "fs";

    UpdateOptions opt = new UpdateOptions();


    MongoArchiveStore1(String uri) {
        // "mongodb://user:pass@host:port/db"
        MongoClientURI u = new MongoClientURI(uri);
        dbname = u.getDatabase();
        if (dbname == null) {
            dbname = "csip";
        }
        mongo = new MongoClient(u);
        Config.LOG.info("Connected to archive csip store : " + uri);
        opt.upsert(true);
    } // "mongodb://user:pass@host:port/db"


    @Override
    public synchronized void archiveSession(String suid, ModelArchive ma, File file) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        //Let's store the standard data in regular collection
        MongoCollection<Document> c = db.getCollection(fsColl);

        String max = Config.getString(Config.CSIP_ARCHIVE_MAX_FILE_SIZE);
        long limit = Binaries.parseByteSize(max);
        String filename = null;
        long filesize = 0;
        InputStream is = null;

        if (file.length() <= limit) {
            filename = file.getName();
            filesize = file.length();
            is = new FileInputStream(file);
        } else {
            filename = suid + ".txt";
            filesize = ARCHIVE_TOO_BIG.length();
            is = new ByteArrayInputStream(ARCHIVE_TOO_BIG.getBytes());
        }

        // Let's query to ensure ID does not already exist in Mongo
        // if it does, we will alert the user
        FindIterable<Document> r = c.find(new Document("_id", suid));
        Document o = r.first();
        if (o == null) {
            // Build our document and add all the fields
            Document doc = new Document();
            doc.append("_id", suid);
            doc.append("service", ma.getService());
            doc.append("status", ma.getStatus());
            doc.append("ctime", ma.getCtime());
            doc.append("etime", ma.getEtime());
            doc.append("req_ip", ma.getReqIP());
            doc.append("filename", "files-" + filename);
            doc.append("filelength", filesize);

            //insert the document into the collection
            c.replaceOne(new Document("_id", suid), doc, opt);

            GridFSBucket gridFSBucket = GridFSBuckets.create(db);
            gridFSBucket.uploadFromStream(new BsonString(suid), "files-" + filename, is);
            is.close();
        } else {
            Config.LOG.warning("Unable to archive record with ID: " + suid + " as record already exists!!!");
        }
    }


    @Override
    public synchronized ModelArchive getArchive(String suid) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        FindIterable<Document> r = db.getCollection(fsColl).find(new Document("_id", suid));
        Document doc = r.first();
        if (doc != null) {
            return new ModelArchive(
                    doc.getString("ctime"),
                    doc.getString("etime"),
                    doc.getString("service"),
                    doc.getString("status"),
                    doc.getString("req_ip"));
        }
        return null;
    }


    @Override
    public synchronized void removeArchive(String suid) {
        MongoDatabase db = mongo.getDatabase(dbname);
        Document q = new Document("_id", suid);
        DeleteResult r = db.getCollection(fsColl).deleteOne(q);
        if (r.getDeletedCount() != 1) {
            throw new RuntimeException("Not deleted: " + suid);
        }
        GridFSBucket gridFSBucket = GridFSBuckets.create(db);
        gridFSBucket.delete(new BsonString(suid));
    }


    @Override
    public synchronized byte[] getFile(String suid, String filename) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        GridFSBucket gridFSBucket = GridFSBuckets.create(db);
        byte[] file;
        try (GridFSDownloadStream stream = gridFSBucket.openDownloadStream(new BsonString(suid))) {
            long fileLength = stream.getGridFSFile().getLength();
            file = new byte[(int) fileLength];
            stream.read(file);
        }
        return file;
    }


    @Override
    public synchronized void shutdown() throws Exception {
        mongo.close();
    }


    @Override
    public synchronized long getCount() {
        MongoDatabase db = mongo.getDatabase(dbname);
        return db.getCollection(fsColl).count();
    }


    // new keys implementation.
    public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
        if (sortby == null) {
            sortby = "ctime";
            sortAsc = false;
        }
        Document sort = new Document(sortby, sortAsc ? 1 : -1);
        Set<String> l = new LinkedHashSet<>();
        MongoDatabase db = mongo.getDatabase(dbname);
        MongoCollection<Document> c = db.getCollection(fsColl);
        for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
            l.add(doc.get("_id", String.class));
        }
        return l;
    }


    @Override
    public synchronized boolean hasArchive(String suid) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        return db.getCollection(fsColl).count(new Document("_id", suid)) == 1;
    }

}