// MongoArchive.java [src/cokeyconverter] Revision: default  Date:
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package cokeyconverter;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import static com.mongodb.client.model.Filters.eq;
import com.mongodb.client.model.UpdateOptions;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.commons.io.IOUtils;
import org.bson.BsonString;
import org.bson.Document;
import org.codehaus.jettison.json.JSONObject;

/**
 *
 * @author <a href="mailto:shaun.case@colostate.edu">Shaun Case</a>
 */
public class MongoArchive implements ServiceArchive {

    static String ARCHIVE_TOO_BIG = "the archive was too big to store.";
    MongoClient mongo;
    String dbname;
    static final String FS_COLL = "fs";

    UpdateOptions opt = new UpdateOptions();

    MongoArchive(String uri) throws Exception {
        // "mongodb://user:pass@host:port/db"
        if (uri.contains("mongodb://")) {
            MongoClientURI u = new MongoClientURI(uri);
            dbname = u.getDatabase();
            if (dbname == null) {
                dbname = "csip";
            }

            mongo = new MongoClient(u);
            opt.upsert(true);
        } else {
            throw new Exception("No mongodb location specified");
        }
    } // "mongodb://user:pass@host:port/db"// "mongodb://user:pass@host:port/db"

    @Override
    public synchronized ArrayList<ModelArchive> getArchiveByReqIP(String ip, String service) throws Exception {
        ArrayList<ModelArchive> ret_val = new ArrayList<>();
        MongoDatabase db = mongo.getDatabase(dbname);
        FindIterable<Document> r = db.getCollection(FS_COLL).find(eq("req_ip", ip));

        r.sort(new Document("ctime", 1));
        for (Document doc : r) {
            if (doc != null) {
                if (doc.getString("service").equalsIgnoreCase(service)) {
                    ret_val.add(new ModelArchive(
                            doc.getString("_id"),
                            doc.getString("ctime"),
                            doc.getString("etime"),
                            doc.getString("service"),
                            doc.getString("status"),
                            doc.getString("req_ip"),
                            doc.getString("filename")
                    )
                    );
                }
            }
        }
        return ret_val;
    }

    @Override
    public synchronized ModelArchive getArchiveBySUID(String suid) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        FindIterable<Document> r = db.getCollection(FS_COLL).find(new Document("_id", suid));
        Document doc = r.first();
        if (doc != null) {
            return new ModelArchive(
                    doc.getString("_id"),
                    doc.getString("ctime"),
                    doc.getString("etime"),
                    doc.getString("service"),
                    doc.getString("status"),
                    doc.getString("req_ip"),
                    doc.getString("filename")
            );
        }
        return null;
    }

    private synchronized byte[] getFile(String suid) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        GridFSBucket gridFSBucket = GridFSBuckets.create(db);
        try (GridFSDownloadStream stream = gridFSBucket.openDownloadStream(new BsonString(suid))) {
            long fileLength = stream.getGridFSFile().getLength();
            // this should not happen, since it is capped at 1GB.
            if (fileLength > Integer.MAX_VALUE) {
                return ARCHIVE_TOO_BIG.getBytes();
            }
            return IOUtils.toByteArray(stream, fileLength);
        }
    }

    @Override
    public synchronized JSONObject getServiceRequest(String suid) throws Exception {
        JSONObject requestData = null;

        byte[] zFile = getFile(suid);

        try (ZipInputStream zin = new ZipInputStream(new ByteArrayInputStream(zFile))) {
            ZipEntry entry;

            while ((entry = zin.getNextEntry()) != null) {
                if (entry.getName().contains(".request")) {
                    BufferedReader bReader = new BufferedReader(new InputStreamReader(zin));
                    StringBuilder fileContent = new StringBuilder();
                    String inputStr;
                    while ((inputStr = bReader.readLine()) != null) {
                        fileContent.append(inputStr);
                    }
                    requestData = new JSONObject(fileContent.toString());
                    break;
                }
            }
        }

        return requestData;
    }

    @Override
    public synchronized JSONObject getServiceResponse(String suid) throws Exception {
        JSONObject requestData = null;

        byte[] zFile = getFile(suid);

        try (ZipInputStream zin = new ZipInputStream(new ByteArrayInputStream(zFile))) {
            ZipEntry entry;

            while ((entry = zin.getNextEntry()) != null) {
                if (entry.getName().contains(".response")) {
                    BufferedReader bReader = new BufferedReader(new InputStreamReader(zin));
                    StringBuilder fileContent = new StringBuilder();
                    String inputStr;
                    while ((inputStr = bReader.readLine()) != null) {
                        fileContent.append(inputStr);
                    }
                    requestData = new JSONObject(fileContent.toString());
                    break;
                }
            }
        }

        return requestData;
    }

    @Override
    public synchronized void shutdown() throws Exception {
        mongo.close();
    }

    @Override
    public synchronized long getCount() {
        MongoDatabase db = mongo.getDatabase(dbname);
        return db.getCollection(FS_COLL).count();
    }

    // new keys implementation.
    @Override
    public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
        if (sortby == null) {
            sortby = "ctime";
            sortAsc = false;
        }
        Document sort = new Document(sortby, sortAsc ? 1 : -1);
        Set<String> l = new LinkedHashSet<>();
        MongoDatabase db = mongo.getDatabase(dbname);
        MongoCollection<Document> c = db.getCollection(FS_COLL);

        for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
            l.add(doc.get("_id", String.class
            ));
        }
        return l;
    }

    @Override
    public synchronized boolean hasArchive(String suid) throws Exception {
        MongoDatabase db = mongo.getDatabase(dbname);
        return db.getCollection(FS_COLL).count(new Document("_id", suid)) == 1;

    }
}