MongoSessionStore.java [src/csip] Revision: 71821307bfe742c00c6dc582c171224a9ac59935 Date: Fri Apr 21 11:46:19 MDT 2017
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2017, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import java.util.LinkedHashSet;
import java.util.Set;
import org.bson.Document;
/**
 * {@link SessionStore} implementation that keeps model sessions as documents
 * in the {@code session} collection of a MongoDB database.
 *
 * <p>Each session is stored under its session id ({@code _id}) with short
 * field names: {@code tst}, {@code exp}, {@code srv}, {@code sta},
 * {@code nip}, {@code rip}, {@code cpu} (integer, {@code -1} when unset),
 * {@code pro} (empty string when unset) and {@code att} (comma-joined
 * attachment names).
 *
 * <p>All store operations except {@link #ping()} are {@code synchronized} on
 * this instance.
 */
class MongoSessionStore implements SessionStore {

  final MongoClient mongo;
  final MongoDatabase db;
  static final String coll = "session";

  /**
   * Connects to the MongoDB instance described by the given connection string.
   * If the connection string names no database, {@code "csip"} is used.
   *
   * @param uri MongoDB connection string
   */
  MongoSessionStore(String uri) {
    MongoClientURI u = new MongoClientURI(uri);
    String dbname = u.getDatabase();
    if (dbname == null) {
      dbname = "csip";
    }
    mongo = new MongoClient(u);
    db = mongo.getDatabase(dbname);
    // Log hosts and database only: the raw connection string may embed credentials.
    Config.LOG.info("Connected to mongo session csip store : " + u.getHosts() + "/" + dbname);
  }

  /**
   * Inserts the session under the given id, or replaces the stored one if a
   * document with that id already exists (upsert).
   *
   * @param suid session id, used as the document {@code _id}
   * @param session session to store
   * @throws Exception if the session cannot be converted or written
   */
  @Override
  public synchronized void setSession(String suid, ModelSession session) throws Exception {
    Document doc = toDocument(suid, session);
    UpdateOptions opt = new UpdateOptions();
    opt.upsert(true);
    MongoCollection<Document> c = db.getCollection(coll);
    c.replaceOne(new Document("_id", suid), doc, opt);
    Config.LOG.info("added: " + suid + " " + doc.toJson());
  }

  /**
   * Looks up the session stored under the given id.
   *
   * @param suid session id
   * @return the stored session, or {@code null} (after logging a warning) if
   *     no document with that id exists
   * @throws Exception if the lookup fails
   */
  @Override
  public synchronized ModelSession getSession(String suid) throws Exception {
    Document o = db.getCollection(coll).find(new Document("_id", suid)).first();
    if (o == null) {
      Config.LOG.warning("Unable to get session : " + suid + " as no such record exists.");
      return null;
    }
    return toSession(o);
  }

  /**
   * Checks whether a session with the given id is stored.
   *
   * @param suid session id
   * @return {@code true} if exactly one document with that id exists
   * @throws Exception if the count query fails
   */
  @Override
  public synchronized boolean hasSession(String suid) throws Exception {
    return db.getCollection(coll).count(new Document("_id", suid)) == 1;
  }

  /**
   * Deletes the session stored under the given id.
   *
   * @param suid session id
   * @throws RuntimeException if no document was deleted
   */
  @Override
  public synchronized void removeSession(String suid) {
    DeleteResult r = db.getCollection(coll).deleteOne(new Document("_id", suid));
    if (r.getDeletedCount() != 1) {
      throw new RuntimeException("Not deleted: " + suid);
    }
  }

  /**
   * Closes the underlying MongoDB client.
   *
   * @throws Exception if closing fails
   */
  @Override
  public synchronized void shutdown() throws Exception {
    mongo.close();
  }

