MongoSessionStore.java [src/csip] Revision:   Date:
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 *
 * 2012-2022, Olaf David and others, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package csip;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import csip.utils.Services;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.logging.Level;
import org.bson.Document;

/**
 *
 */
class MongoSessionStore implements SessionStore {

  MongoClient mongo;
  MongoDatabase db;
  static final String SESSION_COLL = "session";
  static final String RESOURCES_COLL = "resources";
  UpdateOptions opt = new UpdateOptions().upsert(true);


  MongoSessionStore(String uri) {
    MongoClientURI u = new MongoClientURI(uri);
    String dbname = u.getDatabase();
    if (dbname == null) 
      dbname = "csip";
    
    mongo = new MongoClient(u);
    db = mongo.getDatabase(dbname);
    Config.LOG.info("Connected to mongo session csip store : " + uri);
  }


  @Override
  public synchronized long countSessionsByState(String state) {
    return db.getCollection(SESSION_COLL).count(new Document("sta", state));
  }


  @Override
  public void registerResources(boolean register) {
    MongoCollection<Document> c = db.getCollection(RESOURCES_COLL);
    if (register) {
      Document doc = new Document();
      doc.append("ip", Services.LOCAL_IP_ADDR)
          .append("cpus", Runtime.getRuntime().availableProcessors())
          .append("total_mem", Runtime.getRuntime().totalMemory())
          .append("max_mem", Runtime.getRuntime().maxMemory())
          .append("free_mem", Runtime.getRuntime().freeMemory());
      c.replaceOne(new Document("ip", Services.LOCAL_IP_ADDR), doc, opt);
      if (Config.LOG.isLoggable(Level.INFO)) 
        Config.LOG.info("added: " + Services.LOCAL_IP_ADDR + " " + doc.toJson());
      
    } else {
      DeleteResult r = c.deleteOne(new Document("ip", Services.LOCAL_IP_ADDR));
      if (r.getDeletedCount() != 1) 
        Config.LOG.info("Not deleted: " + Services.LOCAL_IP_ADDR);
    }
  }


  @Override
  public synchronized void setSession(String suid, ModelSession session) throws Exception {
    Document doc = new Document();
    doc.append("_id", suid)
        .append("tst", session.getTstamp())
        .append("exp", session.getExpDate())
        .append("srv", session.getService())
        .append("sta", session.getStatus())
        .append("nip", session.getNodeIP())
        .append("rip", session.getReqIP())
        .append("cpu", session.getCputime().isEmpty() ? -1 : Integer.parseInt(session.getCputime()))
        .append("pro", session.getProgress() == null ? "" : session.getProgress());
    doc.append("att", String.join(",", session.getAttachments()));

    MongoCollection<Document> c = db.getCollection(SESSION_COLL);
    c.replaceOne(new Document("_id", suid), doc, opt);
    if (Config.LOG.isLoggable(Level.INFO)) 
      Config.LOG.info("added: " + suid + " " + doc.toJson());
    
  }


  @Override
  public synchronized ModelSession getSession(String suid) throws Exception {
    Document o = db.getCollection(SESSION_COLL).find(new Document("_id", suid)).first();
    if (o != null) {
      // Build our document and add all the fields
      ModelSession s = new ModelSession();
      s.setTstamp(o.getString("tst"));
      s.setExpDate(o.getString("exp"));
      s.setService(o.getString("srv"));
      s.setStatus(o.getString("sta"));
      s.setNodeIP(o.getString("nip"));
      s.setReqIP(o.getString("rip"));
      int cpu = o.getInteger("cpu");
      s.setCputime(cpu == -1 ? "" : Integer.toString(cpu));
      s.setProgress((o.getString("pro")).equals("") ? null : (o.getString("pro")));
      String l = o.getString("att");
      s.setAttachments(l.isEmpty() ? ModelSession.NO_ATTACHMENTS : l.split(","));
      return s;
    } else {
      Config.LOG.warning("Unable get session : " + suid + " as record already exists!!!");
      return null;
    }
  }


  @Override
  public synchronized boolean hasSession(String suid) throws Exception {
    return db.getCollection(SESSION_COLL).count(new Document("_id", suid)) == 1;
  }


  @Override
  public synchronized void removeSession(String suid) {
    DeleteResult r = db.getCollection(SESSION_COLL).deleteOne(new Document("_id", suid));
    if (r.getDeletedCount() != 1) 
      throw new RuntimeException("Not deleted: " + suid);
  }


  @Override
  public synchronized void close() throws Exception {
    mongo.close();
  }


  @Override
  public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
    if (sortby == null) {
      sortby = "tst";
      sortAsc = false;
    }
    Document sort = new Document(sortby, sortAsc ? 1 : -1);
    Set<String> l = new LinkedHashSet<>();
    MongoCollection<Document> c = db.getCollection(SESSION_COLL);
    for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
      l.add(doc.getString("_id"));
    }
    return l;
  }


  @Override
  public synchronized long getCount() {
    return db.getCollection(SESSION_COLL).count();
  }


  @Override
  public void ping() throws Exception {
    MongoCollection<Document> c = db.getCollection(SESSION_COLL + "_test");
    String id = "test_id_1234";
    // insert document
    Document doc = new Document("_id", id);
    UpdateResult res = c.replaceOne(new Document("_id", id), doc, opt);
    Config.LOG.info("added " + res);
    // find it.
    FindIterable<Document> d = c.find(new Document("_id", id));
    Document o = d.first();
    if (o == null) 
      throw new Exception("Not found: " + id);
    
    if (!id.equals(o.getString("_id"))) 
      throw new Exception("Id not found: " + id);
    
    Config.LOG.info("found " + o);
    // remove it.
    DeleteResult r = c.deleteOne(new Document("_id", id));
    if (r.getDeletedCount() != 1) 
      throw new Exception("Not deleted: " + id);
    
    Config.LOG.info("deleted " + r);
    c.drop();
  }

}