Config.java [src/csip] Revision: d5970d7c0e30454d24a80dc6a55dd59e3bcbc398 Date: Sat Apr 15 07:53:13 MDT 2017
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2017, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.WriteResult;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.gridfs.GridFSInputFile;
import csip.ModelDataService.Task;
import csip.utils.Binaries;
import csip.utils.SimpleCache;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Path;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.xml.bind.DatatypeConverter;
import oms3.annotations.Description;
import oms3.annotations.Name;
import org.apache.commons.net.util.SubnetUtils;
import org.bson.Document;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
/**
* Global properties, changeable at runtime.
*
* @author od
*/
public class Config {
private static final Properties p = new Properties();
private static final Properties allProps = new Properties();
private static final LinkedList<PostgresChunk> pcs = new LinkedList<>();
private static final List<Task> tasks = Collections.synchronizedList(new ArrayList<Task>());
//
//
private static ArchiveStore archive;
private static SessionStore session;
// private static AccessLogStore logstore;
private static ResultStore resultStore;
//
private static ExecutorService exec;
private static Timer timer;
private static final Registry reg = new Registry();
private static final Logger LOG = Logger.getLogger(Config.class.getName());
//
public static final String CSIP_VERSION = "csip.version";
public static final String CSIP_ARCH = "csip.arch";
public static final String CSIP_REMOTE_ACL = "csip.remote.acl";
public static final String CSIP_TIMEZONE = "csip.timezone";
public static final String CSIP_LOGGING_LEVEL = "csip.logging.level";
public static final String CSIP_RESPONSE_STACKTRACE = "csip.response.stacktrace";
//
public static final String CSIP_SESSION_BACKEND = "csip.session.backend";
public static final String CSIP_SESSION_TTL = "csip.session.ttl";
public static final String CSIP_SESSION_MONGODB_URI = "csip.session.mongodb.uri";
public static final String CSIP_SESSION_TTL_FAILED = "csip.session.ttl.failed";
public static final String CSIP_SESSION_TTL_CANCELLED = "csip.session.ttl.cancelled";
public static final String CSIP_ARCHIVE_BACKEND = "csip.archive.backend";
public static final String CSIP_ARCHIVE_MONGODB_URI = "csip.archive.mongodb.uri";
public static final String CSIP_ARCHIVE_MAX_FILE_SIZE = "csip.archive.max.filesize";
public static final String CSIP_ARCHIVE_TTL = "csip.archive.ttl";
public static final String CSIP_ARCHIVE_FAILEDONLY = "csip.archive.failedonly";
public static final String CSIP_RESULTSTORE_BACKEND = "csip.resultstore.backend";
public static final String CSIP_RESULTSTORE_MONGODB_URI = "csip.resultstore.mongodb.uri";
//
public static final String CSIP_DIR = "csip.dir";
public static final String CSIP_BIN_DIR = "csip.bin.dir";
public static final String CSIP_WORK_DIR = "csip.work.dir";
public static final String CSIP_RESULTS_DIR = "csip.results.dir";
public static final String CSIP_CACHE_DIR = "csip.cache.dir";
public static final String CSIP_DATA_DIR = "csip.data.dir";
//
public static final String CSIP_LOGGING_STRMAX = "csip.logging.strmax";
public static final String CSIP_SNAPSHOT = "csip.snapshot";
public static final String CSIP_KEEPWORKSPACE = "csip.keepworkspace";
public static final String CSIP_JDBC_CHECKVALID = "csip.jdbc.checkvalid";
public static final String NONE = "none";
public static Map<Object, Object> properties() {
return Collections.unmodifiableMap(p);
}
//
static SimpleCache<File, ReentrantLock> wsFileLocks = new SimpleCache<>();
static {
/*
The CSIP version
*/
put(CSIP_VERSION, "$version: 2.1.177 af2572bb0c10 2017-04-10 od, built at 2017-04-14 13:28 by od$");
/*
* The runtime architecture.
*/
put(CSIP_ARCH, Binaries.getArch());
// for legacy settings only.
put("arch", Binaries.getArch());
// remote access acl
/*
Remote access for UI and config. Provide a list of IPs or
subnets that are allowed to connect. This ACL is not for the
services, just service management. The default is localhost only.
Example: "csip.remote.acl": "127.0.0.1/32 10.2.222.0/24"
*/
put(CSIP_REMOTE_ACL, "127.0.0.1/32");
// session
/*
The backend to use for session management. Valid choices are
"mongodb", "sql", and "local".
*/
put(CSIP_SESSION_BACKEND, "local");
/*
The mongodb connection string.
*/
put(CSIP_SESSION_MONGODB_URI, "mongodb://localhost:27017/csip");
/*
The default time for a session to stay active after the model
finishes, given as an ISO-8601 duration string. All model results
will be available for that period. After it expires the session is
removed and the model results are removed or archived. This value
can be altered using the "keep_results" metainfo value of a request.
See duration string examples: https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html#parse-java.lang.CharSequence-
*/
put(CSIP_SESSION_TTL, "PT30S"); // 30 sec ttl
put(CSIP_SESSION_TTL_FAILED, "PT30S");
put(CSIP_SESSION_TTL_CANCELLED, "PT30S");
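/*
A minimal sketch of how these TTL values parse (illustrative only;
the values are standard ISO-8601 duration strings):

java.time.Duration d = java.time.Duration.parse("PT30S"); // 30 seconds
java.time.Duration p = java.time.Duration.parse("P1D");   // 1 day
java.time.Duration m = java.time.Duration.parse("PT5M");  // 5 minutes

A request may extend its session via the "keep_results" metainfo
value; a duration string such as "PT10M" is assumed here.
*/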
/*
The default csip timezone to be used for time management.
*/
put(CSIP_TIMEZONE, "MST7MDT");
// archive
/*
Defines the archive backend implementation: "mongodb" or "none"
are possible; "none" means disabled.
*/
put(CSIP_ARCHIVE_BACKEND, NONE);
/*
The mongodb connection uri, if the backend is set to "mongodb".
*/
put(CSIP_ARCHIVE_MONGODB_URI, "mongodb://localhost:27017/csip");
/*
The max file size for an attachment to be archived.
*/
put(CSIP_ARCHIVE_MAX_FILE_SIZE, "10MB");
/*
The default time for an entry to stay in the archive, given as an
ISO-8601 duration string. All archived model results will be available
for that period after the session expired. After this period expires
the archive entry will be removed.
See duration string examples: https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html#parse-java.lang.CharSequence-
*/
put(CSIP_ARCHIVE_TTL, "P1D"); // one day.
/*
If the archive is enabled, only archive failed runs, default: false
*/
put(CSIP_ARCHIVE_FAILEDONLY, "false");
// logger
/*
The log level for service logging. This is only used if "csip.logging.enabled"
is set to true. All java.util.logging.Level log levels are usable.
*/
put(CSIP_LOGGING_LEVEL, "INFO");
/*
Controls whether the stack trace should be part of the response
metadata if the service logic fails. Default is true.
*/
put(CSIP_RESPONSE_STACKTRACE, "true");
/* Result store handling
*/
put(CSIP_RESULTSTORE_BACKEND, NONE);
put(CSIP_RESULTSTORE_MONGODB_URI, "mongodb://localhost:27017/csip");
/*
The csip root directory.
*/
put(CSIP_DIR, "/tmp/csip");
/*
The csip directory for executables.
*/
put(CSIP_BIN_DIR, "${csip.dir}/bin");
/*
The csip directory for sessions.
*/
put(CSIP_WORK_DIR, "${csip.dir}/work");
/*
The csip directory to store results.
*/
put(CSIP_RESULTS_DIR, "${csip.dir}/results");
/*
The csip cache file directory.
*/
put(CSIP_CACHE_DIR, "${csip.dir}/cache");
/*
The csip data directory.
*/
put(CSIP_DATA_DIR, "${csip.dir}/data");
/*
External URL parts. These properties can be set to force a public
scheme/host/port for result file downloads and catalog listings.
They are not set by default and can be set independently of each
other to change only selected parts of the URL. If none of the
properties below are set, the incoming URL is used to construct
downloads and catalogs.
*/
// put("csip.public.scheme", ""); // e.g. https
// put("csip.public.host", ""); // e.g. csip.org
// put("csip.public.port", ""); // e.g. 8080 (-1 will remove the port in the url)
///////////////////////////////////////////////////////
//// more auxiliary properties
// thread management
put("codebase.threadpool", "32"); // 10 concurrent model runs.
put("codebase.url", "http://localhost:8080");
put("codebase.port", "8085");
put("codebase.localport", "8081");
put("codebase.servicename", "csip-vmscaler");
// postgis
put("pg.chunks", "0");
put("pg.url", "jdbc:postgresql://oms-db.engr.colostate.edu:5432/r2gis");
put("pg.connpool", "16"); // 16 default connections for db connection pool
// wine
put("wine.path", "/usr/bin/wine");
//rusle2/weps
put("r2.path", "/od/projects/csip.services/bin/RomeShell.exe"); // the path is the parent directory.
put("r2.db", "http://oms-db.engr.colostate.edu/r2");
put("weps.db", "http://oms-db.engr.colostate.edu/weps");
//oms related props
put("oms.java.home", "/usr/bin");
put("oms.esp.threads", "4");
// internal
put("vm.port", "8080");
update();
}
public static void register(Set<Class<?>> service) {
registry().register(service);
}
/**
* Service registry for CSIP model and data services.
*/
public static class Registry {
String context;
List<Class<?>> s;
Set<Class<?>> regServ;
void setContext(String context) {
this.context = context;
}
String getContext() {
return context;
}
public void register(Set<Class<?>> service) {
if (regServ != null) {
return;
}
regServ = service;
s = new ArrayList<>();
service.forEach(c -> {
if ((c.getCanonicalName().startsWith("m.") // model service
|| c.getCanonicalName().startsWith("d."))) { // data service
LOG.info("Register service " + c);
s.add(c);
callStaticMethodIfExist(c, "onContextInit");
}
});
Collections.sort(s, (Class<?> o1, Class<?> o2)
-> getServiceName(o1).compareTo(getServiceName(o2)));
LOG.info(">>>>>>>> Registered " + s.size() + " CSIP services.");
}
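/*
A registrable service sketch (hypothetical class, illustrative only).
Only classes whose canonical name starts with "m." (model service)
or "d." (data service) are picked up, and an optional public static
onContextInit()/onContextDestroy() hook is invoked on (un)registration:

package m;
@Name("mymodel")
@Description("An example model service")
@Path("m/mymodel/1.0")
public class MyModel extends ModelDataService {
public static void onContextInit() { ... }
public static void onContextDestroy() { ... }
}

The base class and annotation values are assumptions; the package
prefix and hook names are what register() actually checks.
*/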
public void unregister() {
if (regServ == null) {
return;
}
regServ = null;
s.forEach(c -> {
LOG.info("Unregister service " + c);
callStaticMethodIfExist(c, "onContextDestroy");
});
s.clear();
}
static void callStaticMethodIfExist(Class c, String method) {
try {
Method m = c.getMethod(method);
m.invoke(null);
LOG.info("Found and Invoked '" + method + "' in: " + c);
} catch (NoSuchMethodException ex) {
return; // no problem
} catch (SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
LOG.log(Level.SEVERE, null, ex);
}
}
List<Class<?>> getServices() {
return s;
}
String getServicePath(Class<?> i) {
Path p = (Path) i.getAnnotation(Path.class);
return (p == null) ? "" : p.value();
}
String getServiceName(Class<?> i) {
Name p = (Name) i.getAnnotation(Name.class);
return (p == null) ? "" : p.value();
}
String getServiceDescription(Class<?> i) {
Description p = (Description) i.getAnnotation(Description.class);
return (p == null) ? "" : p.value();
}
}
public static Registry registry() {
return reg;
}
/**
* session store.
*/
interface SessionStore {
/**
* Set a model session.
*
* @param suid the session unique id.
* @param session the model session to store.
* @throws Exception if the session cannot be stored.
*/
void setSession(String suid, ModelSession session) throws Exception;
/**
* Get a model session.
*
* @param suid the session unique id.
* @return the model session.
* @throws Exception if the lookup fails.
*/
ModelSession getSession(String suid) throws Exception;
boolean hasSession(String suid) throws Exception;
/**
* Remove a model session.
*
* @param suid the session unique id.
*/
void removeSession(String suid);
/**
* Shut down the session store.
*
* @throws Exception on shutdown failure.
*/
void shutdown() throws Exception;
/**
* Get session keys.
*
* @param skip the number of keys to skip.
* @param limit the number of keys to return (0 means all).
* @param sortby the field to sort by.
* @param sortAscending true if sort is ascending, false otherwise.
* @return the keys.
*/
Set<String> keys(int skip, int limit, String sortby, boolean sortAscending);
/**
* Get the number of elements.
* @return the number of elements.
*/
long getCount();
/**
* Ping the store.
*
* @throws Exception if something is wrong.
*/
void ping() throws Exception;
}
/**
* Archive Store
*/
interface ArchiveStore {
static String ARCHIVE_TOO_BIG = "the archive was too big to store.";
void archiveSession(String sid, ModelArchive ma, File f) throws Exception;
ModelArchive getArchive(String suid) throws Exception;
boolean hasArchive(String suid) throws Exception;
void removeArchive(String suid);
byte[] getFile(String suid, String filename) throws Exception;
void shutdown() throws Exception;
/**
* Get the number of elements.
* @return the number of elements
*/
long getCount();
Set<String> keys(int skip, int limit, String sortby, boolean sortAsc);
ArchiveStore NONE = new ArchiveStore() {
Set<String> keys = new HashSet<>();
@Override
public void archiveSession(String sid, ModelArchive ma, File f) throws Exception {
}
@Override
public ModelArchive getArchive(String suid) throws Exception {
return null;
}
@Override
public void removeArchive(String suid) {
}
@Override
public byte[] getFile(String suid, String filename) throws Exception {
return null;
}
@Override
public void shutdown() throws Exception {
}
@Override
public Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
return keys;
}
@Override
public long getCount() {
return keys.size();
}
@Override
public boolean hasArchive(String suid) throws Exception {
return false;
}
};
}
interface ResultStore {
/**
* Get a stored result.
*
* @param request the request digest key.
* @return the result JSON, or null if not present.
*/
String getResult(String request);
/**
* Store a result under its digest.
*
* @param digest the request digest key.
* @param results the result JSON to store.
*/
void putResult(String digest, String results);
/**
* Compute the digest of a request, ignoring whitespace.
*
* @param request the request string.
* @return the lowercase hex SHA-1 digest, or null on error.
*/
default String getDigest(String request) {
request = request.replace(" ", "").replace("\n", "").replace("\t", "");
try {
return DatatypeConverter.printHexBinary(
MessageDigest.getInstance("SHA1").digest(request.getBytes("UTF-8"))).toLowerCase();
} catch (NoSuchAlgorithmException | UnsupportedEncodingException ex) {
return null;
}
}
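/*
Digest sketch (hypothetical values): requests that differ only in
whitespace map to the same key, so a re-submitted request can reuse
a stored result:

String a = "{ \"name\": \"x\" }";
String b = "{\"name\":\"x\"}";
// getDigest(a).equals(getDigest(b)) -> true; both hash the
// whitespace-stripped form {"name":"x"} with SHA-1.
*/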
/**
* Purge the results.
*/
void purge();
/**
* Shut down the result store.
*
* @throws Exception on shutdown failure.
*/
default void shutdown() throws Exception {
}
ResultStore NONE = new ResultStore() {
@Override
public String getResult(String request) {
return null;
}
@Override
public void putResult(String digest, String results) {
}
@Override
public String getDigest(String request) {
return null;
}
@Override
public void purge() {
}
};
}
/**
* MongoDB-backed result store, keyed by request digest.
*/
static class MongoResultStore implements ResultStore {
MongoClient mongo;
MongoDatabase db;
static final String RESULTS = "results";
MongoResultStore(String uri) {
MongoClientURI u = new MongoClientURI(uri);
String dbname = u.getDatabase();
if (dbname == null) {
dbname = "csip";
}
mongo = new MongoClient(u);
db = mongo.getDatabase(dbname);
LOG.info("Connected to mongo result store : " + uri);
}
@Override
public synchronized String getResult(String digest) {
FindIterable<Document> r = db.getCollection(RESULTS).find(new Document("_id", digest));
Document o = r.first();
if (o != null) {
return o.getString("result");
} else {
LOG.warning("No previous result for : " + digest);
return null;
}
}
@Override
public synchronized void putResult(String digest, String result) {
Document doc = new Document();
doc.append("_id", digest);
doc.append("result", result);
UpdateOptions opt = new UpdateOptions();
opt.upsert(true);
MongoCollection<Document> c = db.getCollection(RESULTS);
c.replaceOne(new Document("_id", digest), doc, opt);
LOG.info("put result: " + digest + " " + doc.toJson());
}
@Override
public synchronized void purge() {
db.getCollection(RESULTS).drop();
}
@Override
public void shutdown() throws Exception {
mongo.close();
}
}
/**
* This is a local session store. One instance only, no failover.
*/
private static class LocalStore implements SessionStore {
Map<String, ModelSession> m = Collections.synchronizedMap(new HashMap<String, ModelSession>());
public LocalStore() {
LOG.info("Using local session store.");
}
@Override
public void shutdown() throws Exception {
m.clear();
}
@Override
public void setSession(String key, ModelSession session) throws Exception {
m.put(key, session);
}
@Override
public ModelSession getSession(String key) throws Exception {
return m.get(key);
}
@Override
public void removeSession(String key) {
m.remove(key);
}
@Override
public Set<String> keys(int skip, int limit, String sortby, boolean sortAscending) {
return m.keySet();
}
@Override
public long getCount() {
return m.size();
}
@Override
public void ping() {
if (m == null) {
throw new NullPointerException("Illegal hashtable store.");
}
}
@Override
public boolean hasSession(String suid) throws Exception {
return m.containsKey(suid);
}
}
/**
* SQL/JDBC-backed session store.
*/
private static class SQLSessionStore implements SessionStore {
static final String jdbc_session_id = "jdbc_session";
SessionLogger l = new SessionLogger(null, "SQLSessionLog", "");
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
boolean isMSSQL = false;
SQLSessionStore(String url) {
Map<String, String> m = new HashMap<>();
m.put("defaultAutoCommit", "true");
Binaries.addToJDBCPool(jdbc_session_id, url, m);
createTableIfNeeded();
}
@Override
public synchronized void setSession(String suid, ModelSession session) throws Exception {
try (Connection c = Binaries.getConnection(jdbc_session_id, l)) {
try (Statement st = c.createStatement()) {
String[] a = session.getAttachments();
String att = String.join(",", a);
String sql = null;
if (hasSession(suid)) {
sql = "UPDATE csip_sessions SET tst='" + session.getTstamp()
+ "', exp='" + session.getExpDate()
+ "', srv='" + session.getService()
+ "', sta='" + session.getStatus()
+ "', nip='" + session.getNodeIP()
+ "', rip='" + session.getReqIP()
+ "', cpu=" + (session.getCputime().isEmpty() ? -1 : Integer.parseInt(session.getCputime()))
+ ", pro='" + (session.getProgress() == null ? "" : session.getProgress())
+ "', att='" + att + "';";
} else {
sql = "INSERT INTO csip_sessions(suid,tst,exp,srv,sta,nip,rip,cpu,pro,att) VALUES('"
+ suid + "','"
+ session.getTstamp() + "','"
+ session.getExpDate() + "','"
+ session.getService() + "','"
+ session.getStatus() + "','"
+ session.getNodeIP() + "','"
+ session.getReqIP() + "',"
+ (session.getCputime().isEmpty() ? -1 : Integer.parseInt(session.getCputime())) + ",'"
+ (session.getProgress() == null ? "" : session.getProgress()) + "','" + att + "');";
}
l.info("setSession() " + sql);
st.executeUpdate(sql);
}
} catch (SQLException | ServiceException ex) {
l.log(Level.SEVERE, null, ex);
LOG.log(Level.SEVERE, null, ex);
}
}
@Override
public synchronized ModelSession getSession(String suid) throws Exception {
try (Connection c = Binaries.getConnection(jdbc_session_id, l)) {
try (Statement st = c.createStatement()) {
String sql = "SELECT tst,exp,srv,sta,nip,rip,cpu,pro,att FROM csip_sessions WHERE suid='" + suid + "';";
l.info("getSession() " + sql);
ResultSet rs = st.executeQuery(sql);
rs.next();
ModelSession s = new ModelSession();
s.setTstamp(rs.getString(1));
s.setExpDate(rs.getString(2));
s.setService(rs.getString(3));
s.setStatus(rs.getString(4));
s.setNodeIP(rs.getString(5));
s.setReqIP(rs.getString(6));
int cpu = rs.getInt(7);
s.setCputime(cpu == -1 ? "" : Integer.toString(cpu));
String pro = rs.getString(8);
s.setProgress(pro.equals("") ? null : pro);
String att = rs.getString(9);
s.setAttachments(att.isEmpty() ? ModelSession.NO_ATTACHMENTS : att.split(","));
return s;
}
} catch (SQLException | ServiceException ex) {
l.log(Level.SEVERE, null, ex);
LOG.log(Level.SEVERE, null, ex);
}
return null;
}
@Override
public synchronized boolean hasSession(String suid) throws Exception {
try (Connection c = Binaries.getConnection(jdbc_session_id, l)) {
try (Statement st = c.createStatement()) {
String sql = "SELECT suid FROM csip_sessions WHERE suid = '" + suid + "';";
l.info("hasSession() " + sql);
ResultSet rs = st.executeQuery(sql);
return rs.next();
}
} catch (SQLException | ServiceException ex) {
l.log(Level.SEVERE, null, ex);
LOG.log(Level.SEVERE, null, ex);
}
return false;
}
@Override
public void removeSession(String suid) {
try (Connection c = Binaries.getConnection(jdbc_session_id, l)) {
try (Statement st = c.createStatement()) {
String sql = "DELETE FROM csip_sessions WHERE suid='" + suid + "';";
l.info("removeSession() " + sql);
st.executeUpdate(sql);
}
} catch (SQLException | ServiceException ex) {
l.log(Level.SEVERE, null, ex);
LOG.log(Level.SEVERE, null, ex);
}
}
@Override
public void shutdown() throws Exception {
// nothing to do here; the shared JDBC pool is closed in Config.shutdown() via Binaries.shutdownJDBC().
}
@Override
public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
if (sortby == null) {
sortby = "tst";
sortAsc = false;
}
Set<String> keys = new LinkedHashSet<>();
try (Connection c = Binaries.getConnection(jdbc_session_id, l)) {
try (Statement st = c.createStatement()) {
String sql = isMSSQL
? "SELECT suid FROM csip_sessions ORDER BY " + sortby + " OFFSET " + skip + " ROWS FETCH NEXT " + limit + " ROWS ONLY;"
: "SELECT suid FROM csip_sessions ORDER BY " + sortby + " LIMIT " + limit + " OFFSET " + skip + ";";
l.info("keys() " + sql);
ResultSet rs = st.executeQuery(sql);
while (rs.next()) {
keys.add(rs.getString(1));
}
}
} catch (SQLException | ServiceException ex) {
l.log(Level.SEVERE, null, ex);
LOG.log(Level.SEVERE, null, ex);
}
return keys;
}
@Override
public synchronized long getCount() {
try (Connection c = Binaries.getConnection(jdbc_session_id, l)) {
try (Statement st = c.createStatement()) {
String sql = "SELECT COUNT(*) FROM csip_sessions;";
l.info("getCount() " + sql);
ResultSet rs = st.executeQuery(sql);
rs.next();
return rs.getLong(1);
}
} catch (SQLException | ServiceException ex) {
l.log(Level.SEVERE, null, ex);
LOG.log(Level.SEVERE, null, ex);
}
return -1;
}
@Override
public void ping() throws Exception {
}
private void createTableIfNeeded() {
try (Connection c = Binaries.getConnection(jdbc_session_id, l)) {
String driver = c.getMetaData().getDriverName();
l.info("Driver name: " + driver);
isMSSQL = driver.contains("Microsoft");
try (Statement st = c.createStatement()) {
// mssql
String sql = isMSSQL
? "if not exists (select * from sysobjects where name='csip_sessions' and xtype='U')"
+ " BEGIN CREATE TABLE csip_sessions ("
+ " suid VARCHAR(64) primary key,"
+ " tst VARCHAR(64),"
+ " exp VARCHAR(64),"
+ " srv VARCHAR(64),"
+ " sta VARCHAR(64),"
+ " nip VARCHAR(64),"
+ " rip VARCHAR(64),"
+ " cpu INTEGER,"
+ " pro VARCHAR(64),"
+ " att VARCHAR(64)"
+ ") END;"
//
: "CREATE TABLE IF NOT EXISTS csip_sessions ("
+ " suid VARCHAR(64) primary key,"
+ " tst VARCHAR(64),"
+ " exp VARCHAR(64),"
+ " srv VARCHAR(64),"
+ " sta VARCHAR(64),"
+ " nip VARCHAR(64),"
+ " rip VARCHAR(64),"
+ " cpu INTEGER,"
+ " pro VARCHAR(64),"
+ " att VARCHAR(64)"
+ ");";
l.info("createTable() " + sql);
st.execute(sql);
l.info("created Table. ");
}
} catch (SQLException | ServiceException ex) {
l.severe("ERROR: connecting to session store, check connection string or database setup. need r/w SQL access.");
l.log(Level.SEVERE, null, ex);
LOG.log(Level.SEVERE, null, ex);
}
}
}
/**
* MongoDB-backed session store.
*/
private static class MongoDBSessionStore implements SessionStore {
MongoClient mongo;
MongoDatabase db;
static final String coll = "session";
MongoDBSessionStore(String uri) {
MongoClientURI u = new MongoClientURI(uri);
String dbname = u.getDatabase();
if (dbname == null) {
dbname = "csip";
}
mongo = new MongoClient(u);
db = mongo.getDatabase(dbname);
LOG.info("Connected to mongo session csip store : " + uri);
}
@Override
public synchronized void setSession(String suid, ModelSession session) throws Exception {
Document doc = new Document();
doc.append("_id", suid)
.append("tst", session.getTstamp())
.append("exp", session.getExpDate())
.append("srv", session.getService())
.append("sta", session.getStatus())
.append("nip", session.getNodeIP())
.append("rip", session.getReqIP())
.append("cpu", session.getCputime().isEmpty() ? -1 : Integer.parseInt(session.getCputime()))
.append("pro", session.getProgress() == null ? ""
: session.getProgress());
String[] a = session.getAttachments();
doc.append("att", String.join(",", a));
UpdateOptions opt = new UpdateOptions();
opt.upsert(true);
MongoCollection<Document> c = db.getCollection(coll);
c.replaceOne(new Document("_id", suid), doc, opt);
LOG.info("added: " + suid + " " + doc.toJson());
}
@Override
public synchronized ModelSession getSession(String suid) throws Exception {
FindIterable<Document> r = db.getCollection(coll).find(new Document("_id", suid));
Document o = r.first();
if (o != null) {
// Rebuild the model session from the stored fields.
ModelSession s = new ModelSession();
s.setTstamp(o.get("tst", String.class));
s.setExpDate(o.get("exp", String.class));
s.setService(o.get("srv", String.class));
s.setStatus(o.get("sta", String.class));
s.setNodeIP(o.get("nip", String.class));
s.setReqIP(o.get("rip", String.class));
int cpu = o.get("cpu", Integer.class);
s.setCputime(cpu == -1 ? "" : Integer.toString(cpu));
s.setProgress((o.get("pro", String.class)).equals("") ? null : (o.get("pro", String.class)));
String l = o.get("att", String.class);
s.setAttachments(l.isEmpty() ? ModelSession.NO_ATTACHMENTS : l.split(","));
return s;
} else {
LOG.warning("Unable get session : " + suid + " as record already exists!!!");
return null;
}
}
@Override
public synchronized boolean hasSession(String suid) throws Exception {
return db.getCollection(coll).count(new Document("_id", suid)) == 1;
}
@Override
public synchronized void removeSession(String suid) {
DeleteResult r = db.getCollection(coll).deleteOne(new Document("_id", suid));
if (r.getDeletedCount() != 1) {
throw new RuntimeException("Not deleted: " + suid);
}
}
@Override
public synchronized void shutdown() throws Exception {
mongo.close();
}
@Override
public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
if (sortby == null) {
sortby = "tst";
sortAsc = false;
}
Document sort = new Document(sortby, sortAsc ? 1 : -1);
Set<String> l = new LinkedHashSet<>();
MongoCollection<Document> c = db.getCollection(coll);
for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) {
l.add(doc.get("_id", String.class));
}
return l;
}
@Override
public synchronized long getCount() {
return db.getCollection(coll).count();
}
@Override
public void ping() throws Exception {
MongoCollection<Document> c = db.getCollection(coll + "_test");
String id = "test_id_1234";
// insert document
Document doc = new Document("_id", id);
UpdateOptions opt = new UpdateOptions();
opt.upsert(true);
UpdateResult res = c.replaceOne(new Document("_id", id), doc, opt);
LOG.info("added " + res);
// find it.
FindIterable<Document> d = c.find(new Document("_id", id));
Document o = d.first();
if (o == null) {
throw new Exception("Not found: " + id);
}
if (!id.equals(o.getString("_id"))) {
throw new Exception("Id not found: " + id);
}
LOG.info("found " + o);
// remove it.
DeleteResult r = c.deleteOne(new Document("_id", id));
if (r.getDeletedCount() != 1) {
throw new Exception("Not deleted: " + id);
}
LOG.info("deleted " + r);
c.drop();
}
}
/**
* MongoDB/GridFS-backed archive store.
*/
private static class MongoDBArchive implements ArchiveStore {
MongoClient mongo;
String dbname;
static final String fsColl = "fs";
MongoDBArchive(String uri) {
// "mongodb://user:pass@host:port/db"
MongoClientURI u = new MongoClientURI(uri);
dbname = u.getDatabase();
if (dbname == null) {
dbname = "csip";
}
mongo = new MongoClient(u);
LOG.info("Connected to archive csip store : " + uri);
}
public synchronized void removeAll() {
DB db = mongo.getDB(dbname);
//Let's store the standard data in regular collection
DBCollection collection = db.getCollection(fsColl);
collection.remove((new BasicDBObject()));
// remove all file in the bucket
GridFS fileStore = new GridFS(db, fsColl);
fileStore.remove((DBObject) null);
}
@Override
public synchronized void archiveSession(String sid, ModelArchive ma, File f) throws Exception {
DB db = mongo.getDB(dbname);
//Let's store the standard data in regular collection
DBCollection collection = db.getCollection(fsColl);
String max = getString(CSIP_ARCHIVE_MAX_FILE_SIZE);
long limit = Binaries.parseByteSize(max);
String filename = null;
long filesize = 0;
InputStream is = null;
if (f.length() <= limit) {
filename = f.getName();
filesize = f.length();
is = new FileInputStream(f);
} else {
filename = sid + ".txt";
filesize = ARCHIVE_TOO_BIG.length();
is = new ByteArrayInputStream(ARCHIVE_TOO_BIG.getBytes());
}
// Let's query to ensure ID does not already exist in Mongo
// if it does, we will alert the user
BasicDBObject query = new BasicDBObject("_id", sid);
DBCursor cursor = collection.find(query);
if (!cursor.hasNext()) {
// Build our document and add all the fields
BasicDBObject doc = new BasicDBObject();
doc.append("_id", sid);
doc.append("service", ma.getService());
doc.append("status", ma.getStatus());
doc.append("ctime", ma.getCtime());
doc.append("etime", ma.getEtime());
doc.append("req_ip", ma.getReqIP());
doc.append("filename", "files-" + filename);
doc.append("filelength", filesize);
//insert the document into the collection
collection.insert(doc);
// Now let's store the binary file data using filestore GridFS
GridFS fileStore = new GridFS(db, fsColl);
GridFSInputFile file = fileStore.createFile(is);
file.setId(sid);
file.setFilename("files-" + filename);
file.save();
} else {
LOG.warning("Unable to archive record with ID: " + sid + " as record already exists!!!");
}
}
@Override
public synchronized ModelArchive getArchive(String suid) throws Exception {
DB db = mongo.getDB(dbname);
DBCollection collection = db.getCollection(fsColl);
BasicDBObject findQuery = new BasicDBObject("_id", suid);
DBObject doc = collection.findOne(findQuery);
if (doc != null) {
ModelArchive ma = new ModelArchive(
doc.get("ctime").toString(),
doc.get("etime").toString(),
doc.get("service").toString(),
doc.get("status").toString(),
doc.get("req_ip").toString());
return ma;
}
return null;
}
@Override
public synchronized void removeArchive(String suid) {
DB db = mongo.getDB(dbname);
DBCollection collection = db.getCollection(fsColl);
BasicDBObject q = new BasicDBObject("_id", suid);
WriteResult res = collection.remove(q);
GridFS fileStore = new GridFS(db, fsColl);
fileStore.remove(q);
}
@Override
public synchronized byte[] getFile(String suid, String filename) throws Exception {
DB db = mongo.getDB(dbname);
GridFS fileStore = new GridFS(db, fsColl);
BasicDBObject query = new BasicDBObject("_id", suid);
GridFSDBFile gridFile = fileStore.findOne(query);
if (gridFile != null) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
gridFile.writeTo(out);
return out.toByteArray();
}
return null;
}
@Override
public synchronized void shutdown() throws Exception {
mongo.close();
}
@Override
public synchronized long getCount() {
DB db = mongo.getDB(dbname);
return db.getCollection(fsColl).count();
}
@Override
public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) {
if (sortby == null) {
sortby = "ctime";
sortAsc = false;
}
BasicDBObject sort = new BasicDBObject(sortby, sortAsc ? 1 : -1);
Set<String> l = new LinkedHashSet<>();
DB db = mongo.getDB(dbname);
DBCollection c = db.getCollection(fsColl);
DBCursor cursor = c.find().sort(sort).skip(skip).limit(limit);
while (cursor.hasNext()) {
DBObject o = cursor.next();
l.add(o.get("_id").toString());
}
return l;
}
@Override
public synchronized boolean hasArchive(String suid) throws Exception {
DB db = mongo.getDB(dbname);
DBCollection c = db.getCollection(fsColl);
return c.count(new BasicDBObject("_id", suid)) == 1;
}
}
static boolean isArchiveEnabled() {
return !getString("csip.archive.backend", NONE).equals(NONE);
}
static boolean isResultStoreEnabled() {
return !getString(CSIP_RESULTSTORE_BACKEND, NONE).equals(NONE);
}
static synchronized SessionStore getSessionStore() {
if (session == null) {
String uri = null;
switch (getString(CSIP_SESSION_BACKEND)) {
case "mongodb":
uri = getString(CSIP_SESSION_MONGODB_URI);
if (uri == null) {
throw new RuntimeException("missing uri configuration entry 'csip.session.mongodb.uri'");
}
session = new MongoDBSessionStore(uri);
break;
case "sql":
uri = getString("csip.session.sql.uri");
if (uri == null) {
throw new RuntimeException("missing uri configuration entry 'csip.session.sql.uri'");
}
session = new SQLSessionStore(uri);
break;
case "local":
session = new LocalStore();
break;
default:
throw new RuntimeException("unknown session backend: " + getString(CSIP_SESSION_BACKEND));
}
}
return session;
}
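/*
Backend selection sketch: the store is chosen via properties, which
can be supplied as system properties at startup because they override
the built-in defaults in rehashProperties() (values illustrative):

-Dcsip.session.backend=mongodb
-Dcsip.session.mongodb.uri=mongodb://localhost:27017/csip

or, for the SQL store (the JDBC URL below is an assumption):

-Dcsip.session.backend=sql
-Dcsip.session.sql.uri=jdbc:postgresql://localhost:5432/csip
*/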
static synchronized ArchiveStore getArchiveStore() {
if (archive == null) {
if (!isArchiveEnabled()) {
archive = ArchiveStore.NONE;
} else {
switch (getString(CSIP_ARCHIVE_BACKEND)) {
case "mongodb":
String uri = getString(CSIP_ARCHIVE_MONGODB_URI);
if (uri == null) {
throw new RuntimeException("missing uri configuration entry 'csip.archive.mongodb.uri'");
}
archive = new MongoDBArchive(uri);
break;
case NONE:
archive = ArchiveStore.NONE;
break;
default:
throw new RuntimeException("unknown archive backend: " + getString(CSIP_ARCHIVE_BACKEND));
}
}
}
return archive;
}
static synchronized ResultStore getResultStore() {
if (resultStore == null) {
if (!isResultStoreEnabled()) {
resultStore = ResultStore.NONE;
} else {
switch (getString(CSIP_RESULTSTORE_BACKEND)) {
case "mongodb":
resultStore = new MongoResultStore(getString(CSIP_RESULTSTORE_MONGODB_URI));
break;
case NONE:
resultStore = ResultStore.NONE;
break;
default:
throw new RuntimeException("unknown resultstore backend: " + getString(CSIP_RESULTSTORE_BACKEND));
}
}
}
return resultStore;
}
static synchronized Timer getTimer() {
if (timer == null) {
timer = new Timer();
}
return timer;
}
static synchronized ExecutorService getExecutorService() {
if (exec == null) {
exec = Executors.newCachedThreadPool();
}
return exec;
}
static List<ModelDataService.Task> getModelTasks() {
return tasks;
}
/**
* Start up the servlet.
*
* @param context the servlet context.
*/
static void startup(ServletContext context) {
reg.setContext(context.getContextPath());
}
/**
* Shut down the servlet.
*
* @param context the servlet context.
*/
static void shutdown(ServletContext context) {
for (ModelDataService.Task t : tasks) {
t.cancel();
}
reg.unregister();
if (exec != null) {
LOG.info("Shutting down ExecutorService");
exec.shutdownNow();
}
if (session != null) {
try {
session.shutdown();
} catch (Exception ex) {
LOG.log(Level.SEVERE, "Exception", ex);
}
session = null;
}
if (archive != null) {
try {
archive.shutdown();
} catch (Exception ex) {
LOG.log(Level.SEVERE, "Exception", ex);
}
archive = null;
}
if (resultStore != null) {
try {
resultStore.shutdown();
} catch (Exception ex) {
LOG.log(Level.SEVERE, "Exception", ex);
}
resultStore = null;
}
if (timer != null) {
timer.cancel();
}
Binaries.shutdownJDBC();
}
/*
Called upon configuration update.
*/
static void update() {
rehashProperties();
}
public static Collection<PostgresChunk> getPostgresChunks() {
return pcs;
}
public static boolean hasProperty(String key) {
return allProps.containsKey(key);
}
public static boolean isString(String key, String str) {
String s = getString(key);
return (s != null) && s.equals(str);
}
public static String getString(String key, String def) {
return getP(key, def);
}
public static String getString(String key) {
return getP(key, null);
}
public static boolean getBoolean(String key, boolean def) {
return Boolean.parseBoolean(getP(key, Boolean.toString(def)));
}
public static boolean getBoolean(String key) {
return Boolean.parseBoolean(getP(key, "false"));
}
public static int getInt(String key, int def) {
return Integer.parseInt(getP(key, Integer.toString(def)));
}
public static int getInt(String key) {
return Integer.parseInt(getP(key, "0"));
}
public static long getLong(String key, long def) {
return Long.parseLong(getP(key, Long.toString(def)));
}
public static long getLong(String key) {
return Long.parseLong(getP(key, "0"));
}
public static double getDouble(String key, double def) {
return Double.parseDouble(getP(key, Double.toString(def)));
}
public static double getDouble(String key) {
return Double.parseDouble(getP(key, "0.0"));
}
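/*
Typed accessor sketch, using the defaults defined above (values
apply unless overridden at runtime):

int pool = Config.getInt("pg.connpool"); // 16
boolean trace = Config.getBoolean(Config.CSIP_RESPONSE_STACKTRACE); // true
String work = Config.getString(Config.CSIP_WORK_DIR); // "/tmp/csip/work" after ${csip.dir} resolution
*/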
private static String getP(String key, String def) {
return resolve(allProps.getProperty(key, def));
}
static Properties getProperties() {
return p;
}
static Properties getMergedProperties() {
return allProps;
}
private static void put(String key, String value) {
p.setProperty(key, value);
}
public static void main(String[] args) {
// System.out.println(System.getenv());
String a = resolve("${test.port} abc ${java.home} ///${PATH}/// def ${csip.bin.dir}");
System.out.println(a);
String b = "d".replace("___", "-").replace("__", ".");
System.out.println(b);
}
/**
* Resolve a string with system and CSIP properties.
*
* @param str the string to resolve
* @return the resolved string.
*/
public static String resolve(String str) {
if (str == null) {
return null;
}
if (!str.contains("${")) {
return str;
}
String res = resolve0(str, allProps, new HashSet<>());
if (res.contains("${")) {
LOG.warning("Resolving one or more varariables failed in: " + res);
}
return res;
}
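/*
Resolution sketch with the defaults above; placeholders expand
recursively against the merged property set:

String s = Config.resolve("${csip.bin.dir}/RomeShell.exe");
// -> "/tmp/csip/bin/RomeShell.exe", since ${csip.bin.dir} expands
// to "${csip.dir}/bin" and ${csip.dir} to "/tmp/csip".
*/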
/**
* Called when the properties are updated.
*
*/
private static void rehashProperties() {
allProps.clear();
allProps.putAll(Config.properties());
allProps.putAll(System.getProperties());
Map<String, String> env = System.getenv();
for (String key : env.keySet()) {
String newKey = key.replace("___", "-").replace("__", ".");
allProps.put(newKey, env.get(key));
}
}
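/*
Environment mapping sketch (key names illustrative): since "___"
becomes "-" and "__" becomes ".", an environment variable can
override a dotted property without shell-unfriendly characters:

export csip__session__ttl=PT5M   -> csip.session.ttl
export my___app__dir=/srv/app    -> my-app.dir
*/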
/**
 * Property substitution in a string.
 *
 * @param str the string containing ${...} placeholders
 * @param prop the properties to resolve against
 * @param keys keys already being resolved, used to detect circular references
 * @return the string with all resolvable placeholders substituted.
 */
private static String resolve0(String str, Properties prop, Set<String> keys) {
int idx = 0;
while (idx < str.length()) {
int start = str.indexOf("${", idx);
int end = str.indexOf("}", idx);
if (start == -1 || end == -1 || end < start) {
break;
}
String key = str.substring(start + 2, end);
if (keys.contains(key)) {
System.err.println("Circular property reference: " + key);
break;
}
String val = prop.getProperty(key);
if (val != null) {
keys.add(key);
val = resolve0(val, prop, keys);
keys.remove(key);
str = str.replace("${" + key + "}", val);
idx = start + val.length();
} else {
idx = start + key.length() + 3;
}
}
return str;
}
static void checkRemoteAccessACL(HttpServletRequest req) {
String reqIp = req.getHeader("X-Forwarded-For");
if (reqIp == null) {
reqIp = req.getRemoteAddr();
}
if (!checkRemoteAccessACL(reqIp)) {
LOG.log(Level.WARNING, req.getMethod() + " " + req.getRequestURI() + ", denied for " + reqIp);
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
}
LOG.log(Level.INFO, req.getMethod() + " " + req.getRequestURI() + ", OK for " + reqIp);
}
static boolean checkRemoteAccessACL(String ip) {
// default is "localhost" only.
String acls = Config.getString(CSIP_REMOTE_ACL, "127.0.0.1/32");
String[] acl = acls.split("\\s+");
for (String sn : acl) {
if (sn.indexOf('/') == -1) {
if (sn.equals(ip)) {
return true;
}
} else {
if (isInSubnet(ip, sn)) {
return true;
}
}
}
return false;
}
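/*
ACL sketch (addresses illustrative): the ACL is a whitespace-separated
list of single IPs and CIDR subnets, e.g.
"csip.remote.acl" = "127.0.0.1/32 10.2.222.0/24 192.168.1.5"

checkRemoteAccessACL("10.2.222.17") -> true (within 10.2.222.0/24)
checkRemoteAccessACL("192.168.1.5") -> true (exact match)
checkRemoteAccessACL("8.8.8.8") -> false
*/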
static boolean isInSubnet(String ip, String subnet) {
try {
SubnetUtils utils = new SubnetUtils(subnet);
utils.setInclusiveHostCount(true);
return utils.getInfo().isInRange(ip);
} catch (Exception ex) {
LOG.log(Level.WARNING, "Invalid Subnet: " + subnet, ex);
return false;
}
}
public static class PostgresChunk {
String name;
double minLong;
double maxLong;
Connection[] connections;
int currentConnection;
public PostgresChunk(String name, double minLong, double maxLong) {
this.name = name;
this.minLong = minLong;
this.maxLong = maxLong;
}
@Override
public boolean equals(Object obj) {
return (obj instanceof PostgresChunk) && this.name.equals(((PostgresChunk) obj).getName());
}
public String getName() {
return name;
}
public double getMinLong() {
return minLong;
}
public double getMaxLong() {
return maxLong;
}
public void setConnections(Connection[] connections) {
this.connections = connections;
}
public Connection[] getConnections() {
return connections;
}
public void connectionInc() {
currentConnection++;
}
public int getCurrentConnectionIdx() {
return currentConnection;
}
public void setCurrentConnectionIdx(int currentConnection) {
this.currentConnection = currentConnection;
}
public Connection getCurrentConnection() {
if ((connections != null) && (currentConnection < connections.length)) {
return connections[currentConnection];
} else {
return null;
}
}
public JSONObject getJSON() {
try {
JSONObject pgchunk = new JSONObject();
pgchunk.put("name", name);
pgchunk.put("minLong", minLong);
pgchunk.put("maxLong", maxLong);
return pgchunk;
} catch (JSONException jse) {
LOG.warning("Error creating JSON representation of a db chunk object");
}
return new JSONObject();
}
}
}