Monday, October 9, 2023

Kafka producer and consumer

HOWTO

Kafka producer

$ bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=test1
>Hi from dave
[2023-10-09 14:29:27,126] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2023-10-09 14:29:27,231] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>dave
>dave
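
The two LEADER_NOT_AVAILABLE warnings above appear because test1 did not exist yet: the broker auto-created it on the first send (the server log shows "Creating topic test1" at the same second), and the producer retried until a leader was elected. A minimal sketch for creating the topic up front instead, assuming the commands are run from the Kafka installation directory and the broker listens on localhost:9092. The helper name create_topic and the single-partition, single-replica settings are my own choices for a one-broker setup, not something taken from the original session.

```shell
# Assumptions: Kafka install dir is the current directory, broker on localhost:9092
KAFKA_TOPICS=bin/kafka-topics.sh
BOOTSTRAP=localhost:9092

# Hypothetical helper: create a topic with 1 partition and 1 replica,
# doing nothing if the topic already exists
create_topic() {
  "$KAFKA_TOPICS" --bootstrap-server "$BOOTSTRAP" --create --if-not-exists \
    --topic "$1" --partitions 1 --replication-factor 1
}

# Only call the tool when it is actually present
if [ -x "$KAFKA_TOPICS" ]; then
  create_topic test1
else
  echo "$KAFKA_TOPICS not found, skipping topic creation" >&2
fi
```

With the topic in place before the producer starts, the first message should go out without the metadata warnings.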

Server logs

[2023-10-09 13:52:42,475] INFO [MetadataCache brokerId=0] Updated cache from existing <empty> to latest FinalizedFeaturesAndEpoch(features=Map(), epoch=0). (kafka.server.metadata.ZkMetadataCache)
[2023-10-09 13:52:42,479] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2023-10-09 13:52:42,483] INFO [TxnMarkerSenderThread-0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2023-10-09 13:52:42,483] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2023-10-09 13:52:42,542] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2023-10-09 13:52:42,566] INFO [Controller id=0, targetBrokerId=0] Node 0 disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-10-09 13:52:42,570] WARN [Controller id=0, targetBrokerId=0] Connection to node 0 (dave/192.168.0.115:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2023-10-09 13:52:42,575] INFO [Controller id=0, targetBrokerId=0] Client requested connection close from node 0 (org.apache.kafka.clients.NetworkClient)
[2023-10-09 13:52:42,580] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2023-10-09 13:52:42,603] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Enabling request processing. (kafka.network.SocketServer)
[2023-10-09 13:52:42,608] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2023-10-09 13:52:42,650] INFO Kafka version: 3.5.1 (org.apache.kafka.common.utils.AppInfoParser)
[2023-10-09 13:52:42,650] INFO Kafka commitId: 2c6fb6c54472e90a (org.apache.kafka.common.utils.AppInfoParser)
[2023-10-09 13:52:42,650] INFO Kafka startTimeMs: 1696852362645 (org.apache.kafka.common.utils.AppInfoParser)
[2023-10-09 13:52:42,652] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2023-10-09 13:52:42,756] INFO [zk-broker-0-to-controller-forwarding-channel-manager]: Recorded new controller, from now on will use node dave:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-10-09 13:52:42,833] INFO [zk-broker-0-to-controller-alter-partition-channel-manager]: Recorded new controller, from now on will use node dave:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-10-09 14:29:27,102] INFO Creating topic test1 with configuration {} and initial partition assignment Map(0 -> ArrayBuffer(0)) (kafka.zk.AdminZkClient)
[2023-10-09 14:29:27,170] INFO [Controller id=0, targetBrokerId=0] Node 0 disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-10-09 14:29:27,200] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2023-10-09 14:29:27,260] INFO [LogLoader partition=test1-0, dir=/app/kafka/logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2023-10-09 14:29:27,278] INFO Created log for partition test1-0 in /app/kafka/logs/test1-0 with properties {} (kafka.log.LogManager)
[2023-10-09 14:29:27,279] INFO [Partition test1-0 broker=0] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2023-10-09 14:29:27,281] INFO [Partition test1-0 broker=0] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)

Kafka consumer

$ bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic=test1  
aa
bb
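
By default the console consumer only prints messages produced after it starts, which is probably why the output here differs from what was typed into the producer earlier. A sketch for re-reading the topic from the start under a named consumer group, assuming the same install directory and broker address as above. The helper name consume_from_start and the 10 second idle timeout are illustrative choices; skupina1 is the group name used later in this post.

```shell
# Assumptions: Kafka install dir is the current directory, broker on localhost:9092
KAFKA_CONSUMER=bin/kafka-console-consumer.sh
BOOTSTRAP=localhost:9092

# Hypothetical helper: read a topic from the earliest offset as part of a group.
# --from-beginning only takes effect if the group has no committed offsets yet.
# --timeout-ms makes the consumer exit after 10 s without new messages.
consume_from_start() {
  "$KAFKA_CONSUMER" --bootstrap-server "$BOOTSTRAP" \
    --topic "$1" --group "$2" --from-beginning --timeout-ms 10000
}

# Only call the tool when it is actually present
if [ -x "$KAFKA_CONSUMER" ]; then
  consume_from_start test1 skupina1
else
  echo "$KAFKA_CONSUMER not found, skipping" >&2
fi
```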

Kafka log directory
$ ls -l /app/kafka/logs/
total 220
-rw-r--r--. 1 dave dave    0 Oct  9 13:52 cleaner-offset-checkpoint
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-0
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-1
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-10
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-11
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-12
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-13
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-14
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-15
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-16
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-17
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-18
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-19
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-2
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-20
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-21
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-22
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-23
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-24
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-25
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-26
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-27
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-28
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-29
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-3
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-30
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-31
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-32
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-33
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-34
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-35
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-36
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-37
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-38
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-39
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-4
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-40
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-41
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-42
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-43
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-44
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-45
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-46
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-47
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-48
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-49
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-5
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-6
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-7
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-8
drwxr-xr-x. 1 dave dave  242 Oct  9 14:35 __consumer_offsets-9
-rw-r--r--. 1 dave dave    4 Oct  9 14:37 log-start-offset-checkpoint
-rw-r--r--. 1 dave dave   89 Oct  9 13:52 meta.properties
-rw-r--r--. 1 dave dave 1205 Oct  9 14:37 recovery-point-offset-checkpoint
-rw-r--r--. 1 dave dave 1205 Oct  9 14:38 replication-offset-checkpoint
drwxr-xr-x. 1 dave dave  242 Oct  9 14:29 test1-0

Topic test1
$ ls -l /app/kafka/logs/test1-0/
total 12
-rw-r--r--. 1 dave dave 10485760 Oct  9 14:29 00000000000000000000.index
-rw-r--r--. 1 dave dave      364 Oct  9 14:35 00000000000000000000.log
-rw-r--r--. 1 dave dave 10485756 Oct  9 14:29 00000000000000000000.timeindex
-rw-r--r--. 1 dave dave        8 Oct  9 14:29 leader-epoch-checkpoint
-rw-r--r--. 1 dave dave       43 Oct  9 14:29 partition.metadata
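
The two index files look huge next to the 364-byte log, and "total 12" suggests they are mostly unallocated space. As far as I know, Kafka preallocates each index up to log.index.size.max.bytes (default 10485760, i.e. 10 MiB) and rounds down to a whole number of entries. The entry sizes below (8 bytes per offset index entry, 12 bytes per time index entry) are my understanding of the on-disk format, so treat this as a sketch of the arithmetic, not a statement from the Kafka docs.

```shell
max_index_bytes=10485760   # assumed default of log.index.size.max.bytes (10 MiB)
offset_entry=8             # assumed bytes per .index entry
time_entry=12              # assumed bytes per .timeindex entry

# Round the limit down to a whole number of entries
index_size=$(( max_index_bytes / offset_entry * offset_entry ))
timeindex_size=$(( max_index_bytes / time_entry * time_entry ))

echo ".index:     $index_size"
echo ".timeindex: $timeindex_size"
```

This gives 10485760 and 10485756, matching the sizes in the listing.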

Consumer group
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group skupina1

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
skupina1        T3              0          76              76              0               console-consumer-0b468664-6bfe-4185-ac0f-cc27d6dd857a /192.168.0.115  console-consumer
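
The LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET. A small sketch that recomputes it with awk from the output above, pasted in as a sample so the snippet runs without a broker. The variable names describe_output and lag_report are illustrative only.

```shell
# Sample copied from the describe output above
describe_output='GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
skupina1        T3              0          76              76              0               console-consumer-0b468664-6bfe-4185-ac0f-cc27d6dd857a /192.168.0.115  console-consumer'

# Skip the header and any row without a numeric committed offset,
# then print group, topic, partition and the recomputed lag
lag_report=$(printf '%s\n' "$describe_output" |
  awk '$1 != "GROUP" && $4 ~ /^[0-9]+$/ { print $1, $2, $3, $5 - $4 }')

echo "$lag_report"
```

This prints "skupina1 T3 0 0". Against a live broker, the same awk filter can be fed from the kafka-consumer-groups.sh --describe command instead of the pasted sample.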

List groups
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
skupina1
skupina2
