MongoArchive.java [tools/MetaModelTools/src/archives] Revision: 04954fb50d3148fbace9e091840b77662ab98631  Date: Fri Nov 22 12:54:54 MST 2019
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API, and application suite.
 *
 * 2012-2019, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package archives;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import com.mongodb.client.model.UpdateOptions;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import models.WEPSModelArchive;
import org.apache.commons.io.IOUtils;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.codehaus.jettison.json.JSONObject;
import models.ModelArchive;
import models.ModelArchiveFactory;

/**
 * A {@link ServiceArchive} implementation backed by MongoDB: archive
 * metadata lives in the "fs" collection, and the zipped model-run
 * payloads are stored in GridFS, keyed by service UID (suid).
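 *
 * <p>A minimal usage sketch (the URI and paging values are illustrative):
 * <pre>{@code
 * ServiceArchive archive = new MongoArchive("mongodb://localhost:27017/csip");
 * Set<String> latest = archive.keys(0, 10, "ctime", false);
 * archive.shutdown();
 * }</pre>
 *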
 * @author <a href="mailto:shaun.case@colostate.edu">Shaun Case</a>
 */
public class MongoArchive implements ServiceArchive {

    static final String ARCHIVE_TOO_BIG = "the archive was too big to store.";
    MongoClient mongo;
    String dbname;
    static final String FS_COLL = "fs";

    UpdateOptions opt = new UpdateOptions();

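    /**
     * Creates an archive backed by the MongoDB instance given in the
     * connection string ("mongodb://user:pass@host:port/db"). If the URI
     * names no database, the database defaults to "csip".
     *
     * @param uri a MongoDB connection string
     * @throws Exception if the URI is not a mongodb:// location
     */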
    public MongoArchive(String uri) throws Exception {
	// "mongodb://user:pass@host:port/db"
	if (uri.contains("mongodb://")) {
	    MongoClientURI u = new MongoClientURI(uri);
	    dbname = u.getDatabase();
	    if (dbname == null) {
		dbname = "csip";
	    }

	    mongo = new MongoClient(u);
	    opt.upsert(true);
	} else {
	    throw new Exception("No mongodb location specified");
	}
    } // "mongodb://user:pass@host:port/db"// "mongodb://user:pass@host:port/db"

    @Override
    public synchronized ArrayList<ModelArchive> getArchiveByReqIP(String ip, String service) throws Exception {
	ArrayList<ModelArchive> ret_val = new ArrayList<>();
	MongoDatabase db = mongo.getDatabase(dbname);
	FindIterable<Document> r = db.getCollection(FS_COLL).find(eq("req_ip", ip));

	r.sort(new Document("ctime", 1));
	for (Document doc : r) {
	    if (doc != null && service.equalsIgnoreCase(doc.getString("service"))) {
		ret_val.add(new ModelArchive(
			doc.getString("_id"),
			doc.getString("ctime"),
			doc.getString("etime"),
			doc.getString("service"),
			doc.getString("status"),
			doc.getString("req_ip"),
			doc.getString("filename")));
	    }
	}
	return ret_val;
    }

    @Override
    public synchronized ModelArchive getArchiveBySUID(String suid) throws Exception {
	MongoDatabase db = mongo.getDatabase(dbname);
	FindIterable<Document> r = db.getCollection(FS_COLL).find(new Document("_id", suid));
	Document doc = r.first();
	if (doc != null) {
	    return new ModelArchive(
		    doc.getString("_id"),
		    doc.getString("ctime"),
		    doc.getString("etime"),
		    doc.getString("service"),
		    doc.getString("status"),
		    doc.getString("req_ip"),
		    doc.getString("filename")
	    );
	}
	return null;
    }

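    /**
     * Reads the zipped archive payload for the given suid from GridFS into
     * memory. Files larger than Integer.MAX_VALUE cannot be buffered, so an
     * error message is returned as bytes instead; this should not happen,
     * since uploads are capped at 1GB.
     */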
    private synchronized byte[] getFile(String suid) throws Exception {
	MongoDatabase db = mongo.getDatabase(dbname);
	GridFSBucket gridFSBucket = GridFSBuckets.create(db);
	try (GridFSDownloadStream stream = gridFSBucket.openDownloadStream(new BsonString(suid))) {
	    long fileLength = stream.getGridFSFile().getLength();
	    // this should not happen, since it is capped at 1GB.
	    if (fileLength > Integer.MAX_VALUE) {
		return ARCHIVE_TOO_BIG.getBytes();
	    }
	    return IOUtils.toByteArray(stream, fileLength);
	}
    }

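    /**
     * Returns the JSON model request stored in the archive's zip file, read
     * from the first ".request" entry, or null if no such entry exists.
     */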
    @Override
    public synchronized JSONObject getServiceRequest(String suid) throws Exception {
	JSONObject requestData = null;

	byte[] zFile = getFile(suid);

	try (ZipInputStream zin = new ZipInputStream(new ByteArrayInputStream(zFile))) {
	    ZipEntry entry;

	    while ((entry = zin.getNextEntry()) != null) {
		if (entry.getName().contains(".request")) {
		    BufferedReader bReader = new BufferedReader(new InputStreamReader(zin));
		    StringBuilder fileContent = new StringBuilder();
		    String inputStr;
		    while ((inputStr = bReader.readLine()) != null) {
			fileContent.append(inputStr).append(System.lineSeparator());
		    }
		    requestData = new JSONObject(fileContent.toString());
		    break;
		}
	    }
	}

	return requestData;
    }

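    /**
     * Returns the JSON model response stored in the archive's zip file, read
     * from the first ".response" entry, or null if no such entry exists.
     */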
    @Override
    public synchronized JSONObject getServiceResponse(String suid) throws Exception {
	JSONObject responseData = null;

	byte[] zFile = getFile(suid);

	try (ZipInputStream zin = new ZipInputStream(new ByteArrayInputStream(zFile))) {
	    ZipEntry entry;

	    while ((entry = zin.getNextEntry()) != null) {
		if (entry.getName().contains(".response")) {
		    BufferedReader bReader = new BufferedReader(new InputStreamReader(zin));
		    StringBuilder fileContent = new StringBuilder();
		    String inputStr;
		    while ((inputStr = bReader.readLine()) != null) {
			fileContent.append(inputStr).append(System.lineSeparator());
		    }
		    responseData = new JSONObject(fileContent.toString());
		    break;
		}
	    }
	}

	return responseData;
    }

    @Override
    public synchronized void shutdown() throws Exception {
	mongo.close();
    }

    @Override
    public synchronized long getCount() {
	MongoDatabase db = mongo.getDatabase(dbname);
	return db.getCollection(FS_COLL).count();
    }

    /**
     * Returns a page of archive suids ({@code skip}/{@code limit}). If no
     * sort field is given, results are sorted by creation time, descending.
     */
    @Override
    public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
	if (sortby == null) {
	    sortby = "ctime";
	    sortAsc = false;
	}
	Document sort = new Document(sortby, sortAsc ? 1 : -1);
	Set<String> l = new LinkedHashSet<>();
	MongoDatabase db = mongo.getDatabase(dbname);
	MongoCollection<Document> c = db.getCollection(FS_COLL);

	for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
	    l.add(doc.get("_id", String.class
	    ));
	}
	return l;
    }

    @Override
    public synchronized boolean hasArchive(String suid) throws Exception {
	MongoDatabase db = mongo.getDatabase(dbname);
	return db.getCollection(FS_COLL).count(new Document("_id", suid)) == 1;

    }

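    /**
     * Returns the archives matching all of the given filters, newest first.
     * Only WEPS 5.0 runs are materialized; the zip payload of each match is
     * loaded into memory and saved via {@link ModelArchive#saveFileData}.
     */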
    @Override
    public ArrayList<ModelArchive> getArchivesByFilters(Iterable<Bson> filters, int limit) throws Exception {
	ArrayList<ModelArchive> ret_val = new ArrayList<>();
	MongoDatabase db = mongo.getDatabase(dbname);
	FindIterable<Document> c = db.getCollection(FS_COLL).find(and(filters)).sort(new Document("ctime", -1));

	MongoCursor<Document> cursor = c.iterator();

	int count = 0;
	while (cursor.hasNext()) {
	    Document doc = cursor.next();
	    if (doc == null) {
		continue;
	    }
	    String serviceName = doc.getString("service");
	    if (serviceName == null || !serviceName.contains("weps/5.0")) {
		continue;
	    }
	    //  Fetch the (potentially large) zip payload only for matching documents.
	    byte[] fileData = getFile(doc.getString("_id"));
	    ModelArchive model = new WEPSModelArchive(doc.getString("_id"),
		    doc.getString("ctime"),
		    doc.getString("etime"),
		    doc.getString("service"),
		    doc.getString("status"),
		    doc.getString("req_ip"),
		    doc.getString("filename"),
		    fileData);
	    model.saveFileData(fileData);
	    ret_val.add(model);
	    count++;

	    //  Reached our limit yet?
	    if (limit != -1 && count >= limit) {
		break;
	    }
	}

	System.gc();  //  Large zip payloads were buffered above; hint the JVM to reclaim them.
	return ret_val;
    }

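    /**
     * Returns finished archives whose metadata field {@code key} equals
     * {@code value} (case-insensitively), newest first. Archive types are
     * resolved through {@link ModelArchiveFactory}.
     */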
    @Override
    public ArrayList<ModelArchive> getArchivesByFilter(String key, String value, int limit) throws Exception {
	ArrayList<ModelArchive> ret_val = new ArrayList<>();
	MongoDatabase db = mongo.getDatabase(dbname);
	FindIterable<Document> c = db.getCollection(FS_COLL).find(and(eq(key, value), eq("status", "Finished"))).sort(new Document("ctime", -1));

	MongoCursor<Document> cursor = c.iterator();

	int count = 0;
	while (cursor.hasNext()) {
	    Document doc = cursor.next();
	    if (doc == null || !value.equalsIgnoreCase(doc.getString(key))) {
		continue;
	    }
	    ModelArchive model = ModelArchiveFactory.getModelArchive(doc.getString("service"), doc);
	    if (model != null) {
		byte[] fileData = getFile(doc.getString("_id"));
		model.setFileDataEx(fileData);
		model.saveFileData(fileData);
		ret_val.add(model);
		count++;
	    }

	    //  Reached our limit yet?
	    if (limit != -1 && count >= limit) {
		break;
	    }
	}

	System.gc();  //  Large zip payloads were buffered above; hint the JVM to reclaim them.
	return ret_val;
    }

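    /**
     * Returns the suids of all archives whose metadata field {@code key}
     * equals {@code value}, oldest first.
     */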
    @Override
    public ArrayList<String> getSUIDsByFilter(String key, String value) {
	ArrayList<String> ret_val = new ArrayList<>();
	MongoDatabase db = mongo.getDatabase(dbname);
	FindIterable<Document> c = db.getCollection(FS_COLL).find(eq(key, value)).sort(new Document("ctime", 1));

	MongoCursor<Document> cursor = c.iterator();

	while (cursor.hasNext()) {
	    Document doc = cursor.next();
	    if (null != doc) {
		ret_val.add(doc.getString("_id"));
	    }
	}
	return ret_val;
    }
}