QueueingModelDataService.java [src/csip] Revision: default  Date:
 * $Id$
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 * 2012-2022, Olaf David and others, OMSLab, Colorado State University.
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
package csip;

import csip.api.server.ServiceException;
import csip.utils.Client;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import static csip.ModelDataService.CANCELED;
import static csip.ModelDataService.FAILED;
import static csip.ModelDataService.FINISHED;
import static csip.ModelDataService.KEY_METAINFO;
import static csip.ModelDataService.KEY_SUUID;
import static csip.ModelDataService.TIMEDOUT;
import csip.annotations.Author;
import csip.annotations.Description;
import csip.annotations.Documentation;
import csip.annotations.License;
import csip.annotations.Name;
import csip.annotations.State;
import csip.annotations.VersionInfo;
import csip.utils.SimpleCache;
import javax.ws.rs.Path;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.PathParam;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.bson.Document;
import org.codehaus.jettison.json.JSONObject;

@Author(name = "od", email = "<odavid@colostate.edu>", org = "CSU")
public class QueueingModelDataService extends ModelDataService {

  static final String KEY_QUEUE_POS = "queue_pos";

  static String delegateUrl = Config.getString("csip.pubsub.delegate.url");
  static boolean needsWebHook = Config.getBoolean("csip.pubsub.webhook.payload", true);
  static int queueLen = Config.getInt("csip.pubsub.queue.len", Integer.MAX_VALUE);
  static int queueRemainingLen = Config.getInt("csip.pubsub.queue.remaining.len", 25);

  static Logger l = Config.LOG;

  private static QueueManagement mgmt;

