PubSubService.java [src/csip] Revision: Date:
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2022, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip;
import csip.api.server.ServiceException;
import csip.utils.Client;
import static csip.ModelDataService.KEY_METAINFO;
import static csip.ModelDataService.KEY_SUUID;
import csip.utils.SimpleCache;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.PathParam;
import org.codehaus.jettison.json.JSONObject;
/**
 * Queueing ("pub/sub") front end for CSIP model services.
 * <p>
 * A request addressed to a delegate name is re-packaged as an async request
 * and appended to a process-wide bounded queue. A background submitter drains
 * the queue and POSTs each request to {@code delegateUrl + "/" + delegate},
 * but only while the backend context reports (via {@code <context>/q/running})
 * fewer running jobs than its configured capacity; otherwise the request is
 * put back at the tail of the queue.
 * <p>
 * The delegate names {@code status}, {@code reset}, {@code payloads} and
 * {@code toggle} are administrative commands answered locally instead of
 * being queued.
 */
public abstract class PubSubService extends ModelDataService {

  /** Metainfo key under which the queue position of an accepted request is returned. */
  static final String KEY_QUEUE_POS = "queue_pos";
  /** Base URL of the delegate services; defaults to {@code request().getURL()} when unset. */
  static String delegateUrl = Config.getString("csip.pubsub.delegate.url");
  /** Whether incoming requests must carry a 'webhook' metainfo entry. */
  static boolean needsWebHook = Config.getBoolean("csip.pubsub.webhook.payload", true);
  /** Total queue capacity. */
  static int queueLen = Config.getInt("csip.pubsub.queue.len", Integer.MAX_VALUE);
  /** New requests are rejected once fewer than this many queue slots remain free. */
  static int queueRemainingLen = Config.getInt("csip.pubsub.queue.remaining.len", 25);
  static Logger l = Config.LOG;

  private static QueueManagement mgmt;

  static {
    try {
      mgmt = new QueueManagement();
    } catch (Exception E) {
      // mgmt stays null here; every later use of it fails with a NullPointerException.
      l.log(Level.SEVERE, "Init error:", E);
    }
  }

  @PathParam("delegate")
  String delegate;

  /** Set once this request has been accepted into the queue. */
  boolean isQ = false;

  /**
   * Reports whether this request was handed off to the queue.
   * NOTE(review): presumably consulted by ModelDataService to decide how the
   * response status is reported — confirm there.
   */
  @Override
  boolean isQueued() {
    return isQ;
  }

  /**
   * Answers an administrative command, or queues the request for the
   * delegate service named by the {@code delegate} path parameter.
   *
   * @throws ServiceException if no delegate is given, the queue is closed or
   *     nearly full, the required webhook is missing, the target is not
   *     reachable (when target checking is enabled), or queueing fails
   */
  @Override
  protected void doProcess() throws Exception {
    if (delegate == null || delegate.isEmpty())
      throw new ServiceException("No delegate service provided.");

    switch (delegate) {
      // Debugging/status
      case "status":
        results().put("req_in_queue", mgmt.getQueueLen(), "number of requests currently in queue.")
            .put("queue_remaining", mgmt.getRemainingCapacity(), "remaining queue capacity.")
            .put("queue_capacity", queueLen, "total queue capacity.")
            .put("incoming", mgmt.incoming.get(), "total number of externally received requests.")
            .put("queued_sub", mgmt.queued_sub.get(), "total number of queued requests.")
            .put("queued_back", mgmt.queued_back.get(), "total number of requests put back because of backend capacity at max.")
            .put("exec_sub", mgmt.exec_sub.get(), "total number of requests submitted for backend execution.")
            .put("exec_rec", mgmt.exec_rec.get(), "total number of responses received from backend.")
            .put("openQueue", mgmt.queueOpen.get(), "is the queue open or not.");
        return;
      case "reset":
        // Same monitor as the synchronized QueueManagement.doQueue(), so the
        // reset does not interleave with an enqueue.
        synchronized (mgmt) {
          mgmt.queue.clear();
          mgmt.sn.set(0);
        }
        results().put("ok", true);
        return;
      case "payloads":
        // NOTE(review): each queued payload's metainfo carries the submitter's
        // "csip-auth" token (added below), so this dump exposes credentials.
        int i = 0;
        for (QueueManagement.Payload payload : mgmt.queue) {
          results().put(i++ + ": " + payload.url, payload.request);
        }
        results().put("ok", true);
        return;
      case "toggle":
        mgmt.queueOpen.set(!mgmt.queueOpen.get());
        results().put("queueOpen", mgmt.queueOpen.get());
        return;
    }

    if (l.isLoggable(Level.INFO))
      l.info(delegate);

    // Admission control.
    if (!mgmt.queueOpen.get())
      throw new ServiceException("Queue closed for submission, try again later.");
    if (mgmt.getRemainingCapacity() < queueRemainingLen)
      throw new ServiceException("Queue capacity reached, try again later.");
    if (needsWebHook && !metainfo().hasName(KEY_WEBHOOK))
      throw new ServiceException("'webhook' metainfo missing.");

    if (delegateUrl == null)
      delegateUrl = request().getURL();
    String delegateService = delegateUrl + "/" + delegate;

    // Re-package the incoming request as an async request for the delegate:
    // strip node/status/timestamp/IP entries and forward the caller's token.
    JSONObject v = new JSONObject(request().getRequest().toString());
    JSONObject mi = v.getJSONObject(KEY_METAINFO);
    mi.put(KEY_MODE, ASYNC);
    mi.remove("cloud_node");
    mi.remove("status");
    mi.remove("tstamp");
    mi.remove("request_ip");
    mi.put("csip-auth", request().getAuthToken());

    try {
      if (mgmt.checkTarget) {
        // check if target service is available
        long p = Client.ping(delegateService, mgmt.pingTimeout);
        if (p == -1)
          throw new ServiceException("Target service not available: " + delegateService);
      }
      mgmt.incoming.incrementAndGet();
      long pos = mgmt.doQueue(delegateService, v.toString());
      if (pos == -1)
        throw new ServiceException("Error queueing the service, try again later.");
      metainfo().put(KEY_QUEUE_POS, pos);
      if (l.isLoggable(Level.INFO))
        l.info("QUEUE POS, " + pos);
      isQ = true;
    } catch (ServiceException E) {
      // Already specific; do not bury it under the generic wrapper below.
      throw E;
    } catch (Exception E) {
      throw new ServiceException("Error queueing the service", E);
    }
  }

