Config.java [src/csip] Revision: e3b2437676c97ed1e4ec35623fb8f67820908b1a Date: Thu Apr 07 10:16:29 MDT 2016
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* 2010-2013, Olaf David and others, Colorado State University.
*
* CSIP is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, version 2.1.
*
* CSIP is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with CSIP. If not, see <http://www.gnu.org/licenses/lgpl.txt>.
*/
package csip;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.WriteResult;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.gridfs.GridFSInputFile;
import csip.ModelDataService.Task;
import csip.utils.Binaries;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.ServletContext;
import javax.ws.rs.Path;
import oms3.annotations.Description;
import oms3.annotations.Name;
import org.bson.Document;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.Transaction;
/**
* Global properties, changeable at runtime.
*
* @author od
*/
public class Config {
private static final Properties p = new Properties();
private static final LinkedList<PostgresChunk> pcs = new LinkedList<>();
private static final List<Task> tasks = Collections.synchronizedList(new ArrayList<Task>());
//
private static ExecutorService exec;
//
private static ArchiveStore archive;
private static SessionStore session;
private static AccessLogStore logstore;
//
private static Timer timer;
//
private static final String SESSION_PREFIX = "session";
private static final Registry reg = new Registry();
//
static final Logger LOG = Logger.getLogger(Config.class.getName());
public static Map<Object, Object> properties() {
return Collections.unmodifiableMap(p);
}
static {
/*
The CSIP version
*/
put("csip.version", "$version: 2.1.22 b50ffe381a06 2016-04-07 od, built at 2016-04-07 10:14 by od$");
/*
* The runtime architecture.
*/
put("csip.arch", Binaries.getArch());
// session
/*
The backend to use for session management. Valid choices are
"mongodb", "redis", and "local".
If set to "redis", the properties "csip.session.redis.server"
and "csip.session.redis.port" can be used to control the redis
connectivity for session management.
*/
put("csip.session.backend", "local");
/*
The hostname or address of the redis server for session management. The
property "csip.session.backend" has to be set to "redis" to use
this setting.
*/
put("csip.session.redis.server", "localhost");
/*
The port of the redis server for session management. The
property "csip.session.backend" has to be set to "redis" to use
this setting.
*/
put("csip.session.redis.port", "6379");
/*
The MongoDB connection string for session management.
*/
put("csip.session.mongodb.uri", "mongodb://localhost:27017/csip");
/*
The default duration for a session to stay active after the
model finishes. All model results will be available for that period.
After this period expires, the session will be removed and the model results
will be removed or archived. This value can be altered using
the "keep_results" metainfo value of a request.
See duration string examples: https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html#parse-java.lang.CharSequence-
*/
put("csip.session.ttl", "PT30S"); // 30 sec ttl
put("csip.session.ttl.failed", "PT30S");
put("csip.session.ttl.cancelled", "PT30S");
/*
The default csip timezone to be used for time management.
*/
put("csip.timezone", "MST7MDT");
// archive
/*
Defines the archive backend implementation. Valid choices are
"mongodb" and "none" (disabled).
*/
put("csip.archive.backend", "none");
/*
The MongoDB connection URI, used if the backend is set to "mongodb".
*/
put("csip.archive.mongodb.uri", "mongodb://localhost:27017/csip");
/*
The max file size for an attachment to be archived.
*/
put("csip.archive.max.filesize", "10MB");
/*
The default duration for an entry to stay in the archive.
All archived model results will be available for that period after
the session expired. After this period expires, the archive entry will be removed.
See duration string examples: https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html#parse-java.lang.CharSequence-.
*/
put("csip.archive.ttl", "P1D"); // one day.
/*
If the archive is enabled, archive only failed runs. Default: false.
*/
put("csip.archive.failedonly", "false");
// logger
/*
The log level for service logging. This is only used if "csip.logging.enabled"
is set to true. All java.util.logging.Level log levels are usable.
*/
put("csip.logging.level", "INFO");
/*
Controls whether the stack trace should be part of the response metadata
if the service logic fails. Default is true.
*/
put("csip.response.stacktrace", "true");
/*
The access log backend: "postgres" or "none" (disabled).
*/
put("csip.accesslog.backend", "none");
put("csip.accesslog.postgres.url", "jdbc:postgresql://localhost/od?user=od&password=od");
/*
Connection timeout in seconds for writing to the access log.
*/
put("csip.accesslog.postgres.timeout", "3");
/*
Number of timeouts before giving up on writing to the access log.
Once this number is reached, no further attempt is made.
*/
put("csip.accesslog.postgres.retry", "10");
// Some generic server parameters for hazelcast and redis.
/*
The connection timeout for all redis connections.
*/
put("csip.redis.timeout", "0");
/*
The csip root directory.
*/
put("csip.dir", "/tmp/csip");
/*
The csip directories for executables.
*/
put("csip.bin.dir", "/tmp/csip/bin");
/*
The csip directory for sessions.
*/
put("csip.work.dir", "/tmp/csip/work");
/*
The csip directory to store results.
*/
put("csip.results.dir", "/tmp/csip/results");
/*
The csip cache file directory.
*/
put("csip.cache.dir", "/tmp/csip/cache");
/*
The csip data directory.
*/
put("csip.data.dir", "/tmp/csip/data");
/*
Whether the csip UI parts are enabled.
*/
put("csip.ui.enabled", "true");
/*
External URL parts. These
properties can be set to force a public scheme/host/port for
result file downloads and catalog listing. These properties
are not set by default. They can be set independently from
each other to change only selected parts of the URL.
If none of the properties below are set, the incoming URL is
used to construct downloads and catalogs.
*/
// put("csip.public.scheme", ""); // e.g. https
// put("csip.public.host", ""); // e.g. csip.org
// put("csip.public.port", ""); // e.g. 8080 (-1 will remove the port from the URL)
///////////////////////////////////////////////////////
//// more auxiliary properties
// thread management
put("codebase.threadpool", "32"); // 10 concurrent model runs.
put("codebase.url", "http://localhost:8080");
put("codebase.port", "8085");
put("codebase.localport", "8081");
put("codebase.servicename", "csip-vmscaler");
// postgis
put("pg.chunks", "0");
put("pg.url", "jdbc:postgresql://oms-db.engr.colostate.edu:5432/r2gis");
put("pg.connpool", "16"); // 16 default connections for db connection pool
// wine
put("wine.path", "/usr/bin/wine");
//rusle2/weps
put("r2.path", "/od/projects/csip.services/bin/RomeShell.exe"); // the path is the parent directory.
put("r2.db", "http://oms-db.engr.colostate.edu/r2");
put("weps.db", "http://oms-db.engr.colostate.edu/weps");
//oms related props
put("oms.java.home", "/usr/bin");
put("oms.esp.threads", "4");
// internal
put("vm.port", "8080");
update();
}
public static class Registry {
String context;
List<Class<?>> s;
Set<Class<?>> regServ;
void setContext(String context) {
this.context = context;
}
String getContext() {
return context;
}
public void register(Set<Class<?>> service) {
if (regServ != null) {
return;
}
regServ = service;
s = new ArrayList<>();
service.forEach(c -> {
if ((c.getCanonicalName().startsWith("m.") // model service
|| c.getCanonicalName().startsWith("d."))) { // data service
LOG.info("Register service: " + c.getName());
s.add(c);
callStaticMethodIfExist(c, "onContextInit");
}
});
Collections.sort(s, (Class<?> o1, Class<?> o2)
-> getServiceName(o1).compareTo(getServiceName(o2)));
LOG.info(">>> Registered " + service.size() + " CSIP services.");
}
public void unregister() {
if (regServ == null) {
return;
}
regServ = null;
s.forEach(c -> {
LOG.info("Unregister service " + c);
callStaticMethodIfExist(c, "onContextDestroy");
});
s.clear();
}
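/*
Lifecycle hook sketch: a registered service class may optionally declare the
static, no-argument hooks below; they are looked up and invoked reflectively
by register()/unregister() through callStaticMethodIfExist():

public static void onContextInit() { ... }
public static void onContextDestroy() { ... }
*/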
static void callStaticMethodIfExist(Class<?> c, String method) {
try {
Method m = c.getMethod(method);
m.invoke(null);
LOG.info("Found and invoked '" + method + "' in: " + c);
} catch (NoSuchMethodException ex) {
// no such hook defined; not a problem
} catch (SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
LOG.log(Level.SEVERE, null, ex);
}
}
List<Class<?>> getServices() {
return s;
}
String getServicePath(Class<?> i) {
Path p = (Path) i.getAnnotation(Path.class);
return (p == null) ? "" : p.value();
}
String getServiceName(Class<?> i) {
Name p = (Name) i.getAnnotation(Name.class);
return (p == null) ? "" : p.value();
}
String getServiceDescription(Class<?> i) {
Description p = (Description) i.getAnnotation(Description.class);
return (p == null) ? "" : p.value();
}
}
public static Registry registry() {
return reg;
}
/**
* Session store.
*/
public interface SessionStore {
/**
* Set a model session.
*
* @param suid the session id
* @param session the model session to store
* @throws Exception if the session cannot be stored
*/
void setSession(String suid, ModelSession session) throws Exception;
/**
* Get a model session.
*
* @param suid the session id
* @return the model session, or null if there is none
* @throws Exception if the session cannot be read
*/
ModelSession getSession(String suid) throws Exception;
boolean hasSession(String suid) throws Exception;
/**
* Remove a model session.
*
* @param suid the session id
*/
void removeSession(String suid);
/**
* Shut down the session store.
*
* @throws Exception if the shutdown fails
*/
void shutdown() throws Exception;
/**
* Get the session keys.
*
* @param skip the number of keys to skip
* @param limit the number of keys to return (0 means all)
* @param sortby the 'field' to sort by
* @param sortAscending true if the sort is ascending, false otherwise
* @return the set of keys
*/
Set<String> keys(int skip, int limit, String sortby, boolean sortAscending);
/**
* Get the number of elements.
*
* @return the element count
*/
long getCount();
/**
* Ping the store.
*
* @throws Exception if something is wrong.
*/
void ping() throws Exception;
}
/**
* Archive store.
*/
public interface ArchiveStore {
static String ARCHIVE_TOO_BIG = "the archive was too big to store.";
void archiveSession(String sid, ModelArchive ma, File f) throws Exception;
ModelArchive getArchive(String suid) throws Exception;
boolean hasArchive(String suid) throws Exception;
void removeArchive(String suid);
byte[] getFile(String suid, String filename) throws Exception;
void shutdown() throws Exception;
/**
* Get the number of elements.
*
* @return the element count
*/
long getCount();
Set<String> keys(int skip, int limit, String sortby, boolean sortAsc);
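// Null-object implementation, used when archiving is disabled
// ("csip.archive.backend" set to "none").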
ArchiveStore NONE = new ArchiveStore() {
Set<String> keys = new HashSet<>();
@Override
public void archiveSession(String sid, ModelArchive ma, File f) throws Exception {
}
@Override
public ModelArchive getArchive(String suid) throws Exception {
return null;
}
@Override
public void removeArchive(String suid) {
}
@Override
public byte[] getFile(String suid, String filename) throws Exception {
return null;
}
@Override
public void shutdown() throws Exception {
}
@Override
public Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
return keys;
}
@Override
public long getCount() {
return keys.size();
}
@Override
public boolean hasArchive(String suid) throws Exception {
return false;
}
};
}
/**
* Session logging.
*/
interface AccessLogStore {
void log(String suid, String service_url, String req_ip, String node_ip, String req_time, String status, int duration);
AccessLogStore NONE = new AccessLogStore() {
@Override
public void log(String suid, String service_url, String req_ip, String node_ip, String req_time, String status, int duration) {
}
};
}
/**
* This is a local session store. One instance only, no failover.
*/
private static class LocalStore implements SessionStore {
Map<String, ModelSession> m = Collections.synchronizedMap(new HashMap<String, ModelSession>());
public LocalStore() {
LOG.info("Using local session store.");
}
@Override
public void shutdown() throws Exception {
m.clear();
}
@Override
public void setSession(String key, ModelSession session) throws Exception {
m.put(key, session);
}
@Override
public ModelSession getSession(String key) throws Exception {
return m.get(key);
}
@Override
public void removeSession(String key) {
m.remove(key);
}
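// Note: the local store ignores the skip, limit, and sort arguments and
// always returns all keys.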
@Override
public Set<String> keys(int skip, int limit, String sortby, boolean sortAscending) {
return m.keySet();
}
@Override
public long getCount() {
return m.size();
}
@Override
public void ping() {
if (m == null) {
throw new NullPointerException("Illegal hashtable store.");
}
}
@Override
public boolean hasSession(String suid) throws Exception {
return m.containsKey(suid);
}
}
/**
* Redis-backed session store.
*/
private static class RedisStore implements SessionStore {
JedisPool redispool;
final String prefix;
RedisStore(String prefix, JedisPool redispool) {
this.redispool = redispool;
this.prefix = prefix;
try (Jedis redis = redispool.getResource()) {
redis.connect();
String res = redis.ping();
if (!res.equals("PONG")) {
throw new IllegalArgumentException("Redis not available.");
}
}
LOG.info("Jedis connected to " + prefix);
}
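/*
A session is stored as a redis list under the key "<prefix><suid>" with the
layout written by setSession() and read back by getSession():
[0] tstamp, [1] expDate, [2] service, [3] status, [4] nodeIP, [5] reqIP,
[6] cputime, [7] progress ("" if none), [8] attachment count, [9..] attachments.
*/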
@Override
public synchronized void setSession(String suid, ModelSession ms) throws Exception {
try (Jedis redis = redispool.getResource()) {
String key = prefix + suid;
Transaction t = redis.multi();
t.del(key);
t.rpush(key, ms.getTstamp());
t.rpush(key, ms.getExpDate());
t.rpush(key, ms.getService());
t.rpush(key, ms.getStatus());
t.rpush(key, ms.getNodeIP());
t.rpush(key, ms.getReqIP());
t.rpush(key, ms.getCputime());
t.rpush(key, ms.getProgress() == null ? "" : ms.getProgress());
String[] a = ms.getAttachments();
t.rpush(key, Integer.toString(a.length));
for (String att : a) {
t.rpush(key, att);
}
t.exec();
} catch (Exception E) {
E.printStackTrace(System.err);
}
}
@Override
public synchronized ModelSession getSession(String suid) throws Exception {
try (Jedis redis = redispool.getResource()) {
List<String> v = redis.lrange(prefix + suid, 0, -1);
if (v == null || v.isEmpty()) {
return null;
}
ModelSession s = new ModelSession();
s.setTstamp(v.get(0));
s.setExpDate(v.get(1));
s.setService(v.get(2));
s.setStatus(v.get(3));
s.setNodeIP(v.get(4));
s.setReqIP(v.get(5));
s.setCputime(v.get(6));
s.setProgress(v.get(7).equals("") ? null : v.get(7));
String[] att = new String[Integer.parseInt(v.get(8))];
for (int i = 0; i < att.length; i++) {
att[i] = v.get(i + 9);
}
s.setAttachments(att);
return s;
} catch (Exception E) {
E.printStackTrace(System.err);
return null;
}
}
@Override
public synchronized boolean hasSession(String suid) throws Exception {
try (Jedis redis = redispool.getResource()) {
return redis.exists(prefix + suid);
} catch (Exception E) {
E.printStackTrace(System.err);
return false;
}
}
@Override
public synchronized void removeSession(String suid) {
try (Jedis redis = redispool.getResource()) {
redis.del(prefix + suid);
} catch (Exception E) {
E.printStackTrace(System.err);
}
}
@Override
public synchronized void shutdown() throws Exception {
redispool.destroy();
}
@Override
public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAscending) {
// Note: the redis store ignores the skip, limit, and sort arguments.
try (Jedis redis = redispool.getResource()) {
Set<String> l = new HashSet<>();
ScanParams params = new ScanParams();
params.match(prefix + "*");
// An iteration starts at "0" and is complete once the server
// returns cursor "0" again: http://redis.io/commands/scan
String cursor = "0";
do {
ScanResult<String> scanResult = redis.scan(cursor, params);
for (String key : scanResult.getResult()) {
l.add(key.substring(prefix.length()));
}
cursor = scanResult.getStringCursor();
} while (!cursor.equals("0"));
return l;
}
}
@Override
public synchronized long getCount() {
return keys(0, 0, null, false).size();
}
@Override
public void ping() throws Exception {
try (Jedis redis = redispool.getResource()) {
redis.connect();
String res = redis.ping();
if (!res.equals("PONG")) {
throw new Exception("Redis not available.");
}
}
LOG.info("Jedis connected to " + prefix);
}
}
/**
* MongoDB-backed session store.
*/
private static class MongoDBSessionStore implements SessionStore {
MongoClient mongo;
MongoDatabase db;
static final String coll = "session";
MongoDBSessionStore(String uri) {
MongoClientURI u = new MongoClientURI(uri);
String dbname = u.getDatabase();
if (dbname == null) {
dbname = "csip";
}
mongo = new MongoClient(u);
db = mongo.getDatabase(dbname);
LOG.info("Connected to mongo session csip store : " + uri);
}
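/*
Session document layout in the "session" collection:
{ _id, tst, exp, srv, sta, nip, rip, cpu, pro, att }
where "cpu" is -1 if no cputime is known and "att" is a comma-separated
attachment list.
*/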
@Override
public synchronized void setSession(String suid, ModelSession session) throws Exception {
Document doc = new Document();
doc.append("_id", suid)
.append("tst", session.getTstamp())
.append("exp", session.getExpDate())
.append("srv", session.getService())
.append("sta", session.getStatus())
.append("nip", session.getNodeIP())
.append("rip", session.getReqIP())
.append("cpu", session.getCputime().isEmpty() ? -1 : Integer.parseInt(session.getCputime()))
.append("pro", session.getProgress() == null ? ""
: session.getProgress());
String[] a = session.getAttachments();
doc.append("att", String.join(",", a));
UpdateOptions opt = new UpdateOptions();
opt.upsert(true);
MongoCollection<Document> c = db.getCollection(coll);
c.replaceOne(new Document("_id", suid), doc, opt);
LOG.info("added: " + suid + " " + doc.toJson());
}
@Override
public synchronized ModelSession getSession(String suid) throws Exception {
FindIterable<Document> r = db.getCollection(coll).find(new Document("_id", suid));
Document o = r.first();
if (o != null) {
// Build the model session from the stored fields.
ModelSession s = new ModelSession();
s.setTstamp(o.get("tst", String.class));
s.setExpDate(o.get("exp", String.class));
s.setService(o.get("srv", String.class));
s.setStatus(o.get("sta", String.class));
s.setNodeIP(o.get("nip", String.class));
s.setReqIP(o.get("rip", String.class));
int cpu = o.get("cpu", Integer.class);
s.setCputime(cpu == -1 ? "" : Integer.toString(cpu));
s.setProgress((o.get("pro", String.class)).equals("") ? null : (o.get("ts", String.class)));
String l = o.get("att", String.class);
s.setAttachments(l.isEmpty() ? ModelSession.NO_ATTACHMENTS : l.split(","));
return s;
} else {
LOG.warning("Unable get session : " + suid + " as record already exists!!!");
return null;
}
}
@Override
public synchronized boolean hasSession(String suid) throws Exception {
return db.getCollection(coll).count(new Document("_id", suid)) == 1;
}
@Override
public synchronized void removeSession(String suid) {
DeleteResult r = db.getCollection(coll).deleteOne(new Document("_id", suid));
if (r.getDeletedCount() != 1) {
throw new RuntimeException("Not deleted: " + suid);
}
}
@Override
public synchronized void shutdown() throws Exception {
mongo.close();
}
@Override
public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
if (sortby == null) {
sortby = "tst";
sortAsc = false;
}
Document sort = new Document(sortby, sortAsc ? 1 : -1);
Set<String> l = new LinkedHashSet<>();
MongoCollection<Document> c = db.getCollection(coll);
for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
l.add(doc.get("_id", String.class));
}
return l;
}
@Override
public synchronized long getCount() {
return db.getCollection(coll).count();
}
@Override
public void ping() throws Exception {
MongoCollection<Document> c = db.getCollection(coll + "_test");
String id = "test_id_1234";
// insert document
Document doc = new Document("_id", id);
UpdateOptions opt = new UpdateOptions();
opt.upsert(true);
UpdateResult res = c.replaceOne(new Document("_id", id), doc, opt);
LOG.info("added " + res);
// find it.
FindIterable<Document> d = c.find(new Document("_id", id));
Document o = d.first();
if (o == null) {
throw new Exception("Not found: " + id);
}
if (!id.equals(o.getString("_id"))) {
throw new Exception("Id not found: " + id);
}
LOG.info("found " + o);
// remove it.
DeleteResult r = c.deleteOne(new Document("_id", id));
if (r.getDeletedCount() != 1) {
throw new Exception("Not deleted: " + id);
}
LOG.info("deleted " + r);
c.drop();
}
}
/**
* MongoDB-backed archive store; file payloads are stored in GridFS.
*/
private static class MongoDBArchive implements ArchiveStore {
MongoClient mongo;
String dbname;
static final String fsColl = "fs";
MongoDBArchive(String uri) {
// "mongodb://user:pass@host:port/db"
MongoClientURI u = new MongoClientURI(uri);
dbname = u.getDatabase();
if (dbname == null) {
dbname = "csip";
}
mongo = new MongoClient(u);
LOG.info("Connected to archive csip store : " + uri);
}
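/*
Archive layout: one metadata document per session in the "fs" collection,
keyed by the session id; the result file itself is stored through GridFS
under the same id with the name "files-<filename>".
*/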
@Override
public synchronized void archiveSession(String sid, ModelArchive ma, File f) throws Exception {
DB db = mongo.getDB(dbname);
// Store the standard data in a regular collection.
DBCollection collection = db.getCollection(fsColl);
String max = getString("csip.archive.max.filesize");
long limit = Binaries.parseByteSize(max);
String filename = null;
long filesize = 0;
InputStream is = null;
if (f.length() <= limit) {
filename = f.getName();
filesize = f.length();
is = new FileInputStream(f);
} else {
filename = sid + ".txt";
filesize = ARCHIVE_TOO_BIG.length();
is = new ByteArrayInputStream(ARCHIVE_TOO_BIG.getBytes());
}
// Query to ensure the ID does not already exist in mongo;
// if it does, log a warning.
BasicDBObject query = new BasicDBObject("_id", sid);
DBCursor cursor = collection.find(query);
if (!cursor.hasNext()) {
// Build our document and add all the fields
BasicDBObject doc = new BasicDBObject();
doc.append("_id", sid);
doc.append("service", ma.getService());
doc.append("status", ma.getStatus());
doc.append("ctime", ma.getCtime());
doc.append("etime", ma.getEtime());
doc.append("req_ip", ma.getReqIP());
doc.append("filename", "files-" + filename);
doc.append("filelength", filesize);
//insert the document into the collection
collection.insert(doc);
// Now let's store the binary file data using filestore GridFS
GridFS fileStore = new GridFS(db, fsColl);
GridFSInputFile file = fileStore.createFile(is);
file.setId(sid);
file.setFilename("files-" + filename);
file.save();
} else {
LOG.warning("Unable to archive record with ID: " + sid + " as the record already exists.");
}
// Close the source stream in both paths to avoid a file handle leak.
is.close();
}
@Override
public synchronized ModelArchive getArchive(String suid) throws Exception {
DB db = mongo.getDB(dbname);
DBCollection collection = db.getCollection(fsColl);
BasicDBObject findQuery = new BasicDBObject("_id", suid);
DBObject doc = collection.findOne(findQuery);
if (doc != null) {
ModelArchive ma = new ModelArchive(
doc.get("ctime").toString(),
doc.get("etime").toString(),
doc.get("service").toString(),
doc.get("status").toString(),
doc.get("req_ip").toString());
return ma;
}
return null;
}
@Override
public synchronized void removeArchive(String suid) {
DB db = mongo.getDB(dbname);
DBCollection collection = db.getCollection(fsColl);
BasicDBObject q = new BasicDBObject("_id", suid);
WriteResult res = collection.remove(q);
GridFS fileStore = new GridFS(db, fsColl);
fileStore.remove(q);
}
@Override
public synchronized byte[] getFile(String suid, String filename) throws Exception {
DB db = mongo.getDB(dbname);
GridFS fileStore = new GridFS(db, fsColl);
BasicDBObject query = new BasicDBObject("_id", suid);
GridFSDBFile gridFile = fileStore.findOne(query);
if (gridFile != null) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
gridFile.writeTo(out);
return out.toByteArray();
}
return null;
}
@Override
public synchronized void shutdown() throws Exception {
mongo.close();
}
@Override
public synchronized long getCount() {
DB db = mongo.getDB(dbname);
return db.getCollection(fsColl).count();
}
@Override
public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
if (sortby == null) {
sortby = "ctime";
sortAsc = false;
}
BasicDBObject sort = new BasicDBObject(sortby, sortAsc ? 1 : -1);
Set<String> l = new LinkedHashSet<>();
DB db = mongo.getDB(dbname);
DBCollection c = db.getCollection(fsColl);
DBCursor cursor = c.find().sort(sort).skip(skip).limit(limit);
while (cursor.hasNext()) {
DBObject o = cursor.next();
l.add(o.get("_id").toString());
}
return l;
}
@Override
public synchronized boolean hasArchive(String suid) throws Exception {
DB db = mongo.getDB(dbname);
DBCollection c = db.getCollection(fsColl);
return c.count(new BasicDBObject("_id", suid)) == 1;
}
}
/**
* Postgres access log store.
*/
static class PostgresAccessLogStore implements AccessLogStore {
static final String jdbc_session_id = "jdbc_logsession";
SessionLogger l = new SessionLogger(null, "PostgresAccessLog", "");
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
int timeout = getInt("csip.accesslog.postgres.timeout", 3);
int retry = getInt("csip.accesslog.postgres.retry", 10);
PostgresAccessLogStore(String url) {
Binaries.addToJDBCPool(jdbc_session_id, url);
}
/*
CREATE TABLE accesslog (
suid varchar not null,
date timestamp,
service_url varchar not null,
status varchar not null,
duration int,
req_ip varchar not null,
node_ip varchar not null,
PRIMARY KEY (date)
);
*/
@Override
public synchronized void log(String suid, String service_url, String req_ip, String node_ip, String req_time, String status, int duration) {
if (retry == 0) {
LOG.info("accesslog retry max reached.");
return;
}
try (Connection c = Binaries.getConnection(jdbc_session_id, l)) {
// Use a prepared statement so parameter values are escaped by the driver.
try (PreparedStatement st = c.prepareStatement(
"insert into accesslog(suid,date,service_url,status,duration,req_ip,node_ip) "
+ "values(?,CAST(? AS timestamp),?,?,?,?,?)")) {
st.setQueryTimeout(timeout);
st.setString(1, suid);
st.setString(2, df.format(new Date()));
st.setString(3, service_url);
st.setString(4, status);
st.setInt(5, duration);
st.setString(6, req_ip);
st.setString(7, node_ip);
st.executeUpdate();
}
}
} catch (SQLTimeoutException to) {
retry--;
LOG.info("accesslog timeout " + to.getMessage());
l.info("accesslog timeout " + to.getMessage());
} catch (SQLException | ServiceException ex) {
retry--;
l.log(Level.SEVERE, null, ex);
LOG.log(Level.SEVERE, null, ex);
}
}
}
static boolean isArchiveEnabled() {
if (hasProperty("csip.archive.enabled")) {
return getBoolean("csip.archive.enabled");
}
return !getString("csip.archive.backend", "none").equals("none");
}
static boolean isAccessLogEnabled() {
if (hasProperty("csip.accesslog.enabled")) {
return getBoolean("csip.accesslog.enabled");
}
return !getString("csip.accesslog.backend", "none").equals("none");
}
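/*
Example: selecting a redis-backed session store (sketch; the server and port
values below are illustrative, shown in properties syntax):

csip.session.backend=redis
csip.session.redis.server=redis.example.org
csip.session.redis.port=6379
*/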
static synchronized SessionStore getSessionStore() {
if (session == null) {
switch (getString("csip.session.backend")) {
case "redis":
JedisPool pool = new JedisPool(new JedisPoolConfig(), getString("csip.session.redis.server"),
getInt("csip.session.redis.port"), getInt("csip.redis.timeout"));
session = new RedisStore(SESSION_PREFIX + ":", pool);
break;
case "mongodb":
session = new MongoDBSessionStore(getString("csip.session.mongodb.uri"));
break;
case "local":
session = new LocalStore();
break;
default:
throw new RuntimeException("unknown session backend: " + getString("csip.session.backend"));
}
}
return session;
}
static synchronized ArchiveStore getArchiveStore() {
if (archive == null) {
if (!isArchiveEnabled()) {
archive = ArchiveStore.NONE;
} else {
switch (getString("csip.archive.backend")) {
case "mongodb":
archive = new MongoDBArchive(getString("csip.archive.mongodb.uri"));
break;
case "none":
archive = ArchiveStore.NONE;
break;
default:
throw new RuntimeException("unknown archive backend: " + getString("csip.archive.backend"));
}
}
}
return archive;
}
static synchronized AccessLogStore getAccessLogStore() {
if (logstore == null) {
if (!isAccessLogEnabled()) {
logstore = AccessLogStore.NONE;
} else {
switch (getString("csip.accesslog.backend")) {
case "postgres":
logstore = new PostgresAccessLogStore(getString("csip.accesslog.postgres.url"));
break;
case "none":
logstore = AccessLogStore.NONE;
break;
default:
throw new RuntimeException("unknown accesslog backend: " + getString("csip.accesslog.backend"));
}
}
}
return logstore;
}
static synchronized Timer getTimer() {
if (timer == null) {
timer = new Timer();
}
return timer;
}
static synchronized ExecutorService getExecutorService() {
if (exec == null) {
exec = Executors.newCachedThreadPool();
}
return exec;
}
static List<ModelDataService.Task> getModelTasks() {
return tasks;
}
/**
* Start up the servlet.
*
* @param context the servlet context
*/
static void startup(ServletContext context) {
reg.setContext(context.getContextPath());
}
/**
* Shut down the servlet.
*
* @param context the servlet context
*/
static void shutdown(ServletContext context) {
for (ModelDataService.Task t : tasks) {
t.cancel();
}
reg.unregister();
if (exec != null) {
LOG.info("Shutting down ExecutorService");
exec.shutdownNow();
exec = null;
}
if (session != null) {
try {
session.shutdown();
} catch (Exception ex) {
LOG.log(Level.SEVERE, "Exception", ex);
}
session = null;
}
if (archive != null) {
try {
archive.shutdown();
} catch (Exception ex) {
LOG.log(Level.SEVERE, "Exception", ex);
}
archive = null;
}
if (timer != null) {
timer.cancel();
timer = null;
}
Binaries.shutdownJDBC();
}
/*
Called upon configuration update.
*/
static void update() {
}
static public Collection<PostgresChunk> getPostgresChunks() {
return pcs;
}
public static String getString(String key, String def) {
return p.getProperty(key, def);
}
public static String getString(String key) {
return p.getProperty(key);
}
public static boolean isString(String key, String str) {
return p.getProperty(key) != null && p.getProperty(key).equals(str);
}
public static boolean getBoolean(String key, boolean def) {
return Boolean.parseBoolean(p.getProperty(key, Boolean.toString(def)));
}
public static boolean getBoolean(String key) {
return Boolean.parseBoolean(p.getProperty(key, "false"));
}
public static boolean hasProperty(String key) {
return p.getProperty(key) != null;
}
public static int getInt(String key, int def) {
return Integer.parseInt(p.getProperty(key, Integer.toString(def)));
}
public static int getInt(String key) {
return Integer.parseInt(p.getProperty(key, "0"));
}
public static long getLong(String key, long def) {
return Long.parseLong(p.getProperty(key, Long.toString(def)));
}
public static long getLong(String key) {
return Long.parseLong(p.getProperty(key, "0"));
}
public static double getDouble(String key, double def) {
return Double.parseDouble(p.getProperty(key, Double.toString(def)));
}
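/*
Usage sketch with keys defined in the static block above:

int poolSize = Config.getInt("pg.connpool", 16);
boolean uiOn = Config.getBoolean("csip.ui.enabled");
String workDir = Config.getString("csip.work.dir", "/tmp/csip/work");
*/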
static Properties getProperties() {
return p;
}
static void put(String key, String value) {
p.setProperty(key, value);
}
public static class PostgresChunk {
String name;
double minLong;
double maxLong;
Connection[] connections;
int currentConnection;
public PostgresChunk(String name, double minLong, double maxLong) {
this.name = name;
this.minLong = minLong;
this.maxLong = maxLong;
}
@Override
public boolean equals(Object obj) {
return (obj instanceof PostgresChunk)
&& this.name.equals(((PostgresChunk) obj).getName());
}
public String getName() {
return name;
}
public double getMinLong() {
return minLong;
}
public double getMaxLong() {
return maxLong;
}
public void setConnections(Connection[] connections) {
this.connections = connections;
}
public Connection[] getConnections() {
return connections;
}
public void connectionInc() {
currentConnection++;
}
public int getCurrentConnectionIdx() {
return currentConnection;
}
public void setCurrentConnectionIdx(int currentConnection) {
this.currentConnection = currentConnection;
}
public Connection getCurrentConnection() {
if ((connections != null) && (currentConnection < connections.length)) {
return connections[currentConnection];
} else {
return null;
}
}
public JSONObject getJSON() {
try {
JSONObject pgchunk = new JSONObject();
pgchunk.put("name", name);
pgchunk.put("minLong", minLong);
pgchunk.put("maxLong", maxLong);
return pgchunk;
} catch (JSONException jse) {
LOG.warning("Error creating JSON representation of a db chunk object");
}
return new JSONObject();
}
}
}