HOWTO
Java producer code
package dave;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
/**
 * Minimal Kafka producer example: sends the single string value "Hello"
 * (with no key) to the topic named "topic" on a broker at 127.0.0.1:9092.
 */
public class SimpleProducer {
    /**
     * Configures a String/String producer, sends one record and exits.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String args[])
    {
        String bootstrapServers = "127.0.0.1:9092";

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources guarantees the producer is closed even when
        // send() or flush() throws, so its resources are always released.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic", "Hello");

            // send() is asynchronous; without a callback a failed delivery
            // would go completely unnoticed.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Failed to send record: " + exception);
                }
            });

            // Block until the buffered record has actually been transmitted.
            producer.flush();
        }
    }
}
CLI start
$ mvn exec:java -Dexec.mainClass="dave.SimpleProducer"
Java consumer code
package dave;
import java.util.Arrays;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;
public class SimpleConsumer {
public static void main(String args[])
{
String bootstrapServers="127.0.0.1:9092";
String group_id="my_consumer_group";
String topic="topic";
Properties properties=new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group_id);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String,String> consumer= new KafkaConsumer<String,String>(properties);
consumer.subscribe(Arrays.asList(topic));
while(true) {
ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record: records){
System.out.println("Key: "+ record.key() + ", Value:" +record.value());
System.out.println("Partition:" + record.partition()+",Offset:"+record.offset());
}
}
}
}
CLI start
$ mvn exec:java -Dexec.mainClass="dave.SimpleConsumer"
No comments:
Post a Comment