  /**
   * Process-wide request queue plus its two background workers: a submitter
   * that drains the queue, and a probe that periodically refreshes the cached
   * backend load.
   */
  private static class QueueManagement {

    long submitDelay = Config.getLong("csip.pubsub.submit.delay.ms", 1000);
    long delayAtCapacity = Config.getLong("csip.pubsub.atcapacity.delay.ms", 2000);
    int defaultCapacity = Config.getInt("csip.pubsub.default.capacity", 8);
    int pingTimeout = Config.getInt("csip.pubsub.ping.timeout.ms", 1000);
    boolean checkTarget = Config.getBoolean("csip.pubsub.check.target", false);
    long offerMS = Config.getLong("csip.pubsub.offer.ms", 500);
    long pollMS = Config.getLong("csip.pubsub.poll.ms", 2000);
    long loadcheck = Config.getLong("csip.pubsub.loadcheck.ms", 2000);

    /** Number of attempts {@link #readFrom(String)} makes before giving up. */
    static final int PROBE_ATTEMPTS = 3;
    /** Connect timeout for one backend load probe. */
    static final int PROBE_CONNECT_TIMEOUT_MS = 500;
    /**
     * Read timeout for one backend load probe. Without it a stalled backend
     * blocks the probe (and, through the shared LoadProbe lock, the submitter).
     */
    static final int PROBE_READ_TIMEOUT_MS = Config.getInt("csip.pubsub.probe.read.timeout.ms", 2000);

    ExecutorService submitExec = Executors.newCachedThreadPool();
    ScheduledExecutorService probeExec = Executors.newSingleThreadScheduledExecutor();

    /**
     * Caches, per backend context, the running-job count reported by
     * {@code <context>/q/running}. Entries are refreshed by {@link #run()} and
     * evicted after {@link #MAX_TTL} refresh cycles without being requested;
     * an unreachable backend is evicted after the next cycle.
     */
    class LoadProbe implements Runnable {

      static final int MAX_TTL = 10;

      /** Cached load of one backend context. */
      private class Load {
        Integer running;
        AtomicInteger ttl = new AtomicInteger(MAX_TTL);

        @Override
        public String toString() {
          return "Load[" + running + "," + ttl + "]";
        }
      }

      // service context > current load
      Map<String, Load> loads = new ConcurrentHashMap<>();
      // service URL > service context; only touched from the submitter thread
      Map<String, String> sh = new HashMap<>();

      /** Periodic refresh; skipped while nothing is waiting in the queue. */
      @Override
      public void run() {
        try {
          if (submissionRunning.get() && !queue.isEmpty())
            update();
        } catch (Exception ex) {
          l.log(Level.SEVERE, null, ex);
        }
      }

