MongoSessionStore.java [src/csip] Revision: 71821307bfe742c00c6dc582c171224a9ac59935  Date: Fri Apr 21 11:46:19 MDT 2017
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 *
 * 2012-2017, Olaf David and others, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package csip;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import java.util.LinkedHashSet;
import java.util.Set;
import org.bson.Document;

/**
 *
 */
class MongoSessionStore implements SessionStore {

    MongoClient mongo;
    MongoDatabase db;
    static final String coll = "session";


    MongoSessionStore(String uri) {
        MongoClientURI u = new MongoClientURI(uri);
        String dbname = u.getDatabase();
        if (dbname == null) {
            dbname = "csip";
        }
        mongo = new MongoClient(u);
        db = mongo.getDatabase(dbname);
        Config.LOG.info("Connected to mongo session csip store : " + uri);
    }


    @Override
    public synchronized void setSession(String suid, ModelSession session) throws Exception {
        Document doc = new Document();
        doc.append("_id", suid)
                .append("tst", session.getTstamp())
                .append("exp", session.getExpDate())
                .append("srv", session.getService())
                .append("sta", session.getStatus())
                .append("nip", session.getNodeIP())
                .append("rip", session.getReqIP())
                .append("cpu", session.getCputime().isEmpty() ? -1 : Integer.parseInt(session.getCputime()))
                .append("pro", session.getProgress() == null ? "" : session.getProgress());
        String[] a = session.getAttachments();
        doc.append("att", String.join(",", a));
        UpdateOptions opt = new UpdateOptions();
        opt.upsert(true);

        MongoCollection<Document> c = db.getCollection(coll);
        c.replaceOne(new Document("_id", suid), doc, opt);
        Config.LOG.info("added: " + suid + " " + doc.toJson());
    }


    @Override
    public synchronized ModelSession getSession(String suid) throws Exception {
        FindIterable<Document> r = db.getCollection(coll).find(new Document("_id", suid));
        Document o = r.first();
        if (o != null) {
            // Build our document and add all the fields
            ModelSession s = new ModelSession();
            s.setTstamp(o.get("tst", String.class));
            s.setExpDate(o.get("exp", String.class));
            s.setService(o.get("srv", String.class));
            s.setStatus(o.get("sta", String.class));
            s.setNodeIP(o.get("nip", String.class));
            s.setReqIP(o.get("rip", String.class));
            int cpu = o.get("cpu", Integer.class);
            s.setCputime(cpu == -1 ? "" : Integer.toString(cpu));
            s.setProgress((o.get("pro", String.class)).equals("") ? null : (o.get("pro", String.class)));
            String l = o.get("att", String.class);
            s.setAttachments(l.isEmpty() ? ModelSession.NO_ATTACHMENTS : l.split(","));
            return s;
        } else {
            Config.LOG.warning("Unable get session : " + suid + " as record already exists!!!");
            return null;
        }
    }


    @Override
    public synchronized boolean hasSession(String suid) throws Exception {
        return db.getCollection(coll).count(new Document("_id", suid)) == 1;
    }


    @Override
    public synchronized void removeSession(String suid) {
        DeleteResult r = db.getCollection(coll).deleteOne(new Document("_id", suid));
        if (r.getDeletedCount() != 1) {
            throw new RuntimeException("Not deleted: " + suid);
        }
    }


    @Override
    public synchronized void shutdown() throws Exception {
        mongo.close();
    }


    @Override
    public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
        if (sortby == null) {
            sortby = "tst";
            sortAsc = false;
        }
        Document sort = new Document(sortby, sortAsc ? 1 : -1);
        Set<String> l = new LinkedHashSet<>();
        MongoCollection<Document> c = db.getCollection(coll);
        for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
            l.add(doc.get("_id", String.class));
        }
        return l;
    }


    @Override
    public synchronized long getCount() {
        return db.getCollection(coll).count();
    }


    @Override
    public void ping() throws Exception {
        MongoCollection<Document> c = db.getCollection(coll + "_test");
        String id = "test_id_1234";
        // insert document
        Document doc = new Document("_id", id);
        UpdateOptions opt = new UpdateOptions();
        opt.upsert(true);
        UpdateResult res = c.replaceOne(new Document("_id", id), doc, opt);
        Config.LOG.info("added " + res);
        // find it.
        FindIterable<Document> d = c.find(new Document("_id", id));
        Document o = d.first();
        if (o == null) {
            throw new Exception("Not found: " + id);
        }
        if (!id.equals(o.getString("_id"))) {
            throw new Exception("Id not found: " + id);
        }
        Config.LOG.info("found " + o);
        // remove it.
        DeleteResult r = c.deleteOne(new Document("_id", id));
        if (r.getDeletedCount() != 1) {
            throw new Exception("Not deleted: " + id);
        }
        Config.LOG.info("deleted " + r);
        c.drop();
    }

}