  /**
   * Returns a page of session ids in sorted order.
   *
   * @param skip number of leading documents to skip
   * @param limit maximum number of ids requested from the query
   * @param sortby document field to sort on; when {@code null} the result is
   *     sorted by {@code tst} descending and {@code sortAsc} is ignored
   * @param sortAsc {@code true} for ascending, {@code false} for descending
   * @return the ids in query order
   */
  @Override
  public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
    if (sortby == null) {
      sortby = "tst";
      sortAsc = false;
    }
    Document sort = new Document(sortby, sortAsc ? 1 : -1);
    // LinkedHashSet keeps the order delivered by the sorted query.
    Set<String> l = new LinkedHashSet<>();
    MongoCollection<Document> c = db.getCollection(coll);
    for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
      l.add(doc.get("_id", String.class));
    }
    return l;
  }

  /**
   * Returns the number of stored sessions.
   *
   * @return document count of the session collection
   */
  @Override
  public synchronized long getCount() {
    return db.getCollection(coll).count();
  }

  /**
   * Round-trip check against the database: upserts, reads back and deletes a
   * fixed test document in the separate {@code session_test} collection, then
   * drops that collection.
   *
   * @throws Exception if the test document cannot be found, does not carry
   *     the expected id, or cannot be deleted
   */
  @Override
  public void ping() throws Exception {
    MongoCollection<Document> c = db.getCollection(coll + "_test");
    String id = "test_id_1234";

    // insert document
    Document doc = new Document("_id", id);
    UpdateOptions opt = new UpdateOptions();
    opt.upsert(true);
    UpdateResult res = c.replaceOne(new Document("_id", id), doc, opt);
    Config.LOG.info("added " + res);

    // find it.
    FindIterable<Document> d = c.find(new Document("_id", id));
    Document o = d.first();
    if (o == null) {
      throw new Exception("Not found: " + id);
    }
    if (!id.equals(o.getString("_id"))) {
      throw new Exception("Id not found: " + id);
    }
    Config.LOG.info("found " + o);

    // remove it.
    DeleteResult r = c.deleteOne(new Document("_id", id));
    if (r.getDeletedCount() != 1) {
      throw new Exception("Not deleted: " + id);
    }
    Config.LOG.info("deleted " + r);
    c.drop();
  }

  /** Converts a session into its stored document form; null values map to the "unset" encodings. */
  private static Document toDocument(String suid, ModelSession session) {
    String cpu = session.getCputime();
    String progress = session.getProgress();
    String[] attachments = session.getAttachments();
    return new Document("_id", suid)
        .append("tst", session.getTstamp())
        .append("exp", session.getExpDate())
        .append("srv", session.getService())
        .append("sta", session.getStatus())
        .append("nip", session.getNodeIP())
        .append("rip", session.getReqIP())
        // A missing or empty cpu time is stored as -1.
        .append("cpu", (cpu == null || cpu.isEmpty()) ? -1 : Integer.parseInt(cpu))
        .append("pro", progress == null ? "" : progress)
        .append("att", attachments == null ? "" : String.join(",", attachments));
  }

  /** Rebuilds a session from its stored document form; absent fields are treated as unset. */
  private static ModelSession toSession(Document o) {
    ModelSession s = new ModelSession();
    s.setTstamp(o.get("tst", String.class));
    s.setExpDate(o.get("exp", String.class));
    s.setService(o.get("srv", String.class));
    s.setStatus(o.get("sta", String.class));
    s.setNodeIP(o.get("nip", String.class));
    s.setReqIP(o.get("rip", String.class));

    // -1 (or a missing field) means "no cpu time recorded".
    Integer cpu = o.get("cpu", Integer.class);
    s.setCputime((cpu == null || cpu == -1) ? "" : Integer.toString(cpu));

    // The empty string (or a missing field) means "no progress".
    String progress = o.get("pro", String.class);
    s.setProgress((progress == null || progress.isEmpty()) ? null : progress);

    String att = o.get("att", String.class);
    s.setAttachments((att == null || att.isEmpty()) ? ModelSession.NO_ATTACHMENTS : att.split(","));
    return s;
  }
}