      void close() {
      }

      /** Re-queries every cached context and evicts entries whose TTL ran out. */
      private synchronized void update() throws Exception {
        if (l.isLoggable(Level.INFO))
          l.info("Backend update.");
        // keys to remove
        List<String> context_to_remove = new ArrayList<>();
        for (Map.Entry<String, Load> entry : loads.entrySet()) {
          String context = entry.getKey();
          Load load = entry.getValue();
          int ttl = load.ttl.decrementAndGet();
          // not requested for MAX_TTL cycles (10 * 2 s with the defaults)
          if (ttl <= 0) {
            context_to_remove.add(context);
          } else {
            Integer r = query(context);
            load.running = r;
            // probe failed: drop the entry next cycle so it is re-created on demand
            if (r == Integer.MAX_VALUE)
              load.ttl.set(1);
            if (l.isLoggable(Level.INFO))
              l.info("probe: " + context + " -> " + load);
          }
        }
        for (String ctx : context_to_remove) {
          Load load = loads.remove(ctx);
          if (l.isLoggable(Level.INFO))
            l.info("removed probe for : " + ctx + " " + load);
        }
      }

      /** Returns the running-job count of a context, or Integer.MAX_VALUE on error. */
      private Integer query(String s) throws Exception {
        try {
          return readFrom(s + "/q/running");
        } catch (Exception E) {
          l.log(Level.SEVERE, "Error getting the current running services: ", E);
          return Integer.MAX_VALUE;
        }
      }

      /**
       * Returns the cached running-job count for the context of
       * {@code service}, probing the backend on first use, and resets the
       * entry's TTL. {@code Integer.MAX_VALUE} means the probe failed.
       */
      private synchronized int getCurrentLoad(String service) throws Exception {
        String context = getContext(service);
        Load load = loads.get(context);
        if (load == null) {
          load = new Load();
          load.running = query(context);
          loads.put(context, load);
          if (l.isLoggable(Level.INFO))
            l.info("create probe for : " + service + " " + load);
        }
        // reset the use tick
        load.ttl.set(load.running == Integer.MAX_VALUE ? 1 : MAX_TTL);
        if (l.isLoggable(Level.INFO))
          l.info("get probe for : " + service + " " + load);
        return load.running;
      }

      /**
       * Maps a service URL to its backend context URL, memoized in {@code sh}.
       * NOTE(review): presumably parts 0-3 of Utils.getURIParts are scheme,
       * host, port and webapp path — confirm in Utils.
       */
      private String getContext(String service) throws URISyntaxException {
        String context = sh.get(service);
        if (context == null) {
          String[] u = Utils.getURIParts(service);
          sh.put(service, context = u[0] + u[1] + u[2] + "/" + u[3]);
        }
        return context;
      }
    }

    /**
     * Reads the running-job count published at {@code url}, retrying up to
     * {@link #PROBE_ATTEMPTS} times.
     *
     * @param url the probe URL, e.g. {@code <context>/q/running}
     * @return the first non-negative integer read, or {@link Integer#MAX_VALUE}
     *     if every attempt failed (which callers treat as "backend full")
     */
    static Integer readFrom(String url) {
      for (int attempt = 0; attempt < PROBE_ATTEMPTS; attempt++) {
        Integer running = readOnce(url, attempt);
        if (running != null)
          return running;
      }
      return Integer.MAX_VALUE;
    }

    /**
     * Makes one attempt to read a non-negative integer from {@code url}.
     *
     * @return the value, or {@code null} if the request failed or the body
     *     was empty, not an integer, or negative
     */
    private static Integer readOnce(String url, int attempt) {
      HttpURLConnection con = null;
      try {
        con = (HttpURLConnection) new URL(url).openConnection();
        con.setConnectTimeout(PROBE_CONNECT_TIMEOUT_MS);
        con.setReadTimeout(PROBE_READ_TIMEOUT_MS);
        con.setRequestProperty("accept", "text/plain");
        try (Scanner sc = new Scanner(con.getInputStream(), "UTF-8")) {
          if (!sc.hasNext()) {
            // Scanner swallows read errors (e.g. a read timeout) and reports
            // end of input instead; surface the recorded cause, if any.
            l.log(Level.SEVERE, "Error " + attempt + " getting the current running services: no value from " + url,
                sc.ioException());
            return null;
          }
          String text = sc.next();
          if (l.isLoggable(Level.INFO))
            l.info("read from : " + url + " " + text);
          int running = Integer.parseInt(text);
          return running < 0 ? null : Integer.valueOf(running);
        }
      } catch (IOException | NumberFormatException E) {
        l.log(Level.SEVERE, "Error " + attempt + " getting the current running services: ", E);
        // release the socket of a failed attempt instead of leaving it to GC
        if (con != null)
          con.disconnect();
        return null;
      }
    }

