MongoArchiveStore1.java [src/csip] Revision: Date:
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2022, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import csip.utils.Binaries;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.Document;
/**
*
*/
class MongoArchiveStore1 implements ArchiveStore {
static final String FS_COLL = "fs";
static final BsonDocument ping = new BsonDocument("ping", new BsonInt32(1));
static final long _1GB = (long) 1024 * 1024 * 1024;
MongoClient mongo;
String dbname;
UpdateOptions opt = new UpdateOptions().upsert(true);
private MongoDatabase mdb;
private GridFSBucket gridFSBucket;
private MongoCollection<Document> coll;
private synchronized MongoDatabase getDb() {
if (mdb == null)
mdb = mongo.getDatabase(dbname);
return mdb;
}
private synchronized GridFSBucket getGridFSBucket() {
if (gridFSBucket == null)
gridFSBucket = GridFSBuckets.create(getDb());
return gridFSBucket;
}
private synchronized MongoCollection<Document> getCollection() {
if (coll == null)
coll = getDb().getCollection(FS_COLL);
return coll;
}
MongoArchiveStore1(String uri) {
// "mongodb://user:pass@host:port/db"
MongoClientURI u = new MongoClientURI(uri);
dbname = u.getDatabase();
if (dbname == null)
dbname = "csip";
mongo = new MongoClient(u);
Config.LOG.info("Connected to archive csip store : " + uri);
}
@Override
public synchronized boolean isAvailable() {
try {
Document d = getDb().runCommand(ping);
Config.LOG.info(d.toString());
return true;
} catch (Exception E) {
Config.LOG.info("Archive unavailable.");
return false;
}
}
@Override
public synchronized void archiveSession(String suid, ModelArchive ma, File file) throws Exception {
//Let's store the standard data in regular collection
MongoCollection<Document> c = getCollection();
String max = Config.getString(Config.CSIP_ARCHIVE_MAX_FILE_SIZE);
long limit = Binaries.parseByteSize(max);
// cap it hard at 1GB, probably still too big for Mongo
if (limit > _1GB)
limit = _1GB;
String filename;
long filesize;
InputStream is;
String contentType;
if (file.length() <= limit) {
filename = file.getName();
filesize = file.length();
is = new FileInputStream(file);
contentType = "application/zip";
} else {
filename = suid + ".txt";
filesize = ARCHIVE_TOO_BIG.length();
is = new ByteArrayInputStream(ARCHIVE_TOO_BIG.getBytes());
contentType = "text/plain";
}
// Let's query to ensure ID does not already exist in Mongo
// if it does, we will alert the user
FindIterable<Document> r = c.find(new Document("_id", suid));
Document o = r.first();
if (o == null) {
// Build our document and add all the fields
Document doc = new Document();
doc.append("_id", suid);
doc.append("service", ma.getService());
doc.append("status", ma.getStatus());
doc.append("ctime", ma.getCtime());
doc.append("etime", ma.getEtime());
doc.append("req_ip", ma.getReqIP());
doc.append("filename", ARCHIVE_PREFIX + filename);
doc.append("filelength", filesize);
//insert the document into the collection
c.replaceOne(new Document("_id", suid), doc, opt);
GridFSBucket gfs = getGridFSBucket();
GridFSUploadOptions options = new GridFSUploadOptions().metadata(new Document("contentType", contentType));
gfs.uploadFromStream(new BsonString(suid), ARCHIVE_PREFIX + filename, is, options);
} else {
Config.LOG.warning("Unable to archive record with ID: " + suid + " as record already exists!!!");
}
is.close();
}
@Override
public synchronized ModelArchive getArchive(String suid) throws Exception {
FindIterable<Document> r = getCollection().find(new Document("_id", suid));
Document doc = r.first();
if (doc != null) {
return new ModelArchive(
doc.getString("ctime"),
doc.getString("etime"),
doc.getString("service"),
doc.getString("status"),
doc.getString("req_ip"));
}
return null;
}
@Override
public synchronized void removeArchive(String suid) {
DeleteResult r = getCollection().deleteOne(new Document("_id", suid));
if (r.getDeletedCount() != 1)
throw new RuntimeException("Not deleted: " + suid);
getGridFSBucket().delete(new BsonString(suid));
}
@Override
public synchronized byte[] getFile(String suid) throws Exception {
GridFSBucket gfs = getGridFSBucket();
try (GridFSDownloadStream stream = gfs.openDownloadStream(new BsonString(suid))) {
long fileLength = stream.getGridFSFile().getLength();
// this should not happen, since it is capped at 1GB.
if (fileLength > Integer.MAX_VALUE)
return ARCHIVE_TOO_BIG.getBytes();
return IOUtils.toByteArray(stream, fileLength);
}
}
@Override
public synchronized void close() throws Exception {
mongo.close();
}
@Override
public synchronized long getCount() {
return getCollection().count();
}
// new keys implementation.
public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
MongoCollection<Document> c = getCollection();
if (sortby == null && skip == 0 && limit == Integer.MAX_VALUE) {
List<Document> d = c.find().projection(new Document("_id", 1)).into(new ArrayList<>());
Set<String> s = d.stream().map(m -> m.getString("_id")).collect(Collectors.toSet());
return s;
}
Set<String> l = new LinkedHashSet<>();
if (sortby == null) {
for (Document doc : c.find().skip(skip).limit(limit)) {
l.add(doc.get("_id", String.class));
}
return l;
}
Document sort = new Document(sortby, sortAsc ? 1 : -1);
for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
l.add(doc.get("_id", String.class));
}
return l;
}
@Override
public synchronized boolean hasArchive(String suid) throws Exception {
return getCollection().count(new Document("_id", suid)) == 1;
}
public void f() {
MongoCollection<Document> c = getCollection();
List<Document> l = c.find().projection(new Document("_id", 1)).into(new ArrayList<>());
Set<String> s = l.stream().map(m -> m.getString("_id")).collect(Collectors.toSet());
System.out.println(l.get(0).getString("_id"));
System.out.println(s);
}
}