MongoArchive.java [tools/MetaModelTools/src/archives] Revision: 04954fb50d3148fbace9e091840b77662ab98631 Date: Fri Nov 22 12:54:54 MST 2019
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API, and application suite.
*
* 2012-2019, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package archives;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import com.mongodb.client.model.UpdateOptions;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import models.ModelArchive;
import models.ModelArchiveFactory;
import models.WEPSModelArchive;
import org.apache.commons.io.IOUtils;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.codehaus.jettison.json.JSONObject;
/**
 * {@link ServiceArchive} implementation backed by MongoDB.
 *
 * <p>Archive metadata documents are read from the collection named by
 * {@link #FS_COLL}; the archived zip payload for a given SUID is read from a
 * GridFS bucket created with {@code GridFSBuckets.create(db)}, using the SUID
 * as the GridFS file id.
 *
 * @author <a href="mailto:shaun.case@colostate.edu">Shaun Case</a>
 */
public class MongoArchive implements ServiceArchive {

    /** Payload returned by {@link #getFile(String)} when the stored file exceeds {@code Integer.MAX_VALUE} bytes. */
    static final String ARCHIVE_TOO_BIG = "the archive was too big to store.";

    /** Name of the collection holding the archive metadata documents. */
    static final String FS_COLL = "fs";

    MongoClient mongo;
    String dbname;
    // Configured for upsert in the constructor; not used by any method in this class.
    UpdateOptions opt = new UpdateOptions();

    /**
     * Connects to the MongoDB instance described by {@code uri}.
     *
     * @param uri connection string of the form
     *            {@code mongodb://user:pass@host:port/db}; when it names no
     *            database, {@code "csip"} is used
     * @throws Exception if {@code uri} is {@code null} or does not contain
     *                   {@code "mongodb://"}
     */
    public MongoArchive(String uri) throws Exception {
        // A null uri previously surfaced as a bare NullPointerException.
        if (uri == null || !uri.contains("mongodb://")) {
            throw new Exception("No mongodb location specified");
        }
        MongoClientURI u = new MongoClientURI(uri);
        dbname = u.getDatabase();
        if (dbname == null) {
            dbname = "csip";
        }
        mongo = new MongoClient(u);
        opt.upsert(true);
    }

    /**
     * Lists the archives recorded for a requesting IP and service.
     *
     * @param ip      value matched exactly against the {@code req_ip} field
     * @param service service name, compared case-insensitively against the
     *                {@code service} field
     * @return matching archives ordered by ascending {@code ctime}; never {@code null}
     * @throws Exception if the query or archive construction fails
     */
    @Override
    public synchronized ArrayList<ModelArchive> getArchiveByReqIP(String ip, String service) throws Exception {
        ArrayList<ModelArchive> ret_val = new ArrayList<>();
        MongoDatabase db = mongo.getDatabase(dbname);
        FindIterable<Document> r = db.getCollection(FS_COLL)
                .find(eq("req_ip", ip))
                .sort(new Document("ctime", 1));
        try (MongoCursor<Document> cursor = r.iterator()) {
            while (cursor.hasNext()) {
                Document doc = cursor.next();
                if (doc == null) {
                    continue;
                }
                // Null-safe: documents lacking a "service" field are skipped instead of throwing.
                String stored = doc.getString("service");
                if (stored != null && stored.equalsIgnoreCase(service)) {
                    ret_val.add(toModelArchive(doc));
                }
            }
        }
        return ret_val;
    }