    /** Manual smoke test: polls a hard-coded backend's running-job count. */
    public static void main(String[] args) throws Exception {
      for (int i = 0; i < 50; i++) {
        Thread.sleep(500);
        System.out.println(i + " " + QueueManagement.readFrom("http://csip.engr.colostate.edu:8097/csip-weps/q/running"));
      }
    }

    // Field order matters: SubmitJobThread reads submitDelay during construction.
    BlockingQueue<Payload> queue = new LinkedBlockingQueue<>(queueLen);
    LoadProbe probe = new LoadProbe();
    FutureTask<String> submitTask = new FutureTask<>(new SubmitJobThread());
    SimpleCache<String, Integer> capacities = new SimpleCache<>();

    final AtomicBoolean submissionRunning = new AtomicBoolean(true);
    final AtomicBoolean queueOpen = new AtomicBoolean(true);

    AtomicInteger incoming = new AtomicInteger(0);
    AtomicInteger queued_sub = new AtomicInteger(0);
    AtomicInteger queued_rec = new AtomicInteger(0);
    AtomicInteger queued_back = new AtomicInteger(0);
    AtomicInteger exec_sub = new AtomicInteger(0);
    AtomicInteger exec_rec = new AtomicInteger(0);
    // sequence number stamped into each forwarded request
    AtomicInteger sn = new AtomicInteger(0);

    /** A queued request: target service URL and the JSON request text. */
    static class Payload {
      final String url;
      final String request;

      Payload(String url, String request) {
        this.url = url;
        this.request = request;
      }
    }

    public int getQueueLen() {
      return queue.size();
    }

    public int getRemainingCapacity() {
      return queue.remainingCapacity();
    }

    /**
     * Appends a request to the tail of the queue, waiting up to
     * {@code offerMS} for space.
     *
     * @return the queue length after insertion, or -1 if the queue stayed full
     */
    synchronized int doQueue(String url, String request) throws Exception {
      if (l.isLoggable(Level.INFO))
        l.log(Level.INFO, "Queueing request :{0}", new Object[]{url});
      boolean s = queue.offer(new Payload(url, request), offerMS, TimeUnit.MILLISECONDS);
      if (!s)
        return -1;
      queued_sub.getAndIncrement();
      return getQueueLen();
    }

    /**
     * Static capacity of a backend context, read from
     * {@code csip.pubsub.<context>.capacity} (with '/' and ':' in the context
     * replaced by '.'), falling back to {@code defaultCapacity}; looked up
     * through the {@code capacities} cache.
     */
    int getContextCapacity(String context) {
      return capacities.get(context, c
          -> Config.getInt("csip.pubsub."
              + c.replace('/', '.').replace(':', '.') + ".capacity", defaultCapacity));
    }

    /**
     * Submit for execution.
     *
     * This task pulls entries from the queue and submits each one for
     * execution when its backend has spare capacity.
     */
    class SubmitJobThread implements Callable<String> {

      // pause before handling the next entry; longer right after a put-back
      long delay = submitDelay;

      /** POSTs one request to the backend; failures are logged, not rethrown. */
      private void executeAsync(String url, String payload, int capacity) {
        try (Client c0 = new Client(l)) {
          JSONObject o = new JSONObject(payload);
          Map<String, String> header = new HashMap<>();
          header.put(KEY_SUUID, o.getJSONObject(KEY_METAINFO).getString(KEY_SUUID));
          header.put("Connection", "Close");
          // NOTE(review): the caller passes the backend's current load here, not
          // the configured context capacity — confirm what "cap" should carry.
          o.getJSONObject("metainfo")
              .put("cap", capacity)
              .put("sn", sn.getAndIncrement())
              // to prevent abuse
              .put("csip.archive.enabled", false);
          exec_sub.incrementAndGet();
          JSONObject result = c0.doPOST(url, o, header);
          exec_rec.incrementAndGet();
          if (l.isLoggable(Level.FINE))
            l.fine("POST Run to " + url + " ... received: " + result.toString());
        } catch (Exception ex) {
          l.log(Level.SEVERE, null, ex);
        }
      }