  static {
    try {
      mgmt = new QueueManagement();
    } catch (Exception E) {
      l.log(Level.SEVERE, "Init error:", E);

  String delegate;

  boolean isQueued = false;

  boolean isQueued() {
    return isQueued;

  protected void doProcess() throws Exception {
    if (delegate == null || delegate.isEmpty())
      throw new ServiceException("No delegate service provided.");

    // Debugging/status
    if (delegate.equals("status")) {
      results().put("req_in_queue", mgmt.getQueueLen(), "number of requests currently in queue.")
          .put("queue_remaining", mgmt.getRemainingCapacity(), "remaining queue capacity.")
          .put("queue_capacity", queueLen, "total queue capacity.")
          //          .put("queue_min_remaining", queueRemainingLen)
          .put("res_in_publish_queue", mgmt.deliveryQueue.size(), "number of responses in publish queue.")
          .put("incoming", mgmt.incoming.get(), "total number of externally received requests.")
          .put("queued_sub", mgmt.queued_sub.get(), "total number of queued requests.")
          //          .put("queued_rec", mgmt.queued_rec.get())
          .put("queued_back", mgmt.queued_back.get(), "total number of requests put back because of backend capacity at max.")
          .put("exec_sub", mgmt.exec_sub.get(), "total number of requests submitted for backend execution.")
          .put("exec_rec", mgmt.exec_rec.get(), "total number of responses received from backend.")
          //          .put("webhook_sub", mgmt.webhook_sub.get())
          .put("webhook_sub_failed", mgmt.webhook_sub_failed.get(), "total number of responses failed for webhook submission.")
          .put("webhook_rec", mgmt.webhook_rec.get(), "total number of responses submitted with webhook.")
          .put("openQueue", mgmt.queueOpen.get(), "is the queue open or not.");
    if (delegate.equals("reset")) {
      synchronized (this) {
      results().put("ok", true);
    if (delegate.equals("payloads")) {
      int i = 0;
      for (QueueManagement.Payload payload : mgmt.queue) {
        results().put(i++ + ": " + payload.url, payload.request);
      results().put("ok", true);
    if (delegate.equals("toggle")) {
      results().put("queueOpen", mgmt.queueOpen.get());

    if (l.isLoggable(Level.INFO))
      l.log(Level.INFO, delegate);

    if (!mgmt.queueOpen.get())
      throw new ServiceException("Queue closed for submission, try again later.");

    if (mgmt.getRemainingCapacity() < queueRemainingLen)
      throw new ServiceException("Queue capacity reached, try again later.");

    if (needsWebHook && !metainfo().hasName(KEY_WEBHOOK))
      throw new ServiceException("'webhook' metainfo missing.");

    if (delegateUrl == null) {
      String u = request().getURL();
      delegateUrl = u.substring(0, u.indexOf(":"));

    String delegateService = delegateUrl + ":" + delegate;

    JSONObject v = new JSONObject(request().getRequest().toString());

    JSONObject mi = v.getJSONObject(KEY_METAINFO);
    mi.put(KEY_MODE, ASYNC);
    mi.put("csip-auth", request().getAuthToken());

    try {
      if (mgmt.checkTarget) {
        // check if target service is available
        long p = Client.ping(delegateService, mgmt.pingTimeout);
        if (p == -1)
          throw new ServiceException("Target service not available: " + delegateService);
      long pos = mgmt.queue(delegateService, v.toString());
      if (pos == -1)
        throw new ServiceException("Error queueing the service, try again later.");

      metainfo().put(KEY_QUEUE_POS, pos);
      if (l.isLoggable(Level.INFO))
        l.log(Level.INFO, "QUEUE POS, " + pos);

      isQueued = true;
    } catch (Exception E) {
      throw new ServiceException("Error queueing the service", E);

   * QueueManagement
  private static class QueueManagement {

    final String STRING_SER = "org.apache.kafka.common.serialization.StringSerializer";
    final String STRING_DESER = "org.apache.kafka.common.serialization.StringDeserializer";

    String bootstrap_servers = Config.getString("csip.pubsub.kafka.bootstrap_servers");
    long consumerPoll = Config.getLong("csip.pubsub.kafka.consumer.poll.ms", 10000);
    String consumerGroupId = Config.getString("csip.pubsub.kafka.consumer.group.id", "test-consumer-group");
    String resultTopic = Config.getString("csip.pubsub.result.topic", "8086");
    long submitDelay = Config.getLong("csip.pubsub.submit.delay.ms", 1000);
    int defaultCapacity = Config.getInt("csip.pubsub.default.capacity", 8);
    int pingTimeout = Config.getInt("csip.pubsub.ping.timeout.ms", 1000);
    long delayAtCapacilty = Config.getLong("csip.pubsub.atcapacity.delay.ms", 2000);
    boolean checkTarget = Config.getBoolean("csip.pubsub.check.target", false);
    long offerMS = Config.getLong("csip.pubsub.offer.ms", 500);
    long pollMS = Config.getLong("csip.pubsub.poll.ms", 2000);
    long loadcheck = Config.getLong("csip.pubsub.loadcheck.ms", 2000);

    String connect = Config.getString("csip.pubsub.stats", null);

    Consumer<String, String> receiveConsumer = getResultConsumer();

    ExecutorService executor = Executors.newCachedThreadPool();
    ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();

    class LoadProbe implements Runnable {

      Client cl = new Client(l);
      int MAX_TTL = 10;

      private class Load {

        Integer load;
        AtomicInteger ttl = new AtomicInteger(MAX_TTL);

      // service context > current load
      Map<String, Load> loads = new ConcurrentHashMap<>();
      Map<String, String> sh = new HashMap<>();

      public void run() {
        try {
          if (threadsRunning.get())
        } catch (Exception ex) {
          l.log(Level.SEVERE, null, ex);

      void close() {

      private void update() throws Exception {
        if (l.isLoggable(Level.INFO))
          l.info("Backend update.");

        for (Map.Entry<String, Load> entry : loads.entrySet()) {
          String context = entry.getKey();
          int ttl = entry.getValue().ttl.decrementAndGet();
          if (ttl <= 0) {
            if (l.isLoggable(Level.INFO))
              l.info("removed probe for : " + context);
          } else {
            Integer i = query(context);
            entry.getValue().load = i;
            if (l.isLoggable(Level.INFO))
              l.info("probe: " + context + " -> " + i + ", " + ttl);

      private synchronized Integer query(String s) throws Exception {
        try {
          String result = cl.doGET(s + "/q/running");
          return Integer.valueOf(result);
        } catch (Exception E) {
          l.log(Level.SEVERE, "Error getting the current running services: ", E);
          return Integer.MAX_VALUE;

      int getCurrentLoad(String service) throws Exception {
        String context = getContext(service);
        Load load = loads.get(context);
        if (load == null) {
          load = new Load();
          load.load = query(context);
          loads.put(context, load);
        // reset the use tick
        return load.load;

      String getContext(String service) throws URISyntaxException {
        String context = sh.get(service);
        if (context == null) {
          String[] u = Utils.getURIParts(service);
          sh.put(service, context = u[0] + u[1] + u[2] + "/" + u[3]);
        return context;

    LoadProbe probe = new LoadProbe();

    FutureTask<String> submitTask = new FutureTask<>(new SubmitJobThread());
    FutureTask<String> receiveTask = new FutureTask<>(new ReceiveJobStatusThread());
    FutureTask<String> deliveryTask = new FutureTask<>(new PublishJobThread());

    final AtomicBoolean threadsRunning = new AtomicBoolean(true);

    SimpleCache<String, Integer> capacities = new SimpleCache<>();

    AtomicInteger incoming = new AtomicInteger(0);
    AtomicInteger queued_sub = new AtomicInteger(0);
    AtomicInteger queued_rec = new AtomicInteger(0);
    AtomicInteger queued_back = new AtomicInteger(0);
    AtomicInteger exec_sub = new AtomicInteger(0);
    AtomicInteger exec_rec = new AtomicInteger(0);
    AtomicInteger webhook_sub = new AtomicInteger(0);
    AtomicInteger webhook_sub_failed = new AtomicInteger(0);
    AtomicInteger webhook_rec = new AtomicInteger(0);
    AtomicInteger sn = new AtomicInteger(0);

    BlockingQueue<Payload> queue = new LinkedBlockingQueue<>(queueLen);
    BlockingQueue<String> deliveryQueue = new LinkedBlockingQueue<>();

    final AtomicBoolean queueOpen = new AtomicBoolean(true);

    Stats stats;

    static class Payload {

      String url;
      String request;

      Payload(String url, String request) {
        this.url = url;
        this.request = request;

    public int getQueueLen() {
      return queue.size();

    public int getRemainingCapacity() {
      return queue.remainingCapacity();

    Properties getConsumerProperties() {
      Properties p = new Properties();
      p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
      p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
      p.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
      return p;

    Consumer<String, String> getResultConsumer() {
      Consumer<String, String> c = new KafkaConsumer<>(getConsumerProperties());
      return c;

    synchronized int queue(String url, String request) throws Exception {
      if (l.isLoggable(Level.INFO))
        l.log(Level.INFO, "Queueing  :{0} {1}", new Object[]{url, request});

      boolean s = queue.offer(new Payload(url, request), offerMS, TimeUnit.MILLISECONDS);
      if (!s)
        return -1;

      return getQueueLen();

     * static capacity, based on property settings.
    int getContextCapacity(String context) {
      return capacities.get(context, c
          -> Config.getInt("csip.pubsub."
              + c.replace('/', '.').replace(':', '.') + ".capacity", defaultCapacity));

     * Submit for execution.
     * This thread pulls the entries from the queue and submits it for
     * execution.
    class SubmitJobThread implements Callable<String> {

      Client cl = new Client(l);
      long delay = submitDelay;

      private void executeAsync(String url, String payload, int capacity, Client c) {
        try {
          JSONObject o = new JSONObject(payload);
          Map<String, String> header = new HashMap<>();
          header.put(KEY_SUUID, o.getJSONObject(KEY_METAINFO).getString(KEY_SUUID));
          o.getJSONObject("metainfo").put("cap", capacity);
          o.getJSONObject("metainfo").put("sn", sn.get());

          JSONObject result = c.doPOST(url, o, header);
          if (l.isLoggable(Level.FINE))
            l.log(Level.FINE, "POST Run to " + url + " ... received: " + result.toString());

        } catch (Exception ex) {
          l.log(Level.SEVERE, null, ex);

      private void submit(Client cl, String serviceUrl, String servicePayload) throws Exception {
        // wait a bit before continue processing
        try {
        } catch (InterruptedException ex) {
          l.log(Level.INFO, "Interrupted");
        if (checkTarget) {
          // Ping the service first.
          long p = Client.ping(serviceUrl, pingTimeout);
          if (p == -1) {
            queue(serviceUrl, servicePayload);
            delay = delayAtCapacilty;
            l.log(Level.INFO, "Cannot ping the service, back in line...");

        // check the current load in the backend.
        int currentLoad = probe.getCurrentLoad(serviceUrl);
        int contextCapacity = getContextCapacity(probe.getContext(serviceUrl));
        l.log(Level.INFO, "Load for {2}: {0}/{1}",
            new Object[]{currentLoad, contextCapacity, serviceUrl});

        // compare the current backend load against the backend capacity
        if (currentLoad >= contextCapacity) {
          // capacity reached, put it back in line
          queue(serviceUrl, servicePayload);
          delay = delayAtCapacilty;
          l.log(Level.WARNING, "back in line...{0}, {1}/{2}",
              new Object[]{serviceUrl, currentLoad, contextCapacity});
        } else {
          // capacity is fine, submit for execution.
          executeAsync(serviceUrl, servicePayload, currentLoad, cl);
          delay = submitDelay;

      public String call() throws Exception {
        try {
          while (threadsRunning.get()) {
            Payload payload = queue.poll(pollMS, TimeUnit.MILLISECONDS);
            if (payload != null) {
              l.log(Level.INFO, "RECEIVED: {0} ", new Object[]{payload.url});
              l.log(Level.FINE, "  Request: {0}", new Object[]{payload.request});
              submit(cl, payload.url, payload.request);
            l.log(Level.INFO, "Submit Alive.");
        } finally {
          l.log(Level.INFO, "Submitter closed.");
        return "Done Submit.";

     * Submit for execution.
     * This thread pulls the entries from the queue and submits it for
     * execution.
    class PublishJobThread implements Callable<String> {

      Client cl = new Client(l);

      private void publish(Client c, String result) throws Exception {

        // call webhook
        JSONObject o = new JSONObject(result);
        if (o.has(KEY_METAINFO)) {
          String url = o.getJSONObject(KEY_METAINFO).optString(KEY_WEBHOOK);
          if (!url.isEmpty()) {
            l.log(Level.INFO, "Webhook Post to " + url);
            String ack = c.doPOST(url, result);
            if (ack != null) {
              l.log(Level.INFO, "Delivered and Acknowledged: " + ack);
            if (mgmt.stats != null) {
              l.log(Level.INFO, "Calling statistics: ");
              String auth = o.getJSONObject(KEY_METAINFO).optString("csip-auth");
              String service = o.getJSONObject(KEY_METAINFO).optString("service_url");
              long cpu_time = o.getJSONObject(KEY_METAINFO).optLong("cpu_time");
              mgmt.stats.inc(l, service, cpu_time, auth);
          } else {
            l.log(Level.WARNING, "No webhook found: '" + url + "'");

        } else {
          l.log(Level.SEVERE, "PublishError for :" + result);

      public String call() throws Exception {
        try {
          while (threadsRunning.get()) {
            String payload = deliveryQueue.poll(pollMS, TimeUnit.MILLISECONDS);
            if (payload != null) {
              l.log(Level.INFO, "RECEIVED FOR PUBLISH: {0} ", new Object[]{payload});
              publish(cl, payload);
            l.log(Level.INFO, "Publish Alive.");
        } finally {
          l.log(Level.INFO, "Publisher closed.");
        return "Done Publish.";

     * ReceiveJobStatusThread. This thread receives the result status messages
     * from the services.
    class ReceiveJobStatusThread implements Callable<String> {

      Duration d = Duration.ofMillis(consumerPoll);
      Client cl = new Client(l);

      private void queryResults(String status, String suid_url, Client c) {
        if (status.equals(FINISHED)
            || status.equals(FAILED)
            || status.equals(CANCELED)
            || status.equals(TIMEDOUT)) {
          String[] v = suid_url.split("\\s+");
          try {
            // call Q service
            String[] u = Utils.getURIParts(v[1]);
            String url1 = u[0] + u[1] + u[2] + "/" + u[3] + "/q/" + v[0];
            l.log(Level.INFO, "Query Results " + url1);
            String result = c.doGET(url1);
            if (l.isLoggable(Level.FINE))
              l.log(Level.FINE, "Received RESULT for:  " + url1 + " " + result);

          } catch (Exception ex) {
            l.log(Level.SEVERE, null, ex);

      public String call() throws Exception {
        try {
          while (threadsRunning.get()) {
            ConsumerRecords<String, String> records = receiveConsumer.poll(d);
            if (records.count() > 0) {
              records.forEach(record -> {
                l.log(Level.INFO, "{0} RECEIVED: {1} {2}",
                    new Object[]{record.offset(), record.key(), record.value()});
                queryResults(record.key(), record.value(), cl);
            l.log(Level.INFO, "Receive Alive.");
        } catch (WakeupException E) {
          if (threadsRunning.get())
            throw E;
        } finally {
          l.log(Level.INFO, "Receiver closed.");
        return "Done Receive.";

    static class Stats {

      MongoClient mongo;
      MongoDatabase db;
      UpdateOptions opt = new UpdateOptions().upsert(true);

      static final Document INC = new Document("$inc", new Document("count", 1l));

      Stats(String uri) {
        MongoClientURI u = new MongoClientURI(uri);
        String dbname = u.getDatabase();
        if (dbname == null)
          dbname = "pubsub";

        mongo = new MongoClient(u);
        db = mongo.getDatabase(dbname);

      void inc(Logger l, String serviceUrl, long time, String collection) {
        if (collection.isEmpty()) {
          l.warning("No auth/collection for  " + serviceUrl);
        MongoCollection<Document> c = db.getCollection(collection);
        UpdateResult result = c.updateOne(Filters.eq("service", serviceUrl), INC);
        if (result.getModifiedCount() == 0) {
          c.insertOne(new Document("service", serviceUrl));
          c.updateOne(Filters.eq("service", serviceUrl), INC);
        c.updateOne(Filters.eq("service", serviceUrl),
            new Document("$inc", new Document("time", time)));

      void close() {

    void shutdown() {
      try {
        l.log(Level.INFO, receiveTask.get());
        l.log(Level.INFO, submitTask.get());
        l.log(Level.INFO, deliveryTask.get());
      } catch (InterruptedException | ExecutionException ex) {
        l.log(Level.SEVERE, null, ex);
      if (connect != null)


    void startup() {
      ses.scheduleWithFixedDelay(probe, 2000, loadcheck, TimeUnit.MILLISECONDS);
      if (connect != null)
        stats = new Stats(connect);

  public static void onContextInit() {
    try {
      l.log(Level.INFO, "Started Pub/Sub Threads.");
    } catch (Exception E) {
      l.log(Level.SEVERE, null, E);

  public static void onContextDestroy() {
    try {
      l.log(Level.INFO, "Stopped Pub/Sub Threads.");
    } catch (Exception E) {
      l.log(Level.SEVERE, null, E);