KafkaPublisher.java [src/csip] Revision:   Date:
/*
 * $Id$
 *
 * This file is part of the Cloud Services Integration Platform (CSIP),
 * a Model-as-a-Service framework, API and application suite.
 *
 * 2012-2022, Olaf David and others, OMSLab, Colorado State University.
 *
 * OMSLab licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */
package csip;

import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Kafka Publisher.
 *
 * <p>{@link Publisher} implementation that sends string key/value pairs to a
 * single Kafka topic using a {@link KafkaProducer} configured with string
 * serializers for both key and value.
 *
 * <p>{@link #publish(String, String)} waits for the send to complete and
 * never propagates a failure to the caller; errors are reported through the
 * logger handed to the constructor.
 *
 * @author od
 */
class KafkaPublisher implements Publisher {

  /** Serializer class name used for both record keys and record values. */
  static final String STRING_SER = "org.apache.kafka.common.serialization.StringSerializer";

  /** Underlying Kafka producer, created in the constructor. */
  Producer<String, String> producer;
  /** Completion callback passed along with every send. */
  ProducerCallback callback;
  /** Topic every record is published to. */
  String topic;

  /** Logger used for publish tracing and error reporting. */
  Logger LOG;


  /**
   * Creates the publisher and its underlying Kafka producer.
   *
   * @param LOG the logger used for publish tracing and error reporting
   * @param topic the topic to publish to, must not be {@code null}
   * @param bootstrap_servers value for
   * {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG}, must not be {@code null}
   * @param acks value for {@link ProducerConfig#ACKS_CONFIG}; must not be
   * {@code null} since {@link Properties} rejects {@code null} values
   * @param retries value for {@link ProducerConfig#RETRIES_CONFIG}
   * @param max_block_ms value for {@link ProducerConfig#MAX_BLOCK_MS_CONFIG}
   * @throws IllegalArgumentException if {@code topic} or
   * {@code bootstrap_servers} is {@code null}
   */
  KafkaPublisher(Logger LOG, String topic, String bootstrap_servers, String acks,
      int retries, int max_block_ms) {

    if (topic == null)
      throw new IllegalArgumentException("Not publishing to kafka because of 'null' topic.");

    if (bootstrap_servers == null)
      throw new IllegalArgumentException("Not publishing to kafka because of 'null' bootstrap_servers.");

    Properties p = new Properties();
    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
    p.put(ProducerConfig.ACKS_CONFIG, acks);
    p.put(ProducerConfig.RETRIES_CONFIG, retries);
    p.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, max_block_ms);
    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SER);
    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SER);

    this.LOG = LOG;
    this.topic = topic;

    producer = new KafkaProducer<>(p);
    callback = new ProducerCallback();
  }


  /**
   * Publishes one key/value record to the configured topic and waits for the
   * send to complete.
   *
   * <p>Failures are logged at {@link Level#SEVERE} and not rethrown. If the
   * calling thread is interrupted while waiting, its interrupt status is
   * restored before returning so that the caller can still observe it.
   *
   * @param key the record key
   * @param value the record value
   */
  @Override
  public void publish(String key, String value) {
    if (LOG.isLoggable(Level.INFO))
      LOG.info("Publishing to topic '" + topic + "': " + key + "->" + value);

    try {
      // get() turns the asynchronous send into a blocking call.
      producer.send(new ProducerRecord<>(topic, key, value), callback).get();
    } catch (InterruptedException ex) {
      // Catching InterruptedException clears the interrupt flag; set it again
      // so a cancellation/shutdown request is not silently lost.
      Thread.currentThread().interrupt();
      LOG.log(Level.SEVERE, "Publish error:", ex);
    } catch (Exception ex) {
      LOG.log(Level.SEVERE, "Publish error:", ex);
    }
  }


  /**
   * Closes the underlying producer, if one was created.
   *
   * @throws Exception if closing the producer fails
   */
  @Override
  public void close() throws Exception {
    if (producer != null)
      producer.close();
  }

  /**
   * Send callback that logs the message of any exception reported on
   * completion. Only the message is logged here; the full exception is logged
   * by {@link #publish(String, String)} when waiting on the send fails.
   */
  private class ProducerCallback implements Callback {

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
      if (e != null)
        LOG.severe("Error publishing: " + e.getMessage());
    }
  }
}