      /** Waits {@code delay} ms, then forwards or puts back one request. */
      private void submit(String serviceUrl, String servicePayload) throws Exception {
        boolean interrupted = false;
        try {
          Thread.sleep(delay);
        } catch (InterruptedException ex) {
          l.info("Interrupted");
          interrupted = true;
        }
        try {
          dispatch(serviceUrl, servicePayload);
        } finally {
          // Restore the interrupt only after the payload in hand has been
          // submitted or put back (an interrupted offer() would lose it); the
          // next queue.poll() in call() then ends the loop.
          if (interrupted)
            Thread.currentThread().interrupt();
        }
      }

      /** Forwards the request if its backend has spare capacity, else puts it back. */
      private void dispatch(String serviceUrl, String servicePayload) throws Exception {
        if (checkTarget) {
          // Ping the service first.
          long p = Client.ping(serviceUrl, pingTimeout);
          if (p == -1) {
            putBack(serviceUrl, servicePayload);
            if (l.isLoggable(Level.INFO))
              l.info("Cannot ping the service, back in line...");
            return;
          }
        }
        // check the current load in the backend.
        int currentLoad = probe.getCurrentLoad(serviceUrl);
        int contextCapacity = getContextCapacity(probe.getContext(serviceUrl));
        if (l.isLoggable(Level.INFO))
          l.log(Level.INFO, "Load for {2}: {0}/{1}",
              new Object[]{currentLoad, contextCapacity, serviceUrl});

        // compare the current backend load against the backend capacity
        if (currentLoad >= contextCapacity) {
          // capacity reached, put it back in line
          putBack(serviceUrl, servicePayload);
          queued_back.incrementAndGet();
          if (l.isLoggable(Level.INFO))
            l.log(Level.INFO, "back in line...{0}, {1}/{2}",
                new Object[]{serviceUrl, currentLoad, contextCapacity});
        } else {
          // capacity is fine, submit for execution.
          queued_rec.incrementAndGet();
          executeAsync(serviceUrl, servicePayload, currentLoad);
          delay = submitDelay;
        }
      }

      /** Re-appends a request to the tail of the queue and slows the next round. */
      private void putBack(String serviceUrl, String servicePayload) throws Exception {
        // A full queue drops the request; at least make that visible.
        if (doQueue(serviceUrl, servicePayload) == -1)
          l.log(Level.SEVERE, "Queue full, request dropped: {0}", serviceUrl);
        delay = delayAtCapacity;
      }

      /** Drains the queue until {@code submissionRunning} is cleared. */
      @Override
      public String call() throws Exception {
        try {
          while (submissionRunning.get()) {
            Payload payload = queue.poll(pollMS, TimeUnit.MILLISECONDS);
            if (payload != null) {
              if (l.isLoggable(Level.INFO))
                l.info("RECEIVED: " + payload.url);
              if (l.isLoggable(Level.FINE))
                l.fine(" Request: " + payload.request);
              submit(payload.url, payload.request);
            }
            if (l.isLoggable(Level.INFO))
              l.info("Submit Alive.");
          }
        } finally {
          l.info("Submitter closed.");
        }
        return "Done Submit.";
      }
    }

    /**
     * Stops the submitter (after its current poll/submit cycle) and both
     * executors. NOTE(review): blocks indefinitely if startup() was never
     * called, since submitTask only completes once it has run.
     */
    void shutdown() {
      submissionRunning.set(false);
      try {
        l.log(Level.INFO, submitTask.get());
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        l.log(Level.SEVERE, null, ex);
      } catch (ExecutionException ex) {
        l.log(Level.SEVERE, null, ex);
      }
      submitExec.shutdown();
      probeExec.shutdown();
      probe.close();
    }

    /** Starts the submitter and schedules the load probe (first run after 2 s). */
    void startup() {
      submitExec.submit(submitTask);
      probeExec.scheduleWithFixedDelay(probe, 2000, loadcheck, TimeUnit.MILLISECONDS);
    }
  }

  /**
   * Starts the pub/sub worker threads.
   * NOTE(review): presumably invoked from the webapp lifecycle — confirm caller.
   */
  public static void onContextInit() {
    try {
      mgmt.startup();
      l.log(Level.INFO, "Started Pub/Sub Threads.");
    } catch (Exception E) {
      l.log(Level.SEVERE, null, E);
    }
  }

  /**
   * Stops the pub/sub worker threads.
   * NOTE(review): presumably invoked from the webapp lifecycle — confirm caller.
   */
  public static void onContextDestroy() {
    try {
      mgmt.shutdown();
      l.log(Level.INFO, "Stopped Pub/Sub Threads.");
    } catch (Exception E) {
      l.log(Level.SEVERE, null, E);
    }
  }
}