@@ -11,60 +11,16 @@ |
*/ |
package csip; |
|
-import com.mongodb.BasicDBObject; |
-import com.mongodb.DB; |
-import com.mongodb.DBCollection; |
-import com.mongodb.DBCursor; |
-import com.mongodb.DBObject; |
-import com.mongodb.MongoClient; |
-import com.mongodb.MongoClientURI; |
-import com.mongodb.WriteResult; |
-import com.mongodb.client.FindIterable; |
-import com.mongodb.client.MongoCollection; |
-import com.mongodb.client.MongoDatabase; |
-import com.mongodb.client.model.UpdateOptions; |
-import com.mongodb.client.result.DeleteResult; |
-import com.mongodb.client.result.UpdateResult; |
-import com.mongodb.gridfs.GridFS; |
-import com.mongodb.gridfs.GridFSDBFile; |
-import com.mongodb.gridfs.GridFSInputFile; |
import csip.ModelDataService.Task; |
import csip.utils.Binaries; |
import csip.utils.SimpleCache; |
-import java.io.ByteArrayInputStream; |
-import java.io.ByteArrayOutputStream; |
import java.io.File; |
-import java.io.FileInputStream; |
-import java.io.InputStream; |
-import java.io.UnsupportedEncodingException; |
-import java.lang.reflect.InvocationTargetException; |
-import java.lang.reflect.Method; |
-import java.security.MessageDigest; |
-import java.security.NoSuchAlgorithmException; |
-import java.sql.Connection; |
-import java.sql.ResultSet; |
-import java.sql.SQLException; |
-import java.sql.Statement; |
-import java.text.DateFormat; |
-import java.text.SimpleDateFormat; |
import java.util.*; |
import java.util.concurrent.ExecutorService; |
import java.util.concurrent.Executors; |
import java.util.concurrent.locks.ReentrantLock; |
import java.util.logging.Level; |
import java.util.logging.Logger; |
-import javax.servlet.ServletContext; |
-import javax.servlet.http.HttpServletRequest; |
-import javax.ws.rs.Path; |
-import javax.ws.rs.WebApplicationException; |
-import javax.ws.rs.core.Response; |
-import javax.xml.bind.DatatypeConverter; |
-import oms3.annotations.Description; |
-import oms3.annotations.Name; |
-import org.apache.commons.net.util.SubnetUtils; |
-import org.bson.Document; |
-import org.codehaus.jettison.json.JSONException; |
-import org.codehaus.jettison.json.JSONObject; |
|
/** |
* Global properties, changeable at runtime. |
@@ -72,24 +28,23 @@ |
* @author od |
*/ |
public class Config { |
- |
+ |
private static final Properties p = new Properties(); |
private static final Properties allProps = new Properties(); |
- |
+ |
private static final LinkedList<PostgresChunk> pcs = new LinkedList<>(); |
private static final List<Task> tasks = Collections.synchronizedList(new ArrayList<Task>()); |
// |
// |
private static ArchiveStore archive; |
private static SessionStore session; |
-// private static AccessLogStore logstore; |
private static ResultStore resultStore; |
|
// |
private static ExecutorService exec; |
private static Timer timer; |
private static final Registry reg = new Registry(); |
- private static final Logger LOG = Logger.getLogger(Config.class.getName()); |
+ static final Logger LOG = Logger.getLogger(Config.class.getName()); |
|
// |
public static final String CSIP_VERSION = "csip.version"; |
@@ -123,24 +78,28 @@ |
public static final String CSIP_SNAPSHOT = "csip.snapshot"; |
public static final String CSIP_KEEPWORKSPACE = "csip.keepworkspace"; |
public static final String CSIP_JDBC_CHECKVALID = "csip.jdbc.checkvalid"; |
- |
+ |
+ // values for session/archive/result |
+ public static final String MONGODB = "mongodb"; |
+ public static final String LOCAL = "local"; |
+ public static final String SQL = "sql"; |
public static final String NONE = "none"; |
- |
- |
+ |
+ |
public static Map<Object, Object> properties() { |
return Collections.unmodifiableMap(p); |
} |
|
// |
static SimpleCache<File, ReentrantLock> wsFileLocks = new SimpleCache<>(); |
- |
- |
+ |
+ |
static { |
|
/* |
The CSIP version |
*/ |
- put(CSIP_VERSION, "$version: 2.1.179 5b6bb423bd93 2017-04-17 od, built at 2017-04-20 09:18 by od$"); |
+ put(CSIP_VERSION, "$version: 2.1.180 8d161c806793 2017-04-20 od, built at 2017-04-21 11:40 by od$"); |
|
/* |
* The runtime architecture. |
@@ -164,7 +123,7 @@ |
The backend to use for session management. valid choices are |
"mongodb", "sql", "local". |
*/ |
- put(CSIP_SESSION_BACKEND, "local"); |
+ put(CSIP_SESSION_BACKEND, LOCAL); |
|
|
/* |
@@ -311,1048 +270,60 @@ |
|
// internal |
put("vm.port", "8080"); |
- |
+ |
update(); |
} |
- |
+ |
|
public static void register(Set<Class<?>> service) { |
- registry().register(service); |
+ getRegistry().register(service); |
} |
|
+ |
/** |
- * |
+ * @return @deprecated use register() instead. |
*/ |
- public static class Registry { |
- |
- String context; |
- List<Class<?>> s; |
- Set<Class<?>> regServ; |
- |
- |
- void setContext(String context) { |
- this.context = context; |
- } |
- |
- |
- String getContext() { |
- return context; |
- } |
- |
- |
- public void register(Set<Class<?>> service) { |
- if (regServ != null) { |
- return; |
- } |
- regServ = service; |
- s = new ArrayList<>(); |
- |
- service.forEach(c -> { |
- if ((c.getCanonicalName().startsWith("m.") // model service |
- || c.getCanonicalName().startsWith("d."))) { // data service |
- LOG.info("Register service " + c); |
- s.add(c); |
- callStaticMethodIfExist(c, "onContextInit"); |
- } |
- }); |
- Collections.sort(s, (Class<?> o1, Class<?> o2) |
- -> getServiceName(o1).compareTo(getServiceName(o2))); |
- LOG.info(">>>>>>>> Registered " + s.size() + " CSIP services."); |
- } |
- |
- |
- public void unregister() { |
- if (regServ == null) { |
- return; |
- } |
- regServ = null; |
- s.forEach(c -> { |
- LOG.info("Unregister service " + c); |
- callStaticMethodIfExist(c, "onContextDestroy"); |
- }); |
- s.clear(); |
- } |
- |
- |
- static void callStaticMethodIfExist(Class c, String method) { |
- try { |
- Method m = c.getMethod(method); |
- m.invoke(null); |
- LOG.info("Found and Invoked '" + method + "' in: " + c); |
- } catch (NoSuchMethodException ex) { |
- return; // no problem |
- } catch (SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) { |
- LOG.log(Level.SEVERE, null, ex); |
- } |
- } |
- |
- |
- List<Class<?>> getServices() { |
- return s; |
- } |
- |
- |
- String getServicePath(Class<?> i) { |
- Path p = (Path) i.getAnnotation(Path.class); |
- return (p == null) ? "" : p.value(); |
- } |
- |
- |
- String getServiceName(Class<?> i) { |
- Name p = (Name) i.getAnnotation(Name.class); |
- return (p == null) ? "" : p.value(); |
- } |
- |
- |
- String getServiceDescription(Class<?> i) { |
- Description p = (Description) i.getAnnotation(Description.class); |
- return (p == null) ? "" : p.value(); |
- } |
- } |
- |
- |
+ @Deprecated |
public static Registry registry() { |
return reg; |
} |
|
- /** |
- * session store. |
- */ |
- interface SessionStore { |
|
- /** |
- * Set a model session. |
- * |
- * @param suid |
- * @param session |
- * @throws Exception |
- */ |
- void setSession(String suid, ModelSession session) throws Exception; |
- |
- |
- /** |
- * Get a model session |
- * @param suid |
- * @return the model session |
- * @throws Exception |
- */ |
- ModelSession getSession(String suid) throws Exception; |
- |
- |
- boolean hasSession(String suid) throws Exception; |
- |
- |
- /** |
- * Remove a model session. |
- * |
- * @param suid |
- */ |
- void removeSession(String suid); |
- |
- |
- /** |
- * Shutdown the session store |
- * @throws Exception |
- */ |
- void shutdown() throws Exception; |
- |
- |
- /** |
- * |
- * @param skip the number of keys to skip |
- * @param limit the number of keys to return (0) means all |
- * @param sortby the 'field' to sort by. |
- * @param sortAscending true if sort is ascending, false otherwise. |
- * @return the keys. |
- */ |
- Set<String> keys(int skip, int limit, String sortby, boolean sortAscending); |
- |
- |
- /** |
- * Get the number of elements. |
- * @return the number of elements. |
- */ |
- long getCount(); |
- |
- |
- /** |
- * Ping the store. |
- * |
- * @throws Exception if something is wrong. |
- */ |
- void ping() throws Exception; |
+ static Registry getRegistry() { |
+ return reg; |
} |
|
- /** |
- * Archive Store |
- */ |
- interface ArchiveStore { |
- |
- static String ARCHIVE_TOO_BIG = "the archive was too big to store."; |
- |
- |
- void archiveSession(String sid, ModelArchive ma, File f) throws Exception; |
- |
- |
- ModelArchive getArchive(String suid) throws Exception; |
- |
- |
- boolean hasArchive(String suid) throws Exception; |
- |
- |
- void removeArchive(String suid); |
- |
- |
- byte[] getFile(String suid, String filename) throws Exception; |
- |
- |
- void shutdown() throws Exception; |
|
- |
- /** |
- * Get the number of elements. |
- * @return the number of elements |
- */ |
- long getCount(); |
- |
- |
- Set<String> keys(int skip, int limit, String sortby, boolean sortAsc); |
- |
- ArchiveStore NONE = new ArchiveStore() { |
- Set<String> keys = new HashSet<>(); |
- |
- |
- @Override |
- public void archiveSession(String sid, ModelArchive ma, File f) throws Exception { |
- } |
- |
- |
- @Override |
- public ModelArchive getArchive(String suid) throws Exception { |
- return null; |
- } |
- |
- |
- @Override |
- public void removeArchive(String suid) { |
- } |
- |
- |
- @Override |
- public byte[] getFile(String suid, String filename) throws Exception { |
- return null; |
- } |
- |
- |
- @Override |
- public void shutdown() throws Exception { |
- } |
- |
- |
- @Override |
- public Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) { |
- return keys; |
- } |
- |
- |
- @Override |
- public long getCount() { |
- return keys.size(); |
- } |
- |
- |
- @Override |
- public boolean hasArchive(String suid) throws Exception { |
- return false; |
- } |
- }; |
- |
- } |
- |
- interface ResultStore { |
- |
- /** |
- * |
- * @param request |
- * @return the result json. |
- * @throws Exception |
- */ |
- String getResult(String request); |
- |
- |
- /** |
- * Returns hash |
- * @param digest |
- * @param results |
- * @throws Exception |
- */ |
- void putResult(String digest, String results); |
- |
- |
- /** |
- * Get the digest. |
- * @param request |
- * @return the sha1 digest. |
- */ |
- default String getDigest(String request) { |
- request = request.replace(" ", "").replace("\n", "").replace("\t", ""); |
- try { |
- return DatatypeConverter.printHexBinary( |
- MessageDigest.getInstance("SHA1").digest(request.getBytes("UTF-8"))).toLowerCase(); |
- } catch (NoSuchAlgorithmException | UnsupportedEncodingException ex) { |
- return null; |
- } |
- } |
- |
- |
- /** |
- * Purge the results. |
- */ |
- void purge(); |
- |
- |
- /** |
- * Shut down the resource |
- * @throws Exception |
- */ |
- default void shutdown() throws Exception { |
- } |
- |
- ResultStore NONE = new ResultStore() { |
- |
- @Override |
- public String getResult(String request) { |
- return null; |
- } |
- |
- |
- @Override |
- public void putResult(String digest, String results) { |
- } |
- |
- |
- @Override |
- public String getDigest(String request) { |
- return null; |
- } |
- |
- |
- @Override |
- public void purge() { |
- } |
- |
- }; |
- |
+ static boolean isArchiveEnabled() { |
+ return !getString(CSIP_ARCHIVE_BACKEND, NONE).equals(NONE); |
} |
|
- /** |
- * |
- */ |
- static class MongoResultStore implements ResultStore { |
- |
- MongoClient mongo; |
- MongoDatabase db; |
- static final String RESULTS = "results"; |
- |
- |
- MongoResultStore(String uri) { |
- MongoClientURI u = new MongoClientURI(uri); |
- String dbname = u.getDatabase(); |
- if (dbname == null) { |
- dbname = "csip"; |
- } |
- mongo = new MongoClient(u); |
- db = mongo.getDatabase(dbname); |
- LOG.info("Connected to mongo result store : " + uri); |
- } |
- |
- |
- @Override |
- public synchronized String getResult(String digest) { |
- FindIterable<Document> r = db.getCollection(RESULTS).find(new Document("_id", digest)); |
- Document o = r.first(); |
- if (o != null) { |
- return o.getString("result"); |
- } else { |
- LOG.warning("No previous result for : " + digest); |
- return null; |
- } |
- } |
- |
- |
- @Override |
- public synchronized void putResult(String digest, String result) { |
- Document doc = new Document(); |
- doc.append("_id", digest); |
- doc.append("result", result); |
- |
- UpdateOptions opt = new UpdateOptions(); |
- opt.upsert(true); |
- MongoCollection<Document> c = db.getCollection(RESULTS); |
- c.replaceOne(new Document("_id", digest), doc, opt); |
- |
- LOG.info("put result: " + digest + " " + doc.toJson()); |
- } |
- |
- |
- @Override |
- public synchronized void purge() { |
- db.getCollection(RESULTS).drop(); |
- } |
- |
- |
- @Override |
- public void shutdown() throws Exception { |
- mongo.close(); |
- } |
- } |
|
- /** |
- * This is a local session store. One instance only, no failover. |
- */ |
- private static class LocalStore implements SessionStore { |
- |
- Map<String, ModelSession> m = Collections.synchronizedMap(new HashMap<String, ModelSession>()); |
- |
- |
- public LocalStore() { |
- LOG.info("Using local session store."); |
- } |
- |
- |
- @Override |
- public void shutdown() throws Exception { |
- m.clear(); |
- } |
- |
- |
- @Override |
- public void setSession(String key, ModelSession session) throws Exception { |
- m.put(key, session); |
- } |
- |
- |
- @Override |
- public ModelSession getSession(String key) throws Exception { |
- return m.get(key); |
- } |
- |
- |
- @Override |
- public void removeSession(String key) { |
- m.remove(key); |
- } |
- |
- |
- @Override |
- public Set<String> keys(int skip, int limit, String sortby, boolean sortAscending) { |
- return m.keySet(); |
- } |
- |
- |
- @Override |
- public long getCount() { |
- return m.size(); |
- } |
- |
- |
- @Override |
- public void ping() { |
- if (m == null) { |
- throw new NullPointerException("Illegal hashtable store."); |
- } |
- } |
- |
- |
- @Override |
- public boolean hasSession(String suid) throws Exception { |
- return m.containsKey(suid); |
- } |
- } |
- |
- /** |
- * |
- */ |
- private static class SQLSessionStore implements SessionStore { |
- |
- static final String jdbc_session_id = "jdbc_session"; |
- |
- SessionLogger l = new SessionLogger(null, "SQLSessionLog", ""); |
- DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); |
- boolean isMSSQL = false; |
- |
- |
- SQLSessionStore(String url) { |
- Map<String, String> m = new HashMap<>(); |
- m.put("defaultAutoCommit", "true"); |
- Binaries.addToJDBCPool(jdbc_session_id, url, m); |
- |
- createTableIfNeeded(); |
- } |
- |
- |
- @Override |
- public synchronized void setSession(String suid, ModelSession session) throws Exception { |
- try (Connection c = Binaries.getConnection(jdbc_session_id, l)) { |
- try (Statement st = c.createStatement()) { |
- String[] a = session.getAttachments(); |
- String att = String.join(",", a); |
- String sql = null; |
- if (hasSession(suid)) { |
- sql = "UPDATE csip_sessions SET tst='" + session.getTstamp() |
- + "', exp='" + session.getExpDate() |
- + "', srv='" + session.getService() |
- + "', sta='" + session.getStatus() |
- + "', nip='" + session.getNodeIP() |
- + "', rip='" + session.getReqIP() |
- + "', cpu=" + (session.getCputime().isEmpty() ? -1 : Integer.parseInt(session.getCputime())) |
- + ", pro='" + (session.getProgress() == null ? "" : session.getProgress()) |
- + "', att='" + att + "';"; |
- } else { |
- sql = "INSERT INTO csip_sessions(suid,tst,exp,srv,sta,nip,rip,cpu,pro,att) VALUES('" |
- + suid + "','" |
- + session.getTstamp() + "','" |
- + session.getExpDate() + "','" |
- + session.getService() + "','" |
- + session.getStatus() + "','" |
- + session.getNodeIP() + "','" |
- + session.getReqIP() + "','" |
- + (session.getCputime().isEmpty() ? -1 : Integer.parseInt(session.getCputime())) + "','" |
- + (session.getProgress() == null ? "" : session.getProgress()) + "','" + att + "');"; |
- } |
- l.info("setSession() " + sql); |
- st.executeUpdate(sql); |
- } |
- } catch (SQLException | ServiceException ex) { |
- l.log(Level.SEVERE, null, ex); |
- LOG.log(Level.SEVERE, null, ex); |
- } |
- } |
- |
- |
- @Override |
- public synchronized ModelSession getSession(String suid) throws Exception { |
- try (Connection c = Binaries.getConnection(jdbc_session_id, l)) { |
- try (Statement st = c.createStatement()) { |
- String sql = "SELECT tst,exp,srv,sta,nip,rip,cpu,pro,att FROM csip_sessions WHERE suid='" + suid + "';"; |
- l.info("getSession() " + sql); |
- ResultSet rs = st.executeQuery(sql); |
- rs.next(); |
- |
- ModelSession s = new ModelSession(); |
- s.setTstamp(rs.getString(1)); |
- s.setExpDate(rs.getString(2)); |
- s.setService(rs.getString(3)); |
- s.setStatus(rs.getString(4)); |
- s.setNodeIP(rs.getString(5)); |
- s.setReqIP(rs.getString(6)); |
- int cpu = rs.getInt(7); |
- s.setCputime(cpu == -1 ? "" : Integer.toString(cpu)); |
- String pro = rs.getString(8); |
- s.setProgress(pro.equals("") ? null : pro); |
- String att = rs.getString(9); |
- s.setAttachments(att.isEmpty() ? ModelSession.NO_ATTACHMENTS : att.split(",")); |
- return s; |
- } |
- } catch (SQLException | ServiceException ex) { |
- l.log(Level.SEVERE, null, ex); |
- LOG.log(Level.SEVERE, null, ex); |
- } |
- return null; |
- } |
- |
- |
- @Override |
- public synchronized boolean hasSession(String suid) throws Exception { |
- try (Connection c = Binaries.getConnection(jdbc_session_id, l)) { |
- try (Statement st = c.createStatement()) { |
- String sql = "SELECT suid FROM csip_sessions WHERE suid = '" + suid + "';"; |
- l.info("hasSession() " + sql); |
- ResultSet rs = st.executeQuery(sql); |
- return rs.next(); |
- } |
- } catch (SQLException | ServiceException ex) { |
- l.log(Level.SEVERE, null, ex); |
- LOG.log(Level.SEVERE, null, ex); |
- } |
- return false; |
- } |
- |
- |
- @Override |
- public void removeSession(String suid) { |
- try (Connection c = Binaries.getConnection(jdbc_session_id, l)) { |
- try (Statement st = c.createStatement()) { |
- String sql = "DELETE FROM csip_sessions WHERE suid='" + suid + "';"; |
- l.info("removeSession() " + sql); |
- st.executeUpdate(sql); |
- } |
- } catch (SQLException | ServiceException ex) { |
- l.log(Level.SEVERE, null, ex); |
- LOG.log(Level.SEVERE, null, ex); |
- } |
- } |
- |
- |
- @Override |
- public void shutdown() throws Exception { |
- // should shut down at the end. |
- } |
- |
- |
- @Override |
- public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) { |
- if (sortby == null) { |
- sortby = "tst"; |
- sortAsc = false; |
- } |
- Set<String> keys = new LinkedHashSet<>(); |
- try (Connection c = Binaries.getConnection(jdbc_session_id, l)) { |
- try (Statement st = c.createStatement()) { |
- String sql = isMSSQL |
- ? "SELECT suid FROM csip_sessions ORDER BY " + sortby + " OFFSET " + skip + " ROWS FETCH NEXT " + limit + " ROWS ONLY;" |
- : "SELECT suid FROM csip_sessions ORDER BY " + sortby + " LIMIT " + limit + " OFFSET " + skip + ";"; |
- l.info("keys() " + sql); |
- ResultSet rs = st.executeQuery(sql); |
- while (rs.next()) { |
- keys.add(rs.getString(1)); |
- } |
- } |
- } catch (SQLException | ServiceException ex) { |
- l.log(Level.SEVERE, null, ex); |
- LOG.log(Level.SEVERE, null, ex); |
- } |
- return keys; |
- } |
- |
- |
- @Override |
- public synchronized long getCount() { |
- try (Connection c = Binaries.getConnection(jdbc_session_id, l)) { |
- try (Statement st = c.createStatement()) { |
- String sql = "SELECT COUNT(*) FROM csip_sessions;"; |
- l.info("getCount() " + sql); |
- ResultSet rs = st.executeQuery(sql); |
- rs.next(); |
- return rs.getLong(1); |
- } |
- } catch (SQLException | ServiceException ex) { |
- l.log(Level.SEVERE, null, ex); |
- LOG.log(Level.SEVERE, null, ex); |
- } |
- return -1; |
- } |
- |
- |
- @Override |
- public void ping() throws Exception { |
- } |
- |
- |
- private void createTableIfNeeded() { |
- try (Connection c = Binaries.getConnection(jdbc_session_id, l)) { |
- String driver = c.getMetaData().getDriverName(); |
- l.info("Driver name: " + driver); |
- isMSSQL = driver.contains("Microsoft"); |
- try (Statement st = c.createStatement()) { |
- // mssql |
- String sql = isMSSQL |
- ? "if not exists (select * from sysobjects where name='csip_sessions' and xtype='U')" |
- + " BEGIN CREATE TABLE csip_sessions (" |
- + " suid VARCHAR(64) primary key," |
- + " tst VARCHAR(64)," |
- + " exp VARCHAR(64)," |
- + " srv VARCHAR(64)," |
- + " sta VARCHAR(64)," |
- + " nip VARCHAR(64)," |
- + " rip VARCHAR(64)," |
- + " cpu INTEGER," |
- + " pro VARCHAR(64)," |
- + " att VARCHAR(64)" |
- + ") END;" |
- // |
- : "CREATE TABLE IF NOT EXISTS csip_sessions (" |
- + " suid VARCHAR(64) primary key," |
- + " tst VARCHAR(64)," |
- + " exp VARCHAR(64)," |
- + " srv VARCHAR(64)," |
- + " sta VARCHAR(64)," |
- + " nip VARCHAR(64)," |
- + " rip VARCHAR(64)," |
- + " cpu INTEGER," |
- + " pro VARCHAR(64)," |
- + " att VARCHAR(64)" |
- + ");"; |
- l.info("createTable() " + sql); |
- st.execute(sql); |
- l.info("created Table. "); |
- } |
- } catch (SQLException | ServiceException ex) { |
- l.severe("ERROR: connecting to session store, check connection string or database setup. need r/w SQL access."); |
- l.log(Level.SEVERE, null, ex); |
- LOG.log(Level.SEVERE, null, ex); |
- } |
- } |
- } |
- |
- /** |
- * |
- */ |
- private static class MongoDBSessionStore implements SessionStore { |
- |
- MongoClient mongo; |
- MongoDatabase db; |
- static final String coll = "session"; |
- |
- |
- MongoDBSessionStore(String uri) { |
- MongoClientURI u = new MongoClientURI(uri); |
- String dbname = u.getDatabase(); |
- if (dbname == null) { |
- dbname = "csip"; |
- } |
- mongo = new MongoClient(u); |
- db = mongo.getDatabase(dbname); |
- LOG.info("Connected to mongo session csip store : " + uri); |
- } |
- |
- |
- @Override |
- public synchronized void setSession(String suid, ModelSession session) throws Exception { |
- Document doc = new Document(); |
- doc.append("_id", suid) |
- .append("tst", session.getTstamp()) |
- .append("exp", session.getExpDate()) |
- .append("srv", session.getService()) |
- .append("sta", session.getStatus()) |
- .append("nip", session.getNodeIP()) |
- .append("rip", session.getReqIP()) |
- .append("cpu", session.getCputime().isEmpty() ? -1 : Integer.parseInt(session.getCputime())) |
- .append("pro", session.getProgress() == null ? "" |
- : session.getProgress()); |
- |
- String[] a = session.getAttachments(); |
- doc.append("att", String.join(",", a)); |
- |
- UpdateOptions opt = new UpdateOptions(); |
- opt.upsert(true); |
- MongoCollection<Document> c = db.getCollection(coll); |
- c.replaceOne(new Document("_id", suid), doc, opt); |
- LOG.info("added: " + suid + " " + doc.toJson()); |
- } |
- |
- |
- @Override |
- public synchronized ModelSession getSession(String suid) throws Exception { |
- FindIterable<Document> r = db.getCollection(coll).find(new Document("_id", suid)); |
- Document o = r.first(); |
- if (o != null) { |
- // Build our document and add all the fields |
- ModelSession s = new ModelSession(); |
- s.setTstamp(o.get("tst", String.class)); |
- s.setExpDate(o.get("exp", String.class)); |
- s.setService(o.get("srv", String.class)); |
- s.setStatus(o.get("sta", String.class)); |
- s.setNodeIP(o.get("nip", String.class)); |
- s.setReqIP(o.get("rip", String.class)); |
- int cpu = o.get("cpu", Integer.class); |
- s.setCputime(cpu == -1 ? "" : Integer.toString(cpu)); |
- s.setProgress((o.get("pro", String.class)).equals("") ? null : (o.get("pro", String.class))); |
- String l = o.get("att", String.class); |
- s.setAttachments(l.isEmpty() ? ModelSession.NO_ATTACHMENTS : l.split(",")); |
- return s; |
- } else { |
- LOG.warning("Unable get session : " + suid + " as record already exists!!!"); |
- return null; |
- } |
- } |
- |
- |
- @Override |
- public synchronized boolean hasSession(String suid) throws Exception { |
- return db.getCollection(coll).count(new Document("_id", suid)) == 1; |
- } |
- |
- |
- @Override |
- public synchronized void removeSession(String suid) { |
- DeleteResult r = db.getCollection(coll).deleteOne(new Document("_id", suid)); |
- if (r.getDeletedCount() != 1) { |
- throw new RuntimeException("Not deleted: " + suid); |
- } |
- } |
- |
- |
- @Override |
- public synchronized void shutdown() throws Exception { |
- mongo.close(); |
- } |
- |
- |
- @Override |
- public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) { |
- if (sortby == null) { |
- sortby = "tst"; |
- sortAsc = false; |
- } |
- Document sort = new Document(sortby, sortAsc ? 1 : -1); |
- Set<String> l = new LinkedHashSet<>(); |
- MongoCollection<Document> c = db.getCollection(coll); |
- for (Document doc : c.find().sort(sort).skip(skip).limit(limit)) { |
- l.add(doc.get("_id", String.class)); |
- } |
- return l; |
- } |
- |
- |
- @Override |
- public synchronized long getCount() { |
- return db.getCollection(coll).count(); |
- } |
- |
- |
- @Override |
- public void ping() throws Exception { |
- MongoCollection<Document> c = db.getCollection(coll + "_test"); |
- |
- String id = "test_id_1234"; |
- |
- // insert document |
- Document doc = new Document("_id", id); |
- UpdateOptions opt = new UpdateOptions(); |
- opt.upsert(true); |
- UpdateResult res = c.replaceOne(new Document("_id", id), doc, opt); |
- LOG.info("added " + res); |
- // find it. |
- FindIterable<Document> d = c.find(new Document("_id", id)); |
- Document o = d.first(); |
- if (o == null) { |
- throw new Exception("Not found: " + id); |
- } |
- if (!id.equals(o.getString("_id"))) { |
- throw new Exception("Id not found: " + id); |
- } |
- LOG.info("found " + o); |
- |
- // remove it. |
- DeleteResult r = c.deleteOne(new Document("_id", id)); |
- if (r.getDeletedCount() != 1) { |
- throw new Exception("Not deleted: " + id); |
- } |
- LOG.info("deleted " + r); |
- c.drop(); |
- } |
- } |
- |
- /** |
- * |
- */ |
- private static class MongoDBArchive implements ArchiveStore { |
- |
- MongoClient mongo; |
- String dbname; |
- static final String fsColl = "fs"; |
- |
- |
- MongoDBArchive(String uri) { |
- // "mongodb://user:pass@host:port/db" |
- MongoClientURI u = new MongoClientURI(uri); |
- dbname = u.getDatabase(); |
- if (dbname == null) { |
- dbname = "csip"; |
- } |
- mongo = new MongoClient(u); |
- LOG.info("Connected to archive csip store : " + uri); |
- } |
- |
- |
- public synchronized void removeAll() { |
- DB db = mongo.getDB(dbname); |
- |
- //Let's store the standard data in regular collection |
- DBCollection collection = db.getCollection(fsColl); |
- collection.remove((new BasicDBObject())); |
- |
- // remove all file in the bucket |
- GridFS fileStore = new GridFS(db, fsColl); |
- fileStore.remove((DBObject) null); |
- } |
- |
- |
- @Override |
- public synchronized void archiveSession(String sid, ModelArchive ma, File f) throws Exception { |
- DB db = mongo.getDB(dbname); |
- |
- //Let's store the standard data in regular collection |
- DBCollection collection = db.getCollection(fsColl); |
- |
- String max = getString(CSIP_ARCHIVE_MAX_FILE_SIZE); |
- long limit = Binaries.parseByteSize(max); |
- |
- String filename = null; |
- long filesize = 0; |
- InputStream is = null; |
- |
- if (f.length() <= limit) { |
- filename = f.getName(); |
- filesize = f.length(); |
- is = new FileInputStream(f); |
- } else { |
- filename = sid + ".txt"; |
- filesize = ARCHIVE_TOO_BIG.length(); |
- is = new ByteArrayInputStream(ARCHIVE_TOO_BIG.getBytes()); |
- } |
- |
- // Let's query to ensure ID does not already exist in Mongo |
- // if it does, we will alert the user |
- BasicDBObject query = new BasicDBObject("_id", sid); |
- DBCursor cursor = collection.find(query); |
- if (!cursor.hasNext()) { |
- // Build our document and add all the fields |
- BasicDBObject doc = new BasicDBObject(); |
- doc.append("_id", sid); |
- doc.append("service", ma.getService()); |
- doc.append("status", ma.getStatus()); |
- doc.append("ctime", ma.getCtime()); |
- doc.append("etime", ma.getEtime()); |
- doc.append("req_ip", ma.getReqIP()); |
- doc.append("filename", "files-" + filename); |
- doc.append("filelength", filesize); |
- |
- //insert the document into the collection |
- collection.insert(doc); |
- |
- // Now let's store the binary file data using filestore GridFS |
- GridFS fileStore = new GridFS(db, fsColl); |
- GridFSInputFile file = fileStore.createFile(is); |
- file.setId(sid); |
- file.setFilename("files-" + filename); |
- file.save(); |
- } else { |
- LOG.warning("Unable to archive record with ID: " + sid + " as record already exists!!!"); |
- } |
- } |
- |
- |
- @Override |
- public synchronized ModelArchive getArchive(String suid) throws Exception { |
- DB db = mongo.getDB(dbname); |
- DBCollection collection = db.getCollection(fsColl); |
- |
- BasicDBObject findQuery = new BasicDBObject("_id", suid); |
- DBObject doc = collection.findOne(findQuery); |
- if (doc != null) { |
- ModelArchive ma = new ModelArchive( |
- doc.get("ctime").toString(), |
- doc.get("etime").toString(), |
- doc.get("service").toString(), |
- doc.get("status").toString(), |
- doc.get("req_ip").toString()); |
- return ma; |
- } |
- return null; |
- } |
- |
- |
- @Override |
- public synchronized void removeArchive(String suid) { |
- DB db = mongo.getDB(dbname); |
- DBCollection collection = db.getCollection(fsColl); |
- BasicDBObject q = new BasicDBObject("_id", suid); |
- WriteResult res = collection.remove(q); |
- |
- GridFS fileStore = new GridFS(db, fsColl); |
- fileStore.remove(q); |
- } |
- |
- |
- @Override |
- public synchronized byte[] getFile(String suid, String filename) throws Exception { |
- DB db = mongo.getDB(dbname); |
- GridFS fileStore = new GridFS(db, fsColl); |
- BasicDBObject query = new BasicDBObject("_id", suid); |
- GridFSDBFile gridFile = fileStore.findOne(query); |
- if (gridFile != null) { |
- ByteArrayOutputStream out = new ByteArrayOutputStream(); |
- gridFile.writeTo(out); |
- return out.toByteArray(); |
- } |
- return null; |
- } |
- |
- |
- @Override |
- public synchronized void shutdown() throws Exception { |
- mongo.close(); |
- } |
- |
- |
- @Override |
- public synchronized long getCount() { |
- DB db = mongo.getDB(dbname); |
- return db.getCollection(fsColl).count(); |
- } |
- |
- |
- // new keys implementation. |
- public synchronized Set<String> keys(int skip, int limit, String sortby, boolean sortAsc) { |
- if (sortby == null) { |
- sortby = "ctime"; |
- sortAsc = false; |
- } |
- BasicDBObject sort = new BasicDBObject(sortby, sortAsc ? 1 : -1); |
- Set<String> l = new LinkedHashSet<>(); |
- |
- DB db = mongo.getDB(dbname); |
- DBCollection c = db.getCollection(fsColl); |
- DBCursor cursor = c.find().sort(sort).skip(skip).limit(limit); |
- while (cursor.hasNext()) { |
- DBObject o = cursor.next(); |
- l.add(o.get("_id").toString()); |
- } |
- return l; |
- } |
- |
- |
- @Override |
- public synchronized boolean hasArchive(String suid) throws Exception { |
- DB db = mongo.getDB(dbname); |
- DBCollection c = db.getCollection(fsColl); |
- return c.count(new BasicDBObject("_id", suid)) == 1; |
- } |
- } |
- |
- |
- static boolean isArchiveEnabled() { |
- return !getString("csip.archive.backend", NONE).equals(NONE); |
- } |
- |
- |
static boolean isResultStoreEnabled() { |
return !getString(CSIP_RESULTSTORE_BACKEND, NONE).equals(NONE); |
} |
- |
- |
+ |
+ |
static synchronized SessionStore getSessionStore() { |
if (session == null) { |
String uri = null; |
switch (getString(CSIP_SESSION_BACKEND)) { |
- case "mongodb": |
+ case MONGODB: |
uri = getString(CSIP_SESSION_MONGODB_URI); |
if (uri == null) { |
throw new RuntimeException("missing uri configuration entry 'csip.session.mongodb.uri'"); |
} |
- session = new MongoDBSessionStore(uri); |
+ session = new MongoSessionStore(uri); |
break; |
- case "sql": |
+ case SQL: |
uri = getString("csip.session.sql.uri"); |
if (uri == null) { |
throw new RuntimeException("missing uri configuration entry 'csip.session.sql.uri'"); |
} |
session = new SQLSessionStore(uri); |
break; |
- case "local": |
- session = new LocalStore(); |
+ case LOCAL: |
+ session = new LocalSessionStore(); |
break; |
default: |
throw new RuntimeException("unknown session backend: " + getString(CSIP_SESSION_BACKEND)); |
@@ -1360,20 +331,20 @@ |
} |
return session; |
} |
- |
- |
+ |
+ |
static synchronized ArchiveStore getArchiveStore() { |
if (archive == null) { |
if (!isArchiveEnabled()) { |
archive = ArchiveStore.NONE; |
} else { |
switch (getString(CSIP_ARCHIVE_BACKEND)) { |
- case "mongodb": |
+ case MONGODB: |
String uri = getString(CSIP_ARCHIVE_MONGODB_URI); |
if (uri == null) { |
throw new RuntimeException("missing uri configuration entry 'csip.archive.mongodb.uri'"); |
} |
- archive = new MongoDBArchive(uri); |
+ archive = new MongoArchiveStore(uri); |
break; |
case NONE: |
archive = ArchiveStore.NONE; |
@@ -1385,15 +356,15 @@ |
} |
return archive; |
} |
- |
- |
+ |
+ |
static synchronized ResultStore getResultStore() { |
if (resultStore == null) { |
if (!isResultStoreEnabled()) { |
resultStore = ResultStore.NONE; |
} else { |
switch (getString(CSIP_RESULTSTORE_BACKEND)) { |
- case "mongodb": |
+ case MONGODB: |
resultStore = new MongoResultStore(getString(CSIP_RESULTSTORE_MONGODB_URI)); |
break; |
case NONE: |
@@ -1406,24 +377,24 @@ |
} |
return resultStore; |
} |
- |
- |
+ |
+ |
static synchronized Timer getTimer() { |
if (timer == null) { |
timer = new Timer(); |
} |
return timer; |
} |
- |
- |
+ |
+ |
static synchronized ExecutorService getExecutorService() { |
if (exec == null) { |
exec = Executors.newCachedThreadPool(); |
} |
return exec; |
} |
- |
- |
+ |
+ |
static List<ModelDataService.Task> getModelTasks() { |
return tasks; |
} |
@@ -1434,8 +405,7 @@ |
* |
* @param context |
*/ |
- static void startup(ServletContext context) { |
- reg.setContext(context.getContextPath()); |
+ static void startup() { |
} |
|
|
@@ -1444,13 +414,13 @@ |
* |
* @param context |
*/ |
- static void shutdown(ServletContext context) { |
+ static void shutdown() { |
for (ModelDataService.Task t : tasks) { |
t.cancel(); |
} |
- |
+ |
reg.unregister(); |
- |
+ |
if (exec != null) { |
LOG.info("Shutting down ExecutorService"); |
exec.shutdownNow(); |
@@ -1463,7 +433,7 @@ |
} |
session = null; |
} |
- |
+ |
if (archive != null) { |
try { |
archive.shutdown(); |
@@ -1472,7 +442,7 @@ |
} |
archive = null; |
} |
- |
+ |
if (resultStore != null) { |
try { |
resultStore.shutdown(); |
@@ -1484,7 +454,7 @@ |
if (timer != null) { |
timer.cancel(); |
} |
- |
+ |
Binaries.shutdownJDBC(); |
} |
|
@@ -1495,123 +465,92 @@ |
static void update() { |
rehashProperties(); |
} |
- |
- |
+ |
+ |
static public Collection<PostgresChunk> getPostgresChunks() { |
return pcs; |
} |
- |
- |
+ |
+ |
public static boolean hasProperty(String key) { |
return allProps.containsKey(key); |
} |
- |
- |
+ |
+ |
public static boolean isString(String key, String str) { |
String s = getString(key); |
return (s != null) && s.equals(str); |
} |
- |
- |
+ |
+ |
public static String getString(String key, String def) { |
return getP(key, def); |
} |
- |
- |
+ |
+ |
public static String getString(String key) { |
return getP(key, null); |
} |
- |
- |
+ |
+ |
public static boolean getBoolean(String key, boolean def) { |
return Boolean.parseBoolean(getP(key, Boolean.toString(def))); |
} |
- |
- |
+ |
+ |
public static boolean getBoolean(String key) { |
return Boolean.parseBoolean(getP(key, "false")); |
} |
- |
- |
+ |
+ |
public static int getInt(String key, int def) { |
return Integer.parseInt(getP(key, Integer.toString(def))); |
} |
- |
- |
+ |
+ |
public static int getInt(String key) { |
return Integer.parseInt(getP(key, "0")); |
} |
- |
- |
+ |
+ |
public static long getLong(String key, long def) { |
return Long.parseLong(getP(key, Long.toString(def))); |
} |
- |
- |
+ |
+ |
public static long getLong(String key) { |
return Long.parseLong(getP(key, "0L")); |
} |
- |
- |
+ |
+ |
public static double getDouble(String key, double def) { |
return Double.parseDouble(getP(key, Double.toString(def))); |
} |
- |
- |
+ |
+ |
public static double getDouble(String key) { |
return Double.parseDouble(getP(key, "0.0")); |
} |
- |
- |
+ |
+ |
private static String getP(String key, String def) { |
- return resolve(allProps.getProperty(key, def)); |
+ return Utils.resolve(allProps.getProperty(key, def)); |
} |
- |
- |
+ |
+ |
static Properties getProperties() { |
return p; |
} |
- |
- |
+ |
+ |
static Properties getMergedProperties() { |
return allProps; |
} |
- |
- |
+ |
+ |
private static void put(String key, String value) { |
p.setProperty(key, value); |
} |
- |
- |
- public static void main(String[] args) { |
-// System.out.println(System.getenv()); |
- String a = resolve("${test.port} abc ${java.home} ///${PATH}/// def ${csip.bin.dir}"); |
- System.out.println(a); |
- |
- String b = "d".replace("___", "-").replace("__", "."); |
- System.out.println(b); |
- } |
- |
- |
- /** |
- * Resolve a string with system and CSIP properties. |
- * |
- * @param str the string to resolve |
- * @return the resolved string. |
- */ |
- public static String resolve(String str) { |
- if (str == null) { |
- return null; |
- } |
- if (!str.contains("${")) { |
- return str; |
- } |
- String res = resolve0(str, allProps, new HashSet<>()); |
- if (res.contains("${")) { |
- LOG.warning("Resolving one or more varariables failed in: " + res); |
- } |
- return res; |
- } |
|
|
/** |
@@ -1622,7 +561,6 @@ |
allProps.clear(); |
allProps.putAll(Config.properties()); |
allProps.putAll(System.getProperties()); |
- |
Map<String, String> env = System.getenv(); |
for (String key : env.keySet()) { |
String newKey = key.replace("___", "-").replace("__", "."); |
@@ -1630,165 +568,4 @@ |
} |
} |
|
- |
- /** |
- * property substitution in a string. |
- * |
- * @param str |
- * @return |
- */ |
- private static String resolve0(String str, Properties prop, Set<String> keys) { |
- int idx = 0; |
- while (idx < str.length()) { |
- int start = str.indexOf("${", idx); |
- int end = str.indexOf("}", idx); |
- if (start == -1 || end == -1 || end < start) { |
- break; |
- } |
- String key = str.substring(start + 2, end); |
- if (keys.contains(key)) { |
- System.err.println("Circular property reference: " + key); |
- break; |
- } |
- String val = prop.getProperty(key); |
- if (val != null) { |
- keys.add(key); |
- val = resolve0(val, prop, keys); |
- keys.remove(key); |
- str = str.replace("${" + key + "}", val); |
- idx = start + val.length(); |
- } else { |
- idx = start + key.length() + 3; |
- } |
- } |
- return str; |
- } |
- |
- |
- static void checkRemoteAccessACL(HttpServletRequest req) { |
- String reqIp = req.getHeader("X-Forwarded-For"); |
- if (reqIp == null) { |
- reqIp = req.getRemoteAddr(); |
- } |
- if (!checkRemoteAccessACL(reqIp)) { |
- LOG.log(Level.WARNING, req.getMethod() + " " + req.getRequestURI() + ", denied for " + reqIp); |
- throw new WebApplicationException(Response.Status.UNAUTHORIZED); |
- } |
- LOG.log(Level.INFO, req.getMethod() + " " + req.getRequestURI() + ", OK for " + reqIp); |
- } |
- |
- |
- static boolean checkRemoteAccessACL(String ip) { |
- // default is "localhost" only. |
- String acls = Config.getString(CSIP_REMOTE_ACL, "127.0.0.1/32"); |
- String[] acl = acls.split("\\s+"); |
- for (String sn : acl) { |
- if (sn.indexOf('/') == -1) { |
- if (sn.equals(ip)) { |
- return true; |
- } |
- } else { |
- if (isInSubnet(ip, sn)) { |
- return true; |
- } |
- } |
- } |
- return false; |
- } |
- |
- |
- static boolean isInSubnet(String ip, String subnet) { |
- try { |
- SubnetUtils utils = new SubnetUtils(subnet); |
- utils.setInclusiveHostCount(true); |
- return utils.getInfo().isInRange(ip); |
- } catch (Exception E) { |
- LOG.log(Level.WARNING, "Invalid Subnet: " + subnet, E); |
- return false; |
- } |
- } |
- |
- public static class PostgresChunk { |
- |
- String name; |
- double minLong; |
- double maxLong; |
- Connection[] connections; |
- int currentConnection; |
- |
- |
- public PostgresChunk(String name, double minLong, double maxLong) { |
- this.name = name; |
- this.minLong = minLong; |
- this.maxLong = maxLong; |
- } |
- |
- |
- public boolean equals(Object obj) { |
- return this.name.equals(((PostgresChunk) obj).getName()); |
- } |
- |
- |
- public String getName() { |
- return name; |
- } |
- |
- |
- public double getMinLong() { |
- return minLong; |
- } |
- |
- |
- public double getMaxLong() { |
- return maxLong; |
- } |
- |
- |
- public void setConnections(Connection[] connections) { |
- this.connections = connections; |
- } |
- |
- |
- public Connection[] getConnections() { |
- return connections; |
- } |
- |
- |
- public void connectionInc() { |
- currentConnection++; |
- } |
- |
- |
- public int getCurrentConnectionIdx() { |
- return currentConnection; |
- } |
- |
- |
- public void setCurrentConnectionIdx(int currentConnection) { |
- this.currentConnection = currentConnection; |
- } |
- |
- |
- public Connection getCurrentConnection() { |
- if ((connections != null) && (connections.length >= currentConnection)) { |
- return connections[currentConnection]; |
- } else { |
- return null; |
- } |
- } |
- |
- |
- public JSONObject getJSON() { |
- try { |
- JSONObject pgchunk = new JSONObject(); |
- pgchunk.put("name", name); |
- pgchunk.put("minLong", minLong); |
- pgchunk.put("maxLong", maxLong); |
- return pgchunk; |
- } catch (JSONException jse) { |
- LOG.warning("Error creating JSON representation of a db chunk object"); |
- } |
- return new JSONObject(); |
- } |
- } |
} |