MongoSessionStore.java [src/csip] Revision: Date:
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2022, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import csip.utils.Services;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.logging.Level;
import org.bson.Document;
/**
 * {@link SessionStore} implementation backed by a MongoDB database.
 *
 * <p>Sessions are kept in the {@value #SESSION_COLL} collection, one document
 * per session, keyed by the session id ({@code _id}). The remaining document
 * fields use three-letter keys: {@code tst} (timestamp), {@code exp}
 * (expiration date), {@code srv} (service), {@code sta} (status), {@code nip}
 * (node IP), {@code rip} (request IP), {@code cpu} (CPU time, {@code -1} when
 * not set), {@code pro} (progress, empty string when not set) and {@code att}
 * (comma separated attachment names).
 *
 * <p>Node resource information is kept in the {@value #RESOURCES_COLL}
 * collection, one document per node, keyed by the local IP address.
 */
class MongoSessionStore implements SessionStore {

  static final String SESSION_COLL = "session";
  static final String RESOURCES_COLL = "resources";

  final MongoClient mongo;
  final MongoDatabase db;
  /** Options used for every write: insert the document if it does not exist yet. */
  final UpdateOptions opt = new UpdateOptions().upsert(true);

  /**
   * Connects to the MongoDB instance described by the given connection string.
   *
   * @param uri MongoDB connection URI; when it names no database,
   *            {@code "csip"} is used.
   */
  MongoSessionStore(String uri) {
    MongoClientURI u = new MongoClientURI(uri);
    String dbname = u.getDatabase();
    if (dbname == null) {
      dbname = "csip";
    }
    mongo = new MongoClient(u);
    db = mongo.getDatabase(dbname);
    // The raw URI may embed "user:password@"; log only the hosts and the
    // database name so credentials never end up in the log files.
    Config.LOG.info("Connected to mongo session csip store : " + u.getHosts() + "/" + dbname);
  }

  /**
   * Counts the stored sessions whose status equals the given state.
   *
   * @param state the status value to match against the {@code sta} field.
   * @return the number of matching session documents.
   */
  @Override
  public synchronized long countSessionsByState(String state) {
    return sessions().count(new Document("sta", state));
  }

  /**
   * Adds or removes the resource record of the local node.
   *
   * @param register {@code true} to upsert a record with the current CPU and
   *                 memory figures of this JVM, {@code false} to delete the
   *                 record for the local IP address.
   */
  @Override
  public void registerResources(boolean register) {
    MongoCollection<Document> c = db.getCollection(RESOURCES_COLL);
    Document key = new Document("ip", Services.LOCAL_IP_ADDR);
    if (register) {
      Runtime rt = Runtime.getRuntime();
      Document doc = new Document("ip", Services.LOCAL_IP_ADDR)
          .append("cpus", rt.availableProcessors())
          .append("total_mem", rt.totalMemory())
          .append("max_mem", rt.maxMemory())
          .append("free_mem", rt.freeMemory());
      c.replaceOne(key, doc, opt);
      if (Config.LOG.isLoggable(Level.INFO)) {
        Config.LOG.info("added: " + Services.LOCAL_IP_ADDR + " " + doc.toJson());
      }
    } else {
      DeleteResult r = c.deleteOne(key);
      if (r.getDeletedCount() != 1) {
        Config.LOG.info("Not deleted: " + Services.LOCAL_IP_ADDR);
      }
    }
  }

  /**
   * Stores the session under the given id, replacing any existing record.
   *
   * @param suid    the session id, used as the document {@code _id}.
   * @param session the session to persist.
   * @throws Exception declared by {@link SessionStore}; a
   *                   {@link NumberFormatException} is raised when the
   *                   session's CPU time is not an integer.
   */
  @Override
  public synchronized void setSession(String suid, ModelSession session) throws Exception {
    Document doc = toDocument(suid, session);
    sessions().replaceOne(new Document("_id", suid), doc, opt);
    if (Config.LOG.isLoggable(Level.INFO)) {
      Config.LOG.info("added: " + suid + " " + doc.toJson());
    }
  }

  /**
   * Loads the session stored under the given id.
   *
   * @param suid the session id.
   * @return the restored session, or {@code null} (after logging a warning)
   *         when no record exists for {@code suid}.
   * @throws Exception declared by {@link SessionStore}.
   */
  @Override
  public synchronized ModelSession getSession(String suid) throws Exception {
    Document o = sessions().find(new Document("_id", suid)).first();
    if (o == null) {
      Config.LOG.warning("Unable to get session : " + suid + " as no such record exists.");
      return null;
    }
    return toSession(o);
  }

  /**
   * Checks whether a session record exists for the given id.
   *
   * @param suid the session id.
   * @return {@code true} if exactly one record with that id exists.
   * @throws Exception declared by {@link SessionStore}.
   */
  @Override
  public synchronized boolean hasSession(String suid) throws Exception {
    return sessions().count(new Document("_id", suid)) == 1;
  }

