MongoArchiveStore.java [src/csip] Revision: 71821307bfe742c00c6dc582c171224a9ac59935 Date: Fri Apr 21 11:46:19 MDT 2017
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2017, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.WriteResult;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.gridfs.GridFSInputFile;

import csip.utils.Binaries;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashSet;
import java.util.Set;
/**
 * MongoDB-backed {@code ArchiveStore}. Archive metadata is kept as plain
 * documents in the {@code fs} collection (keyed by session id), while the
 * archived file content is stored in a GridFS bucket of the same name.
 */
class MongoArchiveStore implements ArchiveStore {

  /** Shared client handle; one per store instance, closed by {@link #shutdown()}. */
  MongoClient mongo;

  /** Database name taken from the connection URI; defaults to {@code "csip"}. */
  String dbname;

  /** Collection name used for both the metadata documents and the GridFS bucket. */
  static final String fsColl = "fs";

  /**
   * Connects to the MongoDB instance named by a connection URI of the form
   * {@code "mongodb://user:pass@host:port/db"}. When the URI carries no
   * database segment, the database defaults to {@code "csip"}.
   *
   * @param uri the MongoDB connection URI
   */
  MongoArchiveStore(String uri) {
    MongoClientURI u = new MongoClientURI(uri);
    dbname = u.getDatabase();
    if (dbname == null) {
      dbname = "csip";
    }
    mongo = new MongoClient(u);
    // Do not log the raw URI: it may embed credentials ("user:pass@host").
    Config.LOG.info("Connected to archive csip store, hosts: " + u.getHosts()
        + ", db: " + dbname);
  }

  /**
   * Removes every archive metadata document and every stored GridFS file.
   */
  public synchronized void removeAll() {
    DB db = mongo.getDB(dbname);
    // Metadata lives in a regular collection; an empty query matches all docs.
    DBCollection collection = db.getCollection(fsColl);
    collection.remove(new BasicDBObject());
    // A null query removes every file in the GridFS bucket.
    GridFS fileStore = new GridFS(db, fsColl);
    fileStore.remove((DBObject) null);
  }

  /**
   * Archives one model run: a metadata document keyed by the session id plus
   * the run's file content in GridFS. If the file exceeds the configured
   * {@code CSIP_ARCHIVE_MAX_FILE_SIZE} limit, a small placeholder note is
   * stored instead of the file. If a record with the same id already exists,
   * nothing is written and a warning is logged.
   *
   * @param sid session id, used as the Mongo {@code _id}
   * @param ma  archive metadata (service, status, times, requester IP)
   * @param f   the file holding the archived content
   * @throws Exception if the file cannot be read or Mongo access fails
   */
  @Override
  public synchronized void archiveSession(String sid, ModelArchive ma, File f) throws Exception {
    DB db = mongo.getDB(dbname);
    DBCollection collection = db.getCollection(fsColl);

    String max = Config.getString(Config.CSIP_ARCHIVE_MAX_FILE_SIZE);
    long limit = Binaries.parseByteSize(max);

    String filename;
    long filesize;
    InputStream is;
    if (f.length() <= limit) {
      filename = f.getName();
      filesize = f.length();
      is = new FileInputStream(f);
    } else {
      // Too big to archive: store a fixed placeholder note instead. Encode
      // once so the recorded length is the byte length actually stored
      // (String.length() counts chars, which can differ from bytes).
      byte[] note = ARCHIVE_TOO_BIG.getBytes(StandardCharsets.UTF_8);
      filename = sid + ".txt";
      filesize = note.length;
      is = new ByteArrayInputStream(note);
    }

    try {
      // _id must be unique; if it already exists, alert instead of overwriting.
      if (collection.count(new BasicDBObject("_id", sid)) == 0) {
        BasicDBObject doc = new BasicDBObject();
        doc.append("_id", sid);
        doc.append("service", ma.getService());
        doc.append("status", ma.getStatus());
        doc.append("ctime", ma.getCtime());
        doc.append("etime", ma.getEtime());
        doc.append("req_ip", ma.getReqIP());
        doc.append("filename", "files-" + filename);
        doc.append("filelength", filesize);
        collection.insert(doc);

        // Store the binary content in GridFS under the same id.
        GridFS fileStore = new GridFS(db, fsColl);
        GridFSInputFile file = fileStore.createFile(is);
        file.setId(sid);
        file.setFilename("files-" + filename);
        file.save();
      } else {
        Config.LOG.warning("Unable to archive record with ID: " + sid + " as record already exists!!!");
      }
    } finally {
      // Close the stream on every path (was previously leaked).
      is.close();
    }
  }