    /**
     * Looks up a single archive by its SUID.
     *
     * @param suid value of the document's {@code _id}
     * @return the archive, or {@code null} when no document has that id
     * @throws Exception if the query or archive construction fails
     */
    @Override
    public synchronized ModelArchive getArchiveBySUID(String suid) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        Document doc = db.getCollection(FS_COLL).find(new Document("_id", suid)).first();
        return (doc == null) ? null : toModelArchive(doc);
    }

    /** Builds a plain {@link ModelArchive} from the standard metadata fields of {@code doc}. */
    private static ModelArchive toModelArchive(Document doc) throws Exception {
        return new ModelArchive(
                doc.getString("_id"),
                doc.getString("ctime"),
                doc.getString("etime"),
                doc.getString("service"),
                doc.getString("status"),
                doc.getString("req_ip"),
                doc.getString("filename")
        );
    }

    /**
     * Downloads the GridFS file whose id is {@code suid}.
     *
     * @param suid GridFS file id
     * @return the file content, or the bytes of {@link #ARCHIVE_TOO_BIG} when
     *         the stored length exceeds {@code Integer.MAX_VALUE}
     * @throws Exception if the download fails
     */
    private synchronized byte[] getFile(String suid) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        GridFSBucket gridFSBucket = GridFSBuckets.create(db);
        try (GridFSDownloadStream stream = gridFSBucket.openDownloadStream(new BsonString(suid))) {
            long fileLength = stream.getGridFSFile().getLength();
            // this should not happen, since it is capped at 1GB.
            if (fileLength > Integer.MAX_VALUE) {
                return ARCHIVE_TOO_BIG.getBytes(StandardCharsets.UTF_8);
            }
            return IOUtils.toByteArray(stream, fileLength);
        }
    }

    /**
     * Reads the first zip entry whose name contains {@code nameFragment} from
     * the archive stored for {@code suid} and parses it as JSON.
     *
     * @return the parsed JSON, or {@code null} when no entry name matches
     */
    private JSONObject readJsonEntry(String suid, String nameFragment) throws Exception {
        byte[] zipBytes = getFile(suid);
        try (ZipInputStream zin = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            ZipEntry entry;
            while ((entry = zin.getNextEntry()) != null) {
                if (!entry.getName().contains(nameFragment)) {
                    continue;
                }
                // Decode explicitly as UTF-8 rather than the platform default charset.
                // The reader is not closed separately: it wraps zin, which the
                // enclosing try-with-resources closes.
                BufferedReader reader = new BufferedReader(new InputStreamReader(zin, StandardCharsets.UTF_8));
                StringBuilder content = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    content.append(line).append(System.lineSeparator());
                }
                return new JSONObject(content.toString());
            }
        }
        return null;
    }

    /**
     * Returns the JSON request stored in the archive for {@code suid}.
     *
     * @param suid archive id
     * @return JSON parsed from the first zip entry whose name contains
     *         {@code ".request"}, or {@code null} if there is none
     * @throws Exception if the download, unzip or JSON parsing fails
     */
    @Override
    public synchronized JSONObject getServiceRequest(String suid) throws Exception {
        return readJsonEntry(suid, ".request");
    }

    /**
     * Returns the JSON response stored in the archive for {@code suid}.
     *
     * @param suid archive id
     * @return JSON parsed from the first zip entry whose name contains
     *         {@code ".response"}, or {@code null} if there is none
     * @throws Exception if the download, unzip or JSON parsing fails
     */
    @Override
    public synchronized JSONObject getServiceResponse(String suid) throws Exception {
        return readJsonEntry(suid, ".response");
    }

    /**
     * Closes the underlying {@link MongoClient}.
     *
     * @throws Exception declared by the interface signature
     */
    @Override
    public synchronized void shutdown() throws Exception {
        mongo.close();
    }

    /**
     * @return the number of documents in the {@link #FS_COLL} collection
     */
    @Override
    public synchronized long getCount() {
        MongoDatabase db = mongo.getDatabase(dbname);
        return db.getCollection(FS_COLL).count();
    }

    /**
     * Returns a page of archive ids.
     *
     * @param skip    number of documents to skip
     * @param limit   maximum number of documents to read
     * @param sortby  field to sort on; when {@code null} the sort is
     *                {@code ctime} descending and {@code sortAsc} is ignored
     * @param sortAsc {@code true} for ascending order
     * @return the {@code _id} values in query order
     */
    // new keys implementation.
    @Override
    public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
        if (sortby == null) {
            sortby = "ctime";
            sortAsc = false;
        }
        Document sort = new Document(sortby, sortAsc ? 1 : -1);
        Set<String> ids = new LinkedHashSet<>();
        MongoDatabase db = mongo.getDatabase(dbname);
        MongoCollection<Document> c = db.getCollection(FS_COLL);
        for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
            ids.add(doc.get("_id", String.class));
        }
        return ids;
    }

    /**
     * @param suid archive id
     * @return {@code true} when exactly one document has {@code _id == suid}
     * @throws Exception declared by the interface signature
     */
    @Override
    public synchronized boolean hasArchive(String suid) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        return db.getCollection(FS_COLL).count(new Document("_id", suid)) == 1;
    }

    /**
     * Loads WEPS 5.0 archives (metadata plus file payload) matching all of the
     * given filters, newest {@code ctime} first.
     *
     * <p>Only documents whose {@code service} contains {@code "weps/5.0"} are
     * returned. Other documents are skipped; earlier revisions appended a
     * {@code null} entry for them and counted it toward {@code limit}.
     *
     * @param filters filters combined with a logical AND
     * @param limit   stop once this many archives have been collected;
     *                {@code -1} disables the limit
     * @return the loaded archives; never {@code null} and containing no
     *         {@code null} elements
     * @throws Exception if the query, download or archive construction fails
     */
    @Override
    public ArrayList<ModelArchive> getArchivesByFilters(Iterable<Bson> filters, int limit) throws Exception {
        ArrayList<ModelArchive> ret_val = new ArrayList<>();
        MongoDatabase db = mongo.getDatabase(dbname);
        FindIterable<Document> c = db.getCollection(FS_COLL).find(and(filters)).sort(new Document("ctime", -1));
        // try-with-resources releases the cursor on early exit (limit reached) or on failure.
        try (MongoCursor<Document> cursor = c.iterator()) {
            while (cursor.hasNext()) {
                Document doc = cursor.next();
                if (doc == null) {
                    continue;
                }
                String service = doc.getString("service");
                if (service == null || !service.contains("weps/5.0")) {
                    // Skip before downloading: no archive is built for this document.
                    continue;
                }
                byte[] fileData = getFile(doc.getString("_id"));
                ModelArchive model = new WEPSModelArchive(doc.getString("_id"),
                        doc.getString("ctime"),
                        doc.getString("etime"),
                        service,
                        doc.getString("status"),
                        doc.getString("req_ip"),
                        doc.getString("filename"),
                        fileData
                );
                model.saveFileData(fileData);
                ret_val.add(model);
                // Reached our limit yet?
                if ((-1 != limit) && (ret_val.size() >= limit)) {
                    break;
                }
            }
        }
        return ret_val;
    }

    /**
     * Loads finished archives (metadata plus file payload) whose field
     * {@code key} equals {@code value}, newest {@code ctime} first.
     *
     * <p>The archive type is chosen by
     * {@code ModelArchiveFactory.getModelArchive}; documents for which it
     * returns {@code null} are skipped.
     *
     * @param key   document field to match
     * @param value required value of that field
     * @param limit stop once this many archives have been collected;
     *              {@code -1} disables the limit
     * @return the loaded archives; never {@code null}
     * @throws Exception if the query, download or archive construction fails
     */
    @Override
    public ArrayList<ModelArchive> getArchivesByFilter(String key, String value, int limit) throws Exception {
        ArrayList<ModelArchive> ret_val = new ArrayList<>();
        MongoDatabase db = mongo.getDatabase(dbname);
        FindIterable<Document> c = db.getCollection(FS_COLL).find(and(eq(key, value), eq("status", "Finished"))).sort(new Document("ctime", -1));
        // try-with-resources releases the cursor on early exit (limit reached) or on failure.
        try (MongoCursor<Document> cursor = c.iterator()) {
            while (cursor.hasNext()) {
                Document doc = cursor.next();
                if (doc == null) {
                    continue;
                }
                // Null-safe re-check of the match; a missing field no longer throws.
                String stored = doc.getString(key);
                if (stored == null || !stored.equalsIgnoreCase(value)) {
                    continue;
                }
                ModelArchive model = ModelArchiveFactory.getModelArchive(doc.getString("service"), doc);
                if (null != model) {
                    byte[] fileData = getFile(doc.getString("_id"));
                    model.setFileDataEx(fileData);
                    model.saveFileData(fileData);
                    ret_val.add(model);
                }
                // Reached our limit yet?
                if ((-1 != limit) && (ret_val.size() >= limit)) {
                    break;
                }
            }
        }
        return ret_val;
    }

    /**
     * Lists the ids of all documents whose field {@code key} equals {@code value}.
     *
     * @param key   document field to match
     * @param value required value of that field
     * @return the {@code _id} values ordered by ascending {@code ctime}
     */
    @Override
    public ArrayList<String> getSUIDsByFilter(String key, String value) {
        ArrayList<String> ret_val = new ArrayList<>();
        MongoDatabase db = mongo.getDatabase(dbname);
        FindIterable<Document> c = db.getCollection(FS_COLL).find(eq(key, value)).sort(new Document("ctime", 1));
        try (MongoCursor<Document> cursor = c.iterator()) {
            while (cursor.hasNext()) {
                Document doc = cursor.next();
                if (null != doc) {
                    ret_val.add(doc.getString("_id"));
                }
            }
        }
        return ret_val;
    }
}