MongoArchiveStore1.java [src/csip] Revision: beaf35d680e39fda49d78cf7e170b55dc5710c27 Date: Tue Apr 25 16:47:08 MDT 2017
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2017, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import csip.utils.Binaries;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashSet;
import java.util.Set;
import org.bson.BsonString;
import org.bson.Document;
/**
*
*/
/**
 * MongoDB-backed {@link ArchiveStore}. Session metadata is kept as documents
 * in the "fs" collection (keyed by session id), the archive payload itself is
 * stored in GridFS under the same id.
 *
 * <p>All public methods are synchronized; the store is safe for use from
 * multiple request threads sharing one instance.
 */
class MongoArchiveStore1 implements ArchiveStore {

  MongoClient mongo;
  String dbname;
  static final String fsColl = "fs";
  UpdateOptions opt = new UpdateOptions();

  /**
   * Connects to the archive store.
   *
   * @param uri connection string, e.g. "mongodb://user:pass@host:port/db";
   *            when the URI names no database, "csip" is used.
   */
  MongoArchiveStore1(String uri) {
    MongoClientURI u = new MongoClientURI(uri);
    dbname = u.getDatabase();
    if (dbname == null) {
      dbname = "csip";
    }
    mongo = new MongoClient(u);
    Config.LOG.info("Connected to archive csip store : " + uri);
    opt.upsert(true); // metadata writes create the document if absent
  }

  /**
   * Archives a finished session: stores the metadata document and uploads the
   * archive file to GridFS. If the file exceeds the configured size limit
   * (CSIP_ARCHIVE_MAX_FILE_SIZE), a small placeholder note is stored instead.
   * A session id that already exists is logged and skipped.
   *
   * @param suid session id (used as _id and GridFS file id)
   * @param ma   session metadata
   * @param file archive file on local disk
   * @throws Exception on I/O or database failure
   */
  @Override
  public synchronized void archiveSession(String suid, ModelArchive ma, File file) throws Exception {
    MongoDatabase db = mongo.getDatabase(dbname);
    // Standard metadata goes into a regular collection.
    MongoCollection<Document> c = db.getCollection(fsColl);
    String max = Config.getString(Config.CSIP_ARCHIVE_MAX_FILE_SIZE);
    long limit = Binaries.parseByteSize(max);
    String filename;
    long filesize;
    InputStream is;
    if (file.length() <= limit) {
      filename = file.getName();
      filesize = file.length();
      is = new FileInputStream(file);
    } else {
      // Too big to archive: store a fixed-text placeholder. Encode once with
      // UTF-8 so the recorded length matches the uploaded byte count (the
      // original used String.length(), which counts chars, and the
      // platform-default charset).
      byte[] note = ARCHIVE_TOO_BIG.getBytes(StandardCharsets.UTF_8);
      filename = suid + ".txt";
      filesize = note.length;
      is = new ByteArrayInputStream(note);
    }
    // try-with-resources closes the stream on every path — including the
    // duplicate-id branch and exceptions — which the original leaked.
    try (InputStream in = is) {
      // Ensure the id does not already exist; if it does, warn and skip.
      Document existing = c.find(new Document("_id", suid)).first();
      if (existing != null) {
        Config.LOG.warning("Unable to archive record with ID: " + suid + " as record already exists!!!");
        return;
      }
      Document doc = new Document();
      doc.append("_id", suid);
      doc.append("service", ma.getService());
      doc.append("status", ma.getStatus());
      doc.append("ctime", ma.getCtime());
      doc.append("etime", ma.getEtime());
      doc.append("req_ip", ma.getReqIP());
      doc.append("filename", "files-" + filename);
      doc.append("filelength", filesize);
      // Upsert the metadata document, then stream the payload into GridFS
      // under the session id.
      c.replaceOne(new Document("_id", suid), doc, opt);
      GridFSBucket gridFSBucket = GridFSBuckets.create(db);
      gridFSBucket.uploadFromStream(new BsonString(suid), "files-" + filename, in);
    }
  }