  /**
   * Looks up the archive metadata for a session.
   *
   * @param suid session id
   * @return the metadata, or {@code null} when no record exists
   * @throws Exception if Mongo access fails
   */
  @Override
  public synchronized ModelArchive getArchive(String suid) throws Exception {
    DB db = mongo.getDB(dbname);
    DBCollection collection = db.getCollection(fsColl);
    BasicDBObject findQuery = new BasicDBObject("_id", suid);
    DBObject doc = collection.findOne(findQuery);
    if (doc != null) {
      return new ModelArchive(
          doc.get("ctime").toString(),
          doc.get("etime").toString(),
          doc.get("service").toString(),
          doc.get("status").toString(),
          doc.get("req_ip").toString());
    }
    return null;
  }

  /**
   * Deletes both the metadata document and the GridFS content for a session.
   * No-op when the id is unknown.
   *
   * @param suid session id
   */
  @Override
  public synchronized void removeArchive(String suid) {
    DB db = mongo.getDB(dbname);
    DBCollection collection = db.getCollection(fsColl);
    BasicDBObject q = new BasicDBObject("_id", suid);
    collection.remove(q);
    GridFS fileStore = new GridFS(db, fsColl);
    fileStore.remove(q);
  }

  /**
   * Reads the archived file content for a session from GridFS.
   *
   * @param suid     session id
   * @param filename NOTE(review): currently ignored — lookup is by session id
   *                 only; confirm whether per-file lookup was intended
   * @return the file bytes, or {@code null} when no file is stored
   * @throws Exception if Mongo access fails
   */
  @Override
  public synchronized byte[] getFile(String suid, String filename) throws Exception {
    DB db = mongo.getDB(dbname);
    GridFS fileStore = new GridFS(db, fsColl);
    BasicDBObject query = new BasicDBObject("_id", suid);
    GridFSDBFile gridFile = fileStore.findOne(query);
    if (gridFile != null) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      gridFile.writeTo(out);
      return out.toByteArray();
    }
    return null;
  }

  /** Releases the Mongo client and all pooled connections. */
  @Override
  public synchronized void shutdown() throws Exception {
    mongo.close();
  }

  /** @return the number of archived sessions. */
  @Override
  public synchronized long getCount() {
    DB db = mongo.getDB(dbname);
    return db.getCollection(fsColl).count();
  }

  /**
   * Returns a page of archived session ids, in sort order.
   *
   * @param skip    number of records to skip (paging offset)
   * @param limit   maximum number of ids to return
   * @param sortby  field to sort on; {@code null} means newest-first by ctime
   * @param sortAsc {@code true} for ascending order (ignored when
   *                {@code sortby} is null)
   * @return the session ids, in iteration order
   */
  public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
    if (sortby == null) {
      // Default: newest creation time first.
      sortby = "ctime";
      sortAsc = false;
    }
    BasicDBObject sort = new BasicDBObject(sortby, sortAsc ? 1 : -1);
    Set<String> l = new LinkedHashSet<>();
    DB db = mongo.getDB(dbname);
    DBCollection c = db.getCollection(fsColl);
    DBCursor cursor = c.find().sort(sort).skip(skip).limit(limit);
    try {
      while (cursor.hasNext()) {
        l.add(cursor.next().get("_id").toString());
      }
    } finally {
      // Release the server-side cursor (was previously leaked).
      cursor.close();
    }
    return l;
  }

  /**
   * @param suid session id
   * @return {@code true} when a metadata record exists for the id
   * @throws Exception if Mongo access fails
   */
  @Override
  public synchronized boolean hasArchive(String suid) throws Exception {
    DB db = mongo.getDB(dbname);
    DBCollection c = db.getCollection(fsColl);
    // _id is unique, so the count is 0 or 1.
    return c.count(new BasicDBObject("_id", suid)) == 1;
  }
}