PubSubService.java [src/csip] Revision:   Date:
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 *
 * 2012-2022, Olaf David and others, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package csip;

import csip.api.server.ServiceException;
import csip.utils.Client;
import static csip.ModelDataService.KEY_METAINFO;
import static csip.ModelDataService.KEY_SUUID;
import csip.utils.SimpleCache;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.PathParam;
import org.codehaus.jettison.json.JSONObject;

public abstract class PubSubService extends ModelDataService {

  // Metainfo key under which the queue position of an accepted request is reported.
  static final String KEY_QUEUE_POS = "queue_pos";

  // Base URL of the delegate services; when not configured it is taken from
  // the URL of the first queued request (see doProcess).
  static String delegateUrl = Config.getString("csip.pubsub.delegate.url");
  // When true, a request without 'webhook' metainfo is rejected.
  static boolean needsWebHook = Config.getBoolean("csip.pubsub.webhook.payload", true);
  // Capacity of the request queue.
  static int queueLen = Config.getInt("csip.pubsub.queue.len", Integer.MAX_VALUE);
  // New submissions are rejected once the remaining queue capacity drops
  // below this value.
  static int queueRemainingLen = Config.getInt("csip.pubsub.queue.remaining.len", 25);

  // Shared logger, also used by the nested queue management classes.
  static Logger l = Config.LOG;

  // Single queue manager shared by all service instances; stays null if
  // its construction fails in the static initializer below.
  private static QueueManagement mgmt;


  // Create the queue manager once at class load; a failure is only logged.
  static {
    try {
      mgmt = new QueueManagement();
    } catch (Exception E) {
      l.log(Level.SEVERE, "Init error:", E);
    }
  }

  // Path parameter: either one of the built-in commands handled in
  // doProcess ("status", "reset", "payloads", "toggle") or the path of the
  // delegate service, which is appended to delegateUrl.
  @PathParam("delegate")
  String delegate;

  // Set to true by doProcess once the request has been placed in the queue.
  boolean isQ = false;


  /**
   * Tells whether this request was queued.
   *
   * @return true after doProcess successfully queued the request.
   */
  @Override
  boolean isQueued() {
    return isQ;
  }


  /**
   * Handles one request: answers a built-in queue command ("status",
   * "reset", "payloads", "toggle") or places the request in the queue for
   * the delegate service named by the path parameter.
   *
   * @throws Exception if no delegate is given, the queue is closed or short
   * on capacity, a required webhook is missing, or queueing fails.
   */
  @Override
  protected void doProcess() throws Exception {
    if (delegate == null || delegate.isEmpty())
      throw new ServiceException("No delegate service provided.");

    // built-in commands never reach the queue
    if (handleQueueCommand())
      return;

    if (l.isLoggable(Level.INFO))
      l.info(delegate);

    if (!mgmt.queueOpen.get())
      throw new ServiceException("Queue closed for submission, try again later.");

    if (mgmt.getRemainingCapacity() < queueRemainingLen)
      throw new ServiceException("Queue capacity reached, try again later.");

    if (needsWebHook && !metainfo().hasName(KEY_WEBHOOK))
      throw new ServiceException("'webhook' metainfo missing.");

    // no delegate base URL configured: adopt the URL of this request
    if (delegateUrl == null) {
      delegateUrl = request().getURL();
    }

    String target = delegateUrl + "/" + delegate;

    // forward a copy of the request, forced to async mode and stripped of
    // the entries listed below
    JSONObject forwarded = new JSONObject(request().getRequest().toString());
    JSONObject meta = forwarded.getJSONObject(KEY_METAINFO);
    meta.put(KEY_MODE, ASYNC);
    for (String key : new String[]{"cloud_node", "status", "tstamp", "request_ip"}) {
      meta.remove(key);
    }
    meta.put("csip-auth", request().getAuthToken());

    try {
      // optionally check that the target service is available
      if (mgmt.checkTarget && Client.ping(target, mgmt.pingTimeout) == -1)
        throw new ServiceException("Target service not available: " + target);

      mgmt.incoming.incrementAndGet();
      long position = mgmt.doQueue(target, forwarded.toString());
      if (position == -1)
        throw new ServiceException("Error queueing the service, try again later.");

      metainfo().put(KEY_QUEUE_POS, position);
      if (l.isLoggable(Level.INFO))
        l.info("QUEUE POS, " + position);

      isQ = true;
    } catch (Exception E) {
      // every failure in this section is reported under one message
      throw new ServiceException("Error queueing the service", E);
    }
  }


  /**
   * Answers the built-in debugging/status commands.
   *
   * @return true if the delegate named a built-in command and the result was
   * produced, false if the request should be queued instead.
   */
  private boolean handleQueueCommand() throws Exception {
    if ("status".equals(delegate)) {
      results().put("req_in_queue", mgmt.getQueueLen(), "number of requests currently in queue.")
          .put("queue_remaining", mgmt.getRemainingCapacity(), "remaining queue capacity.")
          .put("queue_capacity", queueLen, "total queue capacity.")
          .put("incoming", mgmt.incoming.get(), "total number of externally received requests.")
          .put("queued_sub", mgmt.queued_sub.get(), "total number of queued requests.")
          .put("queued_back", mgmt.queued_back.get(), "total number of requests put back because of backend capacity at max.")
          .put("exec_sub", mgmt.exec_sub.get(), "total number of requests submitted for backend execution.")
          .put("exec_rec", mgmt.exec_rec.get(), "total number of responses received from backend.")
          .put("openQueue", mgmt.queueOpen.get(), "is the queue open or not.");
      return true;
    }
    if ("reset".equals(delegate)) {
      synchronized (mgmt) {
        mgmt.queue.clear();
        mgmt.sn.set(0);
      }
      results().put("ok", true);
      return true;
    }
    if ("payloads".equals(delegate)) {
      int index = 0;
      for (QueueManagement.Payload queued : mgmt.queue) {
        results().put(index + ": " + queued.url, queued.request);
        index++;
      }
      results().put("ok", true);
      return true;
    }
    if ("toggle".equals(delegate)) {
      boolean wasOpen = mgmt.queueOpen.get();
      mgmt.queueOpen.set(!wasOpen);
      results().put("queueOpen", mgmt.queueOpen.get());
      return true;
    }
    return false;
  }

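  /**
   * Queue management: holds the bounded request queue, the submitter task
   * that drains it, the backend load probe and the status counters.
   */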
  private static class QueueManagement {

    // Pause before each payload is processed; restored after a successful submit.
    long submitDelay = Config.getLong("csip.pubsub.submit.delay.ms", 1000);
    // Pause used instead of submitDelay after a request had to be put back in line.
    long delayAtCapacilty = Config.getLong("csip.pubsub.atcapacity.delay.ms", 2000);
    // Capacity assumed for a context that has no "csip.pubsub.<context>.capacity" setting.
    int defaultCapacity = Config.getInt("csip.pubsub.default.capacity", 8);
    // Timeout handed to Client.ping when checking a target service.
    int pingTimeout = Config.getInt("csip.pubsub.ping.timeout.ms", 1000);
    // When true, the target service is pinged before queueing and before submitting.
    boolean checkTarget = Config.getBoolean("csip.pubsub.check.target", false);
    // Maximum time to wait when offering a payload to the queue.
    long offerMS = Config.getLong("csip.pubsub.offer.ms", 500);
    // Maximum time the submitter waits for a payload on each poll.
    long pollMS = Config.getLong("csip.pubsub.poll.ms", 2000);
    // Delay between two runs of the load probe.
    long loadcheck = Config.getLong("csip.pubsub.loadcheck.ms", 2000);

    // Runs the submit task.
    ExecutorService submitExec = Executors.newCachedThreadPool();
    // Runs the load probe on a fixed delay.
    ScheduledExecutorService probeExec = Executors.newSingleThreadScheduledExecutor();

    /**
     * Tracks the number of running services per backend context. The value
     * is fetched from "&lt;context&gt;/q/running", refreshed by the scheduled
     * run() while requests are queued, and dropped once a context has not
     * been asked for during MAX_TTL refresh cycles.
     */
    class LoadProbe implements Runnable {

      // refresh cycles an entry survives without being requested
      int MAX_TTL = 10;

      /**
       * Last known load of one context plus its remaining time-to-live.
       */
      private class Load {

        Integer running;
        AtomicInteger ttl = new AtomicInteger(MAX_TTL);


        @Override
        public String toString() {
          return new StringBuilder("Load[")
              .append(running).append(",").append(ttl).append("]")
              .toString();
        }

      }

      // service context > current load
      Map<String, Load> loads = new ConcurrentHashMap<>();
      // service url > service context
      Map<String, String> sh = new HashMap<>();


      /**
       * Scheduled entry point: refreshes the loads only while the submitter
       * is running and the queue holds requests. Errors are logged.
       */
      @Override
      public void run() {
        try {
          boolean idle = !submissionRunning.get() || queue.isEmpty();
          if (idle)
            return;
          update();
        } catch (Exception ex) {
          l.log(Level.SEVERE, null, ex);
        }
      }


      void close() {
      }


      /**
       * Ages every known context by one tick; expired contexts are removed,
       * all others are queried again.
       */
      private synchronized void update() throws Exception {
        if (l.isLoggable(Level.INFO))
          l.info("Backend update.");

        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Load> known : loads.entrySet()) {
          String ctx = known.getKey();
          Load current = known.getValue();

          // not being used for 10 * 2 seconds (default)
          if (current.ttl.decrementAndGet() <= 0) {
            expired.add(ctx);
            continue;
          }

          Integer probed = query(ctx);
          current.running = probed;
          // a failed query leaves the entry a single remaining tick
          if (probed == Integer.MAX_VALUE)
            current.ttl.set(1);

          if (l.isLoggable(Level.INFO))
            l.info("probe: " + ctx + " -> " + current);
        }

        for (String ctx : expired) {
          Load gone = loads.remove(ctx);
          if (l.isLoggable(Level.INFO))
            l.info("removed probe for : " + ctx + " " + gone);
        }
      }


      /**
       * Asks a context for its number of running services.
       *
       * @return the reported number, or Integer.MAX_VALUE if the lookup
       * failed.
       */
      private Integer query(String s) throws Exception {
        Integer running;
        try {
          running = readFrom(s + "/q/running");
        } catch (Exception E) {
          l.log(Level.SEVERE, "Error getting the current running services: ", E);
          running = Integer.MAX_VALUE;
        }
        return running;
      }


      /**
       * Returns the last known load for the context of the given service,
       * querying the backend the first time a context is seen, and renews
       * the entry's time-to-live.
       */
      private synchronized int getCurrentLoad(String service) throws Exception {
        String ctx = getContext(service);
        Load current = loads.get(ctx);
        if (current == null) {
          current = new Load();
          current.running = query(ctx);
          loads.put(ctx, current);
          if (l.isLoggable(Level.INFO))
            l.info("create probe for : " + service + " " + current);
        }

        // reset the use tick; a failed lookup only gets a single tick
        boolean lookupFailed = current.running == Integer.MAX_VALUE;
        current.ttl.set(lookupFailed ? 1 : MAX_TTL);

        if (l.isLoggable(Level.INFO))
          l.info("get probe for : " + service + " " + current);
        return current.running;
      }


      /**
       * Maps a service URL to its context, built from the first four URI
       * parts returned by Utils.getURIParts; results are remembered per
       * service URL.
       */
      private String getContext(String service) throws URISyntaxException {
        String cached = sh.get(service);
        if (cached != null)
          return cached;

        String[] parts = Utils.getURIParts(service);
        String context = parts[0] + parts[1] + parts[2] + "/" + parts[3];
        sh.put(service, context);
        return context;
      }
    }


    /**
     * Reads a non-negative integer from the given HTTP URL, trying up to
     * three times.
     *
     * An attempt fails if the connection or read fails, the response is
     * empty, the first token is not an integer, or the number is negative.
     * The first successful attempt is returned immediately, including the
     * last one.
     *
     * @param url the URL to read from (must be an http/https URL).
     * @return the number read, or Integer.MAX_VALUE if all attempts failed.
     */
    static Integer readFrom(String url) {
      for (int i = 0; i < 3; i++) {
        HttpURLConnection con = null;
        try {
          con = (HttpURLConnection) new URL(url).openConnection();
          // NOTE(review): only a connect timeout is set; a backend that
          // accepts the connection but never answers blocks this call.
          con.setConnectTimeout(500);
          con.setRequestProperty("accept", "text/plain");
          // closing the scanner also closes the response stream
          try (Scanner sc = new Scanner(con.getInputStream(), "UTF-8")) {
            String text = sc.next();
            if (l.isLoggable(Level.INFO))
              l.info("read from : " + url + " " + text);
            int running = Integer.parseInt(text);
            if (running >= 0)
              return running;
            // negative values are treated as invalid: try again
          }
        } catch (IOException | NumberFormatException | java.util.NoSuchElementException E) {
          l.log(Level.SEVERE, "Error " + i + " getting the current running services: ", E);
          // release whatever the failed attempt left open
          if (con != null)
            con.disconnect();
        }
      }
      return Integer.MAX_VALUE;
    }


    /**
     * Manual check: reads the running count from a fixed endpoint 50 times,
     * half a second apart, and prints each result.
     */
    public static void main(String[] args) throws Exception {
      final String endpoint = "http://csip.engr.colostate.edu:8097/csip-weps/q/running";
      int attempt = 0;
      while (attempt < 50) {
        Thread.sleep(500);
        System.out.println(attempt + " " + QueueManagement.readFrom(endpoint));
        attempt++;
      }
    }

    // Pending requests, bounded by the configured queue length.
    BlockingQueue<Payload> queue = new LinkedBlockingQueue<>(queueLen);
    // Backend load lookup, also scheduled periodically in startup().
    LoadProbe probe = new LoadProbe();

    // The submitter loop, wrapped so shutdown() can wait for its result.
    FutureTask<String> submitTask = new FutureTask<>(new SubmitJobThread());
    // Per-context capacity, looked up from the configuration once and cached.
    SimpleCache<String, Integer> capacities = new SimpleCache<>();

    // Keeps the submitter loop alive; cleared by shutdown().
    final AtomicBoolean submissionRunning = new AtomicBoolean(true);
    // Whether new requests are accepted; flipped by the "toggle" command.
    final AtomicBoolean queueOpen = new AtomicBoolean(true);

    // Requests received by doProcess for queueing.
    AtomicInteger incoming = new AtomicInteger(0);
    // Payloads placed in the queue (counted in doQueue, re-queues included).
    AtomicInteger queued_sub = new AtomicInteger(0);
    // Payloads taken from the queue and handed on for execution.
    AtomicInteger queued_rec = new AtomicInteger(0);
    // Payloads put back in line because the backend was at capacity.
    AtomicInteger queued_back = new AtomicInteger(0);
    // Requests posted to the backend without error.
    AtomicInteger exec_sub = new AtomicInteger(0);
    // NOTE(review): reported by the "status" command but never incremented
    // in the code visible here - confirm whether this is intended.
    AtomicInteger exec_rec = new AtomicInteger(0);
    // Running serial number put into each posted request; reset to 0 by "reset".
    AtomicInteger sn = new AtomicInteger(0);

    /**
     * One queued request: the target service URL and the request text that
     * will be posted to it.
     */
    static class Payload {

      // target service URL
      String url;
      // request content as text
      String request;


      Payload(String serviceUrl, String requestText) {
        url = serviceUrl;
        request = requestText;
      }
    }


    /**
     * @return the number of requests currently in the queue.
     */
    public int getQueueLen() {
      return queue.size();
    }


    /**
     * @return the number of additional requests the queue can take before
     * it is full.
     */
    public int getRemainingCapacity() {
      return queue.remainingCapacity();
    }


    /**
     * Appends a request to the queue, waiting up to offerMS for free space.
     *
     * @param url the target service URL.
     * @param request the request text to post later.
     * @return the queue length after insertion, or -1 if the request could
     * not be placed in time.
     * @throws Exception if the wait is interrupted.
     */
    synchronized int doQueue(String url, String request) throws Exception {
      if (l.isLoggable(Level.INFO))
        l.log(Level.INFO, "Queueing request :{0}", url);

      Payload entry = new Payload(url, request);
      if (queue.offer(entry, offerMS, TimeUnit.MILLISECONDS)) {
        queued_sub.getAndIncrement();
        return getQueueLen();
      }
      return -1;
    }


    /**
     * Static capacity of a context, taken from the property
     * "csip.pubsub.&lt;context&gt;.capacity" where '/' and ':' in the context
     * are replaced by '.'; falls back to the default capacity. The value is
     * cached per context.
     */
    int getContextCapacity(String context) {
      return capacities.get(context, c -> {
        String key = "csip.pubsub." + c.replace('/', '.').replace(':', '.') + ".capacity";
        return Config.getInt(key, defaultCapacity);
      });
    }

    /**
     * Submit for execution.
     *
     * This thread pulls the entries from the queue and submits it for
     * execution.
     *
     */
    class SubmitJobThread implements Callable<String> {

      long delay = submitDelay;


      private void executeAsync(String url, String payload, int capacity) {
        try (Client c0 = new Client(l)) {
          JSONObject o = new JSONObject(payload);
          Map<String, String> header = new HashMap<>();
          header.put(KEY_SUUID, o.getJSONObject(KEY_METAINFO).getString(KEY_SUUID));
          header.put("Connection", "Close");

          o.getJSONObject("metainfo")
              .put("cap", capacity)
              .put("sn", sn.get())
              // to prevent abuse
              .put("csip.archive.enabled", false);

          sn.incrementAndGet();

          JSONObject result = c0.doPOST(url, o, header);
          if (l.isLoggable(Level.FINE))
            l.fine("POST Run to " + url + " ... received: " + result.toString());

          exec_sub.incrementAndGet();
        } catch (Exception ex) {
          l.log(Level.SEVERE, null, ex);
        }
      }


      private void submit(String serviceUrl, String servicePayload) throws Exception {
        // wait a bit before continue processing
        try {
          Thread.sleep(delay);
        } catch (InterruptedException ex) {
          l.info("Interrupted");
        }

        if (checkTarget) {
          // Ping the service first.
          long p = Client.ping(serviceUrl, pingTimeout);
          if (p == -1) {
            doQueue(serviceUrl, servicePayload);
            delay = delayAtCapacilty;
            if (l.isLoggable(Level.INFO))
              l.info("Cannot ping the service, back in line...");
            return;
          }
        }

        // check the current load in the backend.
        int currentLoad = probe.getCurrentLoad(serviceUrl);
        int contextCapacity = getContextCapacity(probe.getContext(serviceUrl));
        if (l.isLoggable(Level.INFO))
          l.log(Level.INFO, "Load for {2}: {0}/{1}",
              new Object[]{currentLoad, contextCapacity, serviceUrl});

        // compare the current backend load against the backend capacity
        if (currentLoad >= contextCapacity) {
          // capacity reached, put it back in line
          doQueue(serviceUrl, servicePayload);
          queued_back.incrementAndGet();
          delay = delayAtCapacilty;
          if (l.isLoggable(Level.INFO))
            l.log(Level.INFO, "back in line...{0}, {1}/{2}",
                new Object[]{serviceUrl, currentLoad, contextCapacity});
        } else {
          // capacity is fine, submit for execution.
          queued_rec.incrementAndGet();
          executeAsync(serviceUrl, servicePayload, currentLoad);
          delay = submitDelay;
        }
      }


      @Override
      public String call() throws Exception {
        try {
          while (submissionRunning.get()) {
            Payload payload = queue.poll(pollMS, TimeUnit.MILLISECONDS);
            if (payload != null) {
              if (l.isLoggable(Level.INFO))
                l.info("RECEIVED: " + payload.url);
              if (l.isLoggable(Level.FINE))
                l.fine(" Request: " + payload.request);
              submit(payload.url, payload.request);
            }
            if (l.isLoggable(Level.INFO))
              l.info("Submit Alive.");
          }
        } finally {
          l.info("Submitter closed.");
        }
        return "Done Submit.";
      }
    }


    /**
     * Stops the submitter loop and shuts down both executors.
     *
     * Waits for the submit task to finish, but only up to
     * "csip.pubsub.shutdown.timeout.ms" (default 30000 ms), so a task that
     * was never started or is stuck in a backend call cannot block the
     * shutdown forever. The executors are shut down in every case.
     */
    void shutdown() {
      submissionRunning.set(false);
      long waitMS = Config.getLong("csip.pubsub.shutdown.timeout.ms", 30000);
      try {
        l.log(Level.INFO, submitTask.get(waitMS, TimeUnit.MILLISECONDS));
      } catch (InterruptedException ex) {
        // keep the interrupt visible to the caller
        Thread.currentThread().interrupt();
        l.log(Level.SEVERE, null, ex);
      } catch (ExecutionException | java.util.concurrent.TimeoutException ex) {
        l.log(Level.SEVERE, null, ex);
      }
      submitExec.shutdown();
      probeExec.shutdown();
      probe.close();
    }


    /**
     * Starts the submit task and schedules the load probe, first after two
     * seconds and then with the configured delay between runs.
     */
    void startup() {
      final long firstProbeDelayMS = 2000;
      submitExec.submit(submitTask);
      probeExec.scheduleWithFixedDelay(probe, firstProbeDelayMS, loadcheck, TimeUnit.MILLISECONDS);
    }
  }


  /**
   * Starts the pub/sub threads; any failure is logged and not propagated.
   */
  public static void onContextInit() {
    try {
      mgmt.startup();
      l.info("Started Pub/Sub Threads.");
    } catch (Exception E) {
      l.log(Level.SEVERE, null, E);
    }
  }


  /**
   * Stops the pub/sub threads; any failure is logged and not propagated.
   */
  public static void onContextDestroy() {
    try {
      mgmt.shutdown();
      l.info("Stopped Pub/Sub Threads.");
    } catch (Exception E) {
      l.log(Level.SEVERE, null, E);
    }
  }
}