  /**
   * Deletes the session record with the given id.
   *
   * @param suid the session id.
   * @throws RuntimeException if no record was deleted.
   */
  @Override
  public synchronized void removeSession(String suid) {
    DeleteResult r = sessions().deleteOne(new Document("_id", suid));
    if (r.getDeletedCount() != 1) {
      throw new RuntimeException("Not deleted: " + suid);
    }
  }

  /**
   * Closes the underlying MongoDB client.
   *
   * @throws Exception declared by {@link SessionStore}.
   */
  @Override
  public synchronized void close() throws Exception {
    mongo.close();
  }

  /**
   * Lists session ids in sorted order.
   *
   * @param skip    number of leading records to skip.
   * @param limit   maximum number of ids to return, passed to the driver as is.
   * @param sortby  document field to sort by; when {@code null} the result is
   *                sorted by {@code tst} in descending order and
   *                {@code sortAsc} is ignored.
   * @param sortAsc {@code true} for ascending, {@code false} for descending.
   * @return the session ids in iteration order of the query result.
   */
  @Override
  public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
    if (sortby == null) {
      sortby = "tst";
      sortAsc = false;
    }
    Document sort = new Document(sortby, sortAsc ? 1 : -1);
    // LinkedHashSet keeps the order delivered by the sorted query.
    Set<String> ids = new LinkedHashSet<>();
    for (Document doc : sessions().find().sort(sort).skip(skip).limit(limit)) {
      ids.add(doc.getString("_id"));
    }
    return ids;
  }

  /**
   * Counts all stored sessions.
   *
   * @return the number of documents in the session collection.
   */
  @Override
  public synchronized long getCount() {
    return sessions().count();
  }

  /**
   * Verifies that the store is usable by writing, reading back and deleting a
   * probe document in a separate test collection, which is dropped afterwards.
   *
   * @throws Exception if the probe document cannot be found or deleted.
   */
  @Override
  public void ping() throws Exception {
    MongoCollection<Document> c = db.getCollection(SESSION_COLL + "_test");
    String id = "test_id_1234";

    // insert document
    Document doc = new Document("_id", id);
    UpdateResult res = c.replaceOne(new Document("_id", id), doc, opt);
    Config.LOG.info("added " + res);

    // find it.
    FindIterable<Document> d = c.find(new Document("_id", id));
    Document o = d.first();
    if (o == null) {
      throw new Exception("Not found: " + id);
    }
    if (!id.equals(o.getString("_id"))) {
      throw new Exception("Id not found: " + id);
    }
    Config.LOG.info("found " + o);

    // remove it.
    DeleteResult r = c.deleteOne(new Document("_id", id));
    if (r.getDeletedCount() != 1) {
      throw new Exception("Not deleted: " + id);
    }
    Config.LOG.info("deleted " + r);
    c.drop();
  }

  /** Returns the collection holding the session documents. */
  private MongoCollection<Document> sessions() {
    return db.getCollection(SESSION_COLL);
  }

  /** Converts a session into its stored document form, keyed by {@code suid}. */
  private static Document toDocument(String suid, ModelSession session) {
    String cputime = session.getCputime();
    // A missing or empty CPU time is stored as -1.
    int cpu = (cputime == null || cputime.isEmpty()) ? -1 : Integer.parseInt(cputime);
    return new Document("_id", suid)
        .append("tst", session.getTstamp())
        .append("exp", session.getExpDate())
        .append("srv", session.getService())
        .append("sta", session.getStatus())
        .append("nip", session.getNodeIP())
        .append("rip", session.getReqIP())
        .append("cpu", cpu)
        .append("pro", session.getProgress() == null ? "" : session.getProgress())
        .append("att", String.join(",", session.getAttachments()));
  }

  /**
   * Rebuilds a session from its stored document form. Absent {@code cpu},
   * {@code pro} or {@code att} fields are treated like their "not set"
   * encodings ({@code -1} and the empty string) instead of failing.
   */
  private static ModelSession toSession(Document o) {
    ModelSession s = new ModelSession();
    s.setTstamp(o.getString("tst"));
    s.setExpDate(o.getString("exp"));
    s.setService(o.getString("srv"));
    s.setStatus(o.getString("sta"));
    s.setNodeIP(o.getString("nip"));
    s.setReqIP(o.getString("rip"));

    int cpu = o.getInteger("cpu", -1);
    s.setCputime(cpu == -1 ? "" : Integer.toString(cpu));

    String pro = o.getString("pro");
    s.setProgress(pro == null || pro.isEmpty() ? null : pro);

    String att = o.getString("att");
    s.setAttachments(att == null || att.isEmpty() ? ModelSession.NO_ATTACHMENTS : att.split(","));
    return s;
  }
}