  /**
   * Loads the archived metadata for a session.
   *
   * @param suid session id
   * @return the archive metadata, or null if no record exists
   * @throws Exception on database failure
   */
  @Override
  public synchronized ModelArchive getArchive(String suid) throws Exception {
    MongoDatabase db = mongo.getDatabase(dbname);
    FindIterable<Document> r = db.getCollection(fsColl).find(new Document("_id", suid));
    Document doc = r.first();
    if (doc != null) {
      return new ModelArchive(
          doc.getString("ctime"),
          doc.getString("etime"),
          doc.getString("service"),
          doc.getString("status"),
          doc.getString("req_ip"));
    }
    return null;
  }

  /**
   * Removes an archived session: the metadata document first, then the
   * GridFS payload.
   *
   * @param suid session id
   * @throws RuntimeException if the metadata document was not deleted
   */
  @Override
  public synchronized void removeArchive(String suid) {
    MongoDatabase db = mongo.getDatabase(dbname);
    Document q = new Document("_id", suid);
    DeleteResult r = db.getCollection(fsColl).deleteOne(q);
    if (r.getDeletedCount() != 1) {
      throw new RuntimeException("Not deleted: " + suid);
    }
    GridFSBucket gridFSBucket = GridFSBuckets.create(db);
    gridFSBucket.delete(new BsonString(suid));
  }

  /**
   * Reads the archived payload for a session fully into memory.
   *
   * @param suid     session id (the GridFS file id)
   * @param filename unused here; the payload is addressed by id only
   * @return the file content
   * @throws Exception on database failure or a truncated download
   */
  @Override
  public synchronized byte[] getFile(String suid, String filename) throws Exception {
    MongoDatabase db = mongo.getDatabase(dbname);
    GridFSBucket gridFSBucket = GridFSBuckets.create(db);
    byte[] file;
    try (GridFSDownloadStream stream = gridFSBucket.openDownloadStream(new BsonString(suid))) {
      long fileLength = stream.getGridFSFile().getLength();
      file = new byte[(int) fileLength];
      // InputStream.read() may return fewer bytes than requested; loop until
      // the whole file is read (a single read() could silently truncate).
      int off = 0;
      while (off < file.length) {
        int n = stream.read(file, off, file.length - off);
        if (n < 0) {
          throw new java.io.EOFException(
              "Truncated GridFS download for " + suid + ": " + off + "/" + file.length);
        }
        off += n;
      }
    }
    return file;
  }

  /** Closes the MongoDB client connection. */
  @Override
  public synchronized void shutdown() throws Exception {
    mongo.close();
  }

  /** @return the number of archived sessions. */
  @Override
  public synchronized long getCount() {
    MongoDatabase db = mongo.getDatabase(dbname);
    return db.getCollection(fsColl).count();
  }

  /**
   * New keys implementation: returns a page of session ids.
   *
   * @param skip    number of records to skip
   * @param limit   maximum number of ids to return
   * @param sortby  sort field; null defaults to "ctime" descending
   * @param sortAsc true for ascending order (ignored when sortby is null)
   * @return session ids in sort order
   */
  public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
    if (sortby == null) {
      sortby = "ctime";
      sortAsc = false;
    }
    Document sort = new Document(sortby, sortAsc ? 1 : -1);
    Set<String> l = new LinkedHashSet<>();
    MongoDatabase db = mongo.getDatabase(dbname);
    MongoCollection<Document> c = db.getCollection(fsColl);
    for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
      l.add(doc.get("_id", String.class));
    }
    return l;
  }

  /**
   * @param suid session id
   * @return true if a metadata record exists for the session
   * @throws Exception on database failure
   */
  @Override
  public synchronized boolean hasArchive(String suid) throws Exception {
    MongoDatabase db = mongo.getDatabase(dbname);
    return db.getCollection(fsColl).count(new Document("_id", suid)) == 1;
  }
}