// MongoArchive.java [src/cokeyconverter] Revision: default Date:
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package cokeyconverter;
import static com.mongodb.client.model.Filters.eq;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import com.mongodb.client.model.UpdateOptions;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.commons.io.IOUtils;
import org.bson.BsonString;
import org.bson.Document;
import org.codehaus.jettison.json.JSONObject;
/**
*
* @author <a href="mailto:shaun.case@colostate.edu">Shaun Case</a>
*/
public class MongoArchive implements ServiceArchive {

    // Sentinel payload returned by getFile() when the stored archive is larger
    // than Integer.MAX_VALUE bytes (should not happen; archives are capped at 1GB).
    static String ARCHIVE_TOO_BIG = "the archive was too big to store.";

    MongoClient mongo;
    String dbname;
    // GridFS metadata collection holding one document per archived request.
    static final String FS_COLL = "fs";
    UpdateOptions opt = new UpdateOptions();

    /**
     * Connects to the MongoDB instance named by a connection string of the form
     * {@code mongodb://user:pass@host:port/db}. When the URI names no database,
     * the default database {@code "csip"} is used.
     *
     * @param uri MongoDB connection string; must contain the "mongodb://" scheme
     * @throws Exception if {@code uri} does not look like a MongoDB location
     */
    MongoArchive(String uri) throws Exception {
        if (uri.contains("mongodb://")) {
            MongoClientURI u = new MongoClientURI(uri);
            dbname = u.getDatabase();
            if (dbname == null) {
                dbname = "csip";  // URI named no database; fall back to default
            }
            mongo = new MongoClient(u);
            opt.upsert(true);
        } else {
            throw new Exception("No mongodb location specified");
        }
    }

    /**
     * Maps a metadata document from the {@code fs} collection onto a
     * {@link ModelArchive} value object.
     */
    private static ModelArchive toModelArchive(Document doc) {
        return new ModelArchive(
                doc.getString("_id"),
                doc.getString("ctime"),
                doc.getString("etime"),
                doc.getString("service"),
                doc.getString("status"),
                doc.getString("req_ip"),
                doc.getString("filename")
        );
    }

    /**
     * Returns the archives created by requests from the given IP for the given
     * service, ordered by creation time (ascending).
     *
     * @param ip      requester IP to match against the {@code req_ip} field
     * @param service service name, compared case-insensitively
     * @return matching archives; empty list when none match
     * @throws Exception on driver failure
     */
    @Override
    public synchronized ArrayList<ModelArchive> getArchiveByReqIP(String ip, String service) throws Exception {
        ArrayList<ModelArchive> ret_val = new ArrayList<>();
        MongoDatabase db = mongo.getDatabase(dbname);
        FindIterable<Document> r = db.getCollection(FS_COLL).find(eq("req_ip", ip));
        r.sort(new Document("ctime", 1));
        for (Document doc : r) {
            if (doc != null) {
                // Guard against documents that have no "service" field at all;
                // the previous code NPE'd on doc.getString("service") == null.
                String docService = doc.getString("service");
                if (docService != null && docService.equalsIgnoreCase(service)) {
                    ret_val.add(toModelArchive(doc));
                }
            }
        }
        return ret_val;
    }

    /**
     * Looks up a single archive's metadata by its service UID.
     *
     * @param suid service UID (the {@code _id} of the metadata document)
     * @return the archive, or {@code null} when no such document exists
     * @throws Exception on driver failure
     */
    @Override
    public synchronized ModelArchive getArchiveBySUID(String suid) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        FindIterable<Document> r = db.getCollection(FS_COLL).find(new Document("_id", suid));
        Document doc = r.first();
        if (doc != null) {
            return toModelArchive(doc);
        }
        return null;
    }

    /**
     * Downloads the full GridFS payload for the given service UID.
     *
     * @return the archive bytes, or {@link #ARCHIVE_TOO_BIG} as bytes when the
     *         stored file cannot fit into a Java byte array
     */
    private synchronized byte[] getFile(String suid) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        GridFSBucket gridFSBucket = GridFSBuckets.create(db);
        try (GridFSDownloadStream stream = gridFSBucket.openDownloadStream(new BsonString(suid))) {
            long fileLength = stream.getGridFSFile().getLength();
            // This should not happen, since archives are capped at 1GB.
            if (fileLength > Integer.MAX_VALUE) {
                return ARCHIVE_TOO_BIG.getBytes();
            }
            return IOUtils.toByteArray(stream, fileLength);
        }
    }

    /**
     * Fetches the archive zip for {@code suid} and parses the first entry whose
     * name contains {@code entrySuffix} as JSON. Line separators are dropped
     * while reading, which is harmless for JSON content.
     *
     * @return parsed JSON, or {@code null} when no matching entry exists
     */
    private synchronized JSONObject getZipEntryAsJson(String suid, String entrySuffix) throws Exception {
        byte[] zFile = getFile(suid);
        try (ZipInputStream zin = new ZipInputStream(new ByteArrayInputStream(zFile))) {
            ZipEntry entry;
            while ((entry = zin.getNextEntry()) != null) {
                if (entry.getName().contains(entrySuffix)) {
                    // UTF-8 explicitly; the previous code used the platform
                    // default charset, which is not portable for stored JSON.
                    BufferedReader bReader = new BufferedReader(
                            new InputStreamReader(zin, StandardCharsets.UTF_8));
                    StringBuilder fileContent = new StringBuilder();
                    String inputStr;
                    while ((inputStr = bReader.readLine()) != null) {
                        fileContent.append(inputStr);
                    }
                    return new JSONObject(fileContent.toString());
                }
            }
        }
        return null;
    }

    /**
     * Returns the archived {@code .request} JSON for the given service UID, or
     * {@code null} when the archive contains no request entry.
     */
    @Override
    public synchronized JSONObject getServiceRequest(String suid) throws Exception {
        return getZipEntryAsJson(suid, ".request");
    }

    /**
     * Returns the archived {@code .response} JSON for the given service UID, or
     * {@code null} when the archive contains no response entry.
     */
    @Override
    public synchronized JSONObject getServiceResponse(String suid) throws Exception {
        return getZipEntryAsJson(suid, ".response");
    }

    /** Closes the underlying MongoDB client. */
    @Override
    public synchronized void shutdown() throws Exception {
        mongo.close();
    }

    /** Returns the total number of archived entries. */
    @Override
    public synchronized long getCount() {
        MongoDatabase db = mongo.getDatabase(dbname);
        return db.getCollection(FS_COLL).count();
    }

    /**
     * Returns a page of archive keys ({@code _id} values) in sorted order.
     *
     * @param skip    number of documents to skip
     * @param limit   maximum number of keys to return
     * @param sortby  field to sort on; {@code null} means newest-first by ctime
     * @param sortAsc ascending when true (ignored when {@code sortby} is null)
     * @return insertion-ordered set of keys
     */
    @Override
    public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
        if (sortby == null) {
            // Default: most recently created first.
            sortby = "ctime";
            sortAsc = false;
        }
        Document sort = new Document(sortby, sortAsc ? 1 : -1);
        Set<String> l = new LinkedHashSet<>();
        MongoDatabase db = mongo.getDatabase(dbname);
        MongoCollection<Document> c = db.getCollection(FS_COLL);
        for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
            l.add(doc.get("_id", String.class));
        }
        return l;
    }

    /** Returns whether an archive with the given service UID exists. */
    @Override
    public synchronized boolean hasArchive(String suid) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        return db.getCollection(FS_COLL).count(new Document("_id", suid)) == 1;
    }
}