KafkaPublisher.java [src/csip] Revision: default Date:
/*
* $Id$
*
* This file is part of the Cloud Services Integration Platform (CSIP),
* a Model-as-a-Service framework, API and application suite.
*
* 2012-2022, Olaf David and others, OMSLab, Colorado State University.
*
* OMSLab licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package csip;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* Kafka Publisher.
*
* @author od
*/
class KafkaPublisher implements Publisher {
static final String STRING_SER = "org.apache.kafka.common.serialization.StringSerializer";
Producer<String, String> producer;
ProducerCallback callback;
String topic;
Logger LOG;
KafkaPublisher(Logger LOG, String topic, String bootstrap_servers, String acks,
int retries, int max_block_ms) {
if (topic == null)
throw new IllegalArgumentException("Not publishing to kafka because of 'null' topic.");
if (bootstrap_servers == null)
throw new IllegalArgumentException("Not publishing to kafka because of 'null' bootstrap_servers.");
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
p.put(ProducerConfig.ACKS_CONFIG, acks);
p.put(ProducerConfig.RETRIES_CONFIG, retries);
p.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, max_block_ms);
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SER);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SER);
this.LOG = LOG;
this.topic = topic;
producer = new KafkaProducer<>(p);
callback = new ProducerCallback();
}
@Override
public void publish(String key, String value) {
if (LOG.isLoggable(Level.INFO))
LOG.info("Publishing to topic '" + topic + "': " + key + "->" + value);
try {
producer.send(new ProducerRecord<>(topic, key, value), callback).get();
} catch (Exception ex) {
LOG.log(Level.SEVERE, "Publish error:", ex);
}
}
@Override
public void close() throws Exception {
if (producer != null)
producer.close();
}
private class ProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null)
LOG.severe("Error publishing: " + e.getMessage());
}
}
}