Kafka

Kafka初始offset的确定

Dec 26, 2016 | | Say something

如果kafka的topic中已经累积了很多消息,这时新组的第一个消费者开始消费该topic中的消息,这时消费者是从topic的头开始处理,还是从topic的的尾开始处理呢,分三种情况: 使用老的消费者接口且自动确定初始offset: auto.offset.reset属性决定初始offset。默认值为largest,表示从topic的的尾开始处理,可选的值还有smallest,表示从topic的头开始处理,设置为其他值时报异常。 使用新的消费者接口且自动确定初始offset: auto.offset.reset属性决定初始offset。默认值为latest,表示从topic的的尾开始处理,可选的值还有earliest,表示从topic的头开始处理,,设置为none且以前没有offset时报异常。设置为其他值时直接报异常 手动指定offset: KafkaConsumer.seek方法可以将offset置为任意的值

kafka SSL配置

Dec 26, 2016 | | Say something

一、证书产生 #CA产生自己的私钥(存于ca-key文件)和证书(存于ca-cert文件),在当前目录(本文由javacoder.cn博主整理,转载请注明出处) openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 #在kafka服务器上执行,如果有多台服务器,请在每台上都执行。-alias选项用例标识jks文件中每个条目的。 #产生key keytool -keystore server.keystore.jks -alias 192.168.14.140 -validity 365 -keyalg RSA -genkey #导出key对应的证书 keytool -keystore server.keystore.jks -alias 192.168.14.140 -certreq -file cert-file #在client端执行,如果不要求客户端认证(服务器没有配置ssl.client.auth=required),那么这步可以省略。 #产生key keytool -keystore client.keystore.jks -alias 192.168.14.1 -validity 365 -keyalg RSA -genkey #导出key对应的证书 keytool -keystore client.keystore.jks -alias 192.168.14.1 -certreq -file cert-file-client #使用CA证书(ca-key,ca-cert)分别对服务器(cert-file)和客户端(cert-file-client)的证书进行签名。产生的证书分别为(cert-signed,cert-signed-client) […more]

kafka限流(quota)设置

Dec 24, 2016 | | Say something

如果kafka客户端是认证的。那么可以使用userId和clientId两种认证方式。如果没有认证只能使用clientId限流。 bin/kafka-configs.sh –zookeeper localhost:2181 –alter –add-config ‘producer_byte_rate=1048576,consumer_byte_rate=1024′ –entity-type clients –entity-name clientA 对clientId=clientA的客户端添加限流设置。producer_byte_rate表示每秒最多能写入到消息量,单位为byte/sec。consumer_byte_rate表示每秒最多能消费的消息了,单位也为byte/sec。 可以使用bin/kafka-producer-perf-test.sh压力测试脚本来验证你的限流配置是否生效。 bin/kafka-producer-perf-test.sh –topic test_perf –num-records 10000 –record-size 100 –throughput 150 –producer-props bootstrap.servers=localhost:9092 client.id=clientA 注意”client.id=clientA”属性值。 当然,你会简单的zookeeper命令行操作,也可以登录zookeeper执行如下的命令 [zk: localhost:2181(CONNECTED) 16] get /config/clients/clientA {“version”:1,”config”:{“producer_byte_rate”:”20480″,”consumer_byte_rate”:”1024″}} 发现kafka将我们的限流设置以json的格式写入了zookeeper中。

kafka日志清理过程

Dec 23, 2016 | | Say something

总所周知,kafka将topic分成不同的partitions,每个partition的日志分成不同的segments,最后以segment为单位将陈旧的日志从文件系统删除,本文由javacoder.cn整理,转载请注明出处。 假设kafka的日志目录为tmp/kafka-logs,对于名为test_perf的topic。假设两个partitions,那么我们可以在tmp/kafka-logs目录下看到目录VST_TOPIC-0,VST_TOPIC-1。也就是说kafka使用目录表示topic 分区。 VST_TOPIC-0目录下下,可以看到后缀名为.log和.index的文件,如下 [root@kafka kafka-logs]# ls test_perf-0/ 00000000000003417135.index.deleted 00000000000003518540.index 00000000000003619945.index 00000000000003417135.log.deleted 00000000000003518540.log 00000000000003619945.log 如果所有待删除的陈旧日志都清理了,那么是看不到后缀名为.deleted的文件的。 kafka默认的清理策略为: log.retention.hours=168 //7d log.retention.check.interval.ms=300000 //5min log.segment.bytes=1073741824 //1G log.cleaner.delete.retention.ms=86400000 // 1d log.cleaner.backoff.ms=15000 //15s 每个segment的大小为1GB,每5分钟检查一次是否有segment已经查过了7d,如果有将其标记为deleted。标记为deleted的segment默认会保留1天,清理线程会每隔15秒检查一次,是否有标记为deleted的segment的保留时间超过一天了,如果有将其从文件系统删除。 大家注意,kafka清理时是不管该segment中的消息是否被消费过,它清理的依据为是否超过了指定的保留时间,仅此而已。 kafka还提供基于日志的大小的清理策略。 log.segment.bytes参数默认没有指定。 你可以同时指定log.segment.bytes和log.retention.hours来混合指定保留规则。一旦日志的大小超过了log.segment.bytes就清除老的segment,一旦某个segment的保留时间超过了规定的值同样将其清除。 log.cleanup.policy属性指定清理策略,默认策略为delete,可选的为compact

Kafka自带的性能测试脚本

Dec 23, 2016 | | Say something

压测写入消息 ./kafka-producer-perf-test.sh –topic test_perf –num-records 1000000 –record-size 1000 –throughput 20000 –producer-props bootstrap.servers=localhost:9092 88126 records sent, 17625.2 records/sec (16.81 MB/sec), 968.5 ms avg latency, 1409.0 max latency. 113096 records sent, 22587.6 records/sec (21.54 MB/sec), 249.2 ms avg latency, 681.0 max latency. 63205 records sent, 12615.8 records/sec (12.03 MB/sec), 1200.1 ms avg latency, 2560.0 max latency. 137255 […more]

kafka broker advertised.listeners属性的设置

Nov 19, 2016 | | Say something

kafka 0.9.x以后的版本,有一个配置属性叫advertised.listeners,在server.properties中,该属性默认是注释掉的,解释如下【注:本文由javacoder.cn整理】 #Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for “listeners” if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://192.168.14.140:9092 “PLAINTEXT”表示协议,可选的值有PLAINTEXT和SSL,hostname可以指定IP地址,也可以用”0.0.0.0″表示对所有的网络接口有效,如果hostname为空表示只对默认的网络接口有效 也就是说如果你没有配置advertised.listeners,就使用listeners的配置通告给消息的生产者和消费者,这个过程是在生产者和消费者获取源数据(metadata)。如果都没配置,那么就使用java.net.InetAddress.getCanonicalHostName()返回的值,对于ipv4,基本就是localhost了 然后你的生产者就会一直卡死,没有反应,如果添加slf4j 桥接 log4j,将日志级别调到debug,发现如下的日志输出 Updated cluster metadata version 2 to Cluster(nodes = [Node(0, 127.0.0.1, 9092)], partitions = []) […more]