SDMClient.java [src/csip/sdm] Revision: f4e1cacdc10d5e03e67cbd6cb0a741e826b419ee  Date: Wed Mar 02 14:15:51 MST 2022
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 *
 * 2012-2017, Olaf David and others, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package csip.sdm;

import csip.sdm.SDMDriver.DBProperties;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Locale;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.HttpsURLConnection;
import org.apache.commons.io.IOUtils;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.EntityBuilder;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.pool.PoolStats;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.ssl.SSLContexts;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

/**
 * HTTP client that POSTs JSON requests to a single SDM endpoint and parses
 * the JSON reply.
 * <p>
 * All instances share one static pooling connection manager; each instance
 * builds its own {@link CloseableHttpClient} on top of that pool.
 *
 * @author <a href="mailto:shaun.case@colostate.edu">Shaun Case</a>
 */
public class SDMClient {

  /** Maximum number of attempts made by {@link #doPOST(JSONObject)}. */
  private static final int MAX_RETRIES = 10;
  /** Pause between two attempts, in milliseconds (1/2 second). */
  private static final int THREAD_SLEEP_ON_RETRY = 500;
  /** Endpoint every request of this client is posted to. */
  private final String url;

  /** Connection pool shared by every client instance. */
  private static final PoolingHttpClientConnectionManager CM;

  /**
   * Keep-alive strategy: honors the {@code timeout} element of the response's
   * 'Keep-Alive' header (given in seconds, returned in milliseconds) and falls
   * back to 30 seconds when the header is absent or not numeric.
   */
  static ConnectionKeepAliveStrategy myStrategy = (HttpResponse response, HttpContext context) -> {
    // Honor 'keep-alive' header
    HeaderElementIterator it = new BasicHeaderElementIterator(
        response.headerIterator(HTTP.CONN_KEEP_ALIVE));
    while (it.hasNext()) {
      HeaderElement he = it.nextElement();
      String param = he.getName();
      String value = he.getValue();
      if (value != null && param.equalsIgnoreCase("timeout")) {
        try {
          return Long.parseLong(value) * 1000;
        } catch (NumberFormatException ignored) {
          // Not a number: keep scanning, then fall back to the default below.
        }
      }
    }
    // Default keep-alive when the server did not announce one.
    return 30 * 1000;
  };

  static {

    CM = new PoolingHttpClientConnectionManager(
        RegistryBuilder.<ConnectionSocketFactory>create()
            .register("http", PlainConnectionSocketFactory.getSocketFactory())
            .register("https", new SSLConnectionSocketFactory(SSLContexts.createSystemDefault()))
            .build()
    );

    // Upper bound on pooled connections across all routes.
    CM.setMaxTotal(3000);
    // Upper bound on pooled connections for any single route.
    CM.setDefaultMaxPerRoute(2500);
  }

  /**
   * Pings a HTTP or HTTPS URL. This sends a HEAD request and reports how long
   * the response took if the response code is in the 200-399 range.
   *
   * @param url The HTTP(S) URL to be pinged.
   * @param timeout The timeout in millis for both the connection timeout and
   * the response read timeout. Note that the total timeout is effectively two
   * times the given timeout.
   * @return the time to respond in millis if the given URL has returned
   * response code 200-399 on a HEAD request within the given timeout,
   * otherwise <code>-1</code> (including malformed URLs and I/O failures).
   */
  public static long ping(String url, int timeout) {
    HttpURLConnection connection = null;
    try {
      // HttpURLConnection is the common supertype of the http and https
      // handlers, so plain http:// URLs no longer fail the cast.
      connection = (HttpURLConnection) new URL(url).openConnection();
      connection.setConnectTimeout(timeout);
      connection.setReadTimeout(timeout);
      connection.setRequestMethod("HEAD");
      long start = System.currentTimeMillis();
      int responseCode = connection.getResponseCode();
      long end = System.currentTimeMillis();
      return (200 <= responseCode && responseCode <= 399) ? (end - start) : -1;
    } catch (IOException e) {
      return -1;
    } finally {
      if (connection != null) {
        connection.disconnect();
      }
    }
  }

  // NOTE(review): never assigned in this class; PreemptiveAuthInterceptor
  // would fail with a NullPointerException if it were installed as is.
  private CredentialsProvider creds;
  private final CloseableHttpClient httpClient;

  /** Optional logger; stays {@code null} when the default logging is configured. */
  private final Logger log;

  /**
   * Create a Client.
   *
   * @param props supplies the logging level and the request timeout
   * (presumably in seconds, since it is multiplied by 1000 before use - confirm
   * against DBProperties).
   * @param url the endpoint all requests are posted to
   */
  SDMClient(DBProperties props, String url) {
    this.url = url;

    // Only create a logger when a non-default level was requested.
    Logger logger = null;
    if (!props.isLoggingDefault()) {
      String level = props.getLogging();
      logger = Logger.getAnonymousLogger();
      logger.setLevel(Level.parse(level));
    }
    this.log = logger;

    int timeout = props.getTimeout();

    RequestConfig reqConfig = init(timeout);
    httpClient = HttpClientBuilder.create()
        .setDefaultRequestConfig(reqConfig)
        .setConnectionManager(CM)
        .setKeepAliveStrategy(myStrategy)
        .build();
  }

  /**
   * Builds the request configuration used for every request of this client.
   *
   * @param timeout value applied (times 1000) as connect, connection-request
   * and socket timeout
   * @return the request configuration
   */
  private RequestConfig init(int timeout) {
    int timeoutMillis = timeout * 1000;
    return RequestConfig.custom()
        .setConnectTimeout(timeoutMillis)
        .setConnectionRequestTimeout(timeoutMillis)
        .setSocketTimeout(timeoutMillis)
        .setCookieSpec(CookieSpecs.BEST_MATCH)
        .setExpectContinueEnabled(true)
        .setStaleConnectionCheckEnabled(true)
        .setTargetPreferredAuthSchemes(Arrays.asList(AuthSchemes.NTLM, AuthSchemes.DIGEST))
        .setProxyPreferredAuthSchemes(Arrays.asList(AuthSchemes.BASIC))
        .build();
  }

  /**
   * HTTP Post with the request JSON to this client's endpoint.
   * <p>
   * I/O failures whose message mentions a closed connection or
   * "connection refused" are retried up to {@link #MAX_RETRIES} times with a
   * pause of {@link #THREAD_SLEEP_ON_RETRY} millis in between; any other I/O
   * failure is reported immediately.
   *
   * @param req the request JSON
   * @return the response JSON, or {@code null} if every attempt failed with a
   * retryable error
   * @throws java.sql.SQLException if the response is missing or is not valid
   * JSON, on a non-retryable communication error, or if the thread is
   * interrupted while waiting for the next attempt
   */
  JSONObject doPOST(JSONObject req) throws SQLException {
    int retryCount = 0;

    EntityBuilder builder = EntityBuilder.create();
    builder.setText(req.toString());
    builder.setContentType(ContentType.APPLICATION_JSON);

    HttpPost post = new HttpPost(url);
    post.setEntity(builder.build());

    if (log != null && log.isLoggable(Level.INFO)) {
      log.info("POST: " + url);
      log.info("REQUEST JSON: " + req);
    }

    while (retryCount < MAX_RETRIES) {
      try (CloseableHttpResponse httpResponse = httpClient.execute(post)) {
        if (httpResponse.getEntity() == null) {
          // A response without a body cannot carry the expected JSON.
          throw new SQLException("Invalid SDM return data: empty response from " + url);
        }
        InputStream responseStream = httpResponse.getEntity().getContent();
        // Decode explicitly instead of relying on the platform default charset.
        String response = IOUtils.toString(responseStream, "UTF-8");
        if (log != null && log.isLoggable(Level.INFO)) {
          log.info("RESPONSE JSON: " + response);
          PoolStats ps = CM.getTotalStats();
          log.info("http-client-connections pool [avail=" + ps.getAvailable() + " leased=" + ps.getLeased() + " pending=" + ps.getPending() + "] ");
        }
        return new JSONObject(response);
      } catch (JSONException ex) {
        throw new SQLException("Invalid SDM return data: " + ex.getMessage(), ex);
      } catch (IOException ex) {
        String exMessage = ex.getMessage();
        // The message may be null; treat that as a non-retryable failure.
        String lowered = exMessage == null ? "" : exMessage.toLowerCase(Locale.ROOT);
        boolean retryable = lowered.contains(" closed") || lowered.contains("connection refused");
        if (!retryable) {
          throw new SQLException("Communication error with SDM host: " + exMessage, ex);
        }
        retryCount++;
        if (log != null) {
          log.warning("Retrying: " + retryCount + ", for" + url + ":  " + exMessage);
        }
        try {
          Thread.sleep(THREAD_SLEEP_ON_RETRY);
        } catch (InterruptedException interrupted) {
          if (log != null) {
            log.info("Thread sleep in retry of doPost interrupted.");
          }
          // Restore the flag and stop retrying: the thread was asked to stop.
          Thread.currentThread().interrupt();
          throw new SQLException("Interrupted while waiting to retry request to SDM host: " + url, interrupted);
        }
      } finally {
        //post.releaseConnection();    
        post.completed();
      }
    }

    // All attempts failed with retryable errors.
    return null;
  }

  /**
   * Request interceptor that adds a Basic authentication header built from
   * the enclosing client's credentials provider to every request it processes.
   */
  private class PreemptiveAuthInterceptor implements HttpRequestInterceptor {

    @Override
    @SuppressWarnings("deprecation")
    public void process(final HttpRequest request, final HttpContext context) throws HttpException, IOException {
      request.addHeader(BasicScheme.authenticate(creds.getCredentials(AuthScope.ANY), "US-ASCII", false));
    }
  }
}