Blog

kafka SSL配置

Dec 26, 2016 | | Say something

一、证书产生 #CA产生自己的私钥(存于ca-key文件)和证书(存于ca-cert文件),在当前目录(本文由javacoder.cn博主整理,转载请注明出处) openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 #在kafka服务器上执行,如果有多台服务器,请在每台上都执行。-alias选项用例标识jks文件中每个条目的。 #产生key keytool -keystore server.keystore.jks -alias 192.168.14.140 -validity 365 -keyalg RSA -genkey #导出key对应的证书 keytool -keystore server.keystore.jks -alias 192.168.14.140 -certreq -file cert-file #在client端执行,如果不要求客户端认证(服务器没有配置ssl.client.auth=required),那么这步可以省略。 #产生key keytool -keystore client.keystore.jks -alias 192.168.14.1 -validity 365 -keyalg RSA -genkey #导出key对应的证书 keytool -keystore client.keystore.jks -alias 192.168.14.1 -certreq -file cert-file-client #使用CA证书(ca-key,ca-cert)分别对服务器(cert-file)和客户端(cert-file-client)的证书进行签名。产生的证书分别为(cert-signed,cert-signed-client) […more]

kafka限流(quota)设置

Dec 24, 2016 | | Say something

如果kafka客户端是认证的。那么可以使用userId和clientId两种认证方式。如果没有认证只能使用clientId限流。 bin/kafka-configs.sh –zookeeper localhost:2181 –alter –add-config ‘producer_byte_rate=1048576,consumer_byte_rate=1024′ –entity-type clients –entity-name clientA 对clientId=clientA的客户端添加限流设置。producer_byte_rate表示每秒最多能写入到消息量,单位为byte/sec。consumer_byte_rate表示每秒最多能消费的消息了,单位也为byte/sec。 可以使用bin/kafka-producer-perf-test.sh压力测试脚本来验证你的限流配置是否生效。 bin/kafka-producer-perf-test.sh –topic test_perf –num-records 10000 –record-size 100 –throughput 150 –producer-props bootstrap.servers=localhost:9092 client.id=clientA 注意”client.id=clientA”属性值。 当然,你会简单的zookeeper命令行操作,也可以登录zookeeper执行如下的命令 [zk: localhost:2181(CONNECTED) 16] get /config/clients/clientA {“version”:1,”config”:{“producer_byte_rate”:”20480″,”consumer_byte_rate”:”1024″}} 发现kafka将我们的限流设置以json的格式写入了zookeeper中。

kafka日志清理过程

Dec 23, 2016 | | Say something

总所周知,kafka将topic分成不同的partitions,每个partition的日志分成不同的segments,最后以segment为单位将陈旧的日志从文件系统删除,本文由javacoder.cn整理,转载请注明出处。 假设kafka的日志目录为tmp/kafka-logs,对于名为test_perf的topic。假设两个partitions,那么我们可以在tmp/kafka-logs目录下看到目录VST_TOPIC-0,VST_TOPIC-1。也就是说kafka使用目录表示topic 分区。 VST_TOPIC-0目录下下,可以看到后缀名为.log和.index的文件,如下 [root@kafka kafka-logs]# ls test_perf-0/ 00000000000003417135.index.deleted 00000000000003518540.index 00000000000003619945.index 00000000000003417135.log.deleted 00000000000003518540.log 00000000000003619945.log 如果所有待删除的陈旧日志都清理了,那么是看不到后缀名为.deleted的文件的。 kafka默认的清理策略为: log.retention.hours=168 //7d log.retention.check.interval.ms=300000 //5min log.segment.bytes=1073741824 //1G log.cleaner.delete.retention.ms=86400000 // 1d log.cleaner.backoff.ms=15000 //15s 每个segment的大小为1GB,每5分钟检查一次是否有segment已经查过了7d,如果有将其标记为deleted。标记为deleted的segment默认会保留1天,清理线程会每隔15秒检查一次,是否有标记为deleted的segment的保留时间超过一天了,如果有将其从文件系统删除。 大家注意,kafka清理时是不管该segment中的消息是否被消费过,它清理的依据为是否超过了指定的保留时间,仅此而已。 kafka还提供基于日志的大小的清理策略。 log.segment.bytes参数默认没有指定。 你可以同时指定log.segment.bytes和log.retention.hours来混合指定保留规则。一旦日志的大小超过了log.segment.bytes就清除老的segment,一旦某个segment的保留时间超过了规定的值同样将其清除。 log.cleanup.policy属性指定清理策略,默认策略为delete,可选的为compact

Kafka自带的性能测试脚本

Dec 23, 2016 | | Say something

压测写入消息 ./kafka-producer-perf-test.sh –topic test_perf –num-records 1000000 –record-size 1000 –throughput 20000 –producer-props bootstrap.servers=localhost:9092 88126 records sent, 17625.2 records/sec (16.81 MB/sec), 968.5 ms avg latency, 1409.0 max latency. 113096 records sent, 22587.6 records/sec (21.54 MB/sec), 249.2 ms avg latency, 681.0 max latency. 63205 records sent, 12615.8 records/sec (12.03 MB/sec), 1200.1 ms avg latency, 2560.0 max latency. 137255 […more]

CountDownLatch VS CyclicBarrier

Nov 30, 2016 | | Say something

CountDownLatch和CyclicBarrier都是线程同步辅助工具。 CountDownLatch的经典用法为两种 场景1: 将count初始化为1,当一个线程调用countDown()后其他等待的线程继续执行 场景2: 将count初始化为n,协调线程(coordinate thread)等待其他工作线程(work thread)都执行结束后再执行某些动作。过程为协调线程调用await()等待count变为0,其他的线程调用countDown()表示其执行完毕。 CyclicBarrier原理: 不是等待别的线程调用countDown()来减少count,当count变为0时阻塞在await()的线程继续执行。而是当await()的线程数等于初始化值N时执行CyclicBarrier初始化时提供的回调函数,然后所有这些在await()调用处阻塞的线程继续执行

mysql实现分组排序[转]

Nov 21, 2016 | | Say something

MySQL-to achieve ORACLE-ROW_NUMBER () over (partition by) packet sorting capabilities. Not provide a similar MYSQL ORACLE in OVER () such a wealth of analysis functions. Need to implement such a function in MySQL, we can only use some flexible approach: First of all, we create instance data: drop table if exists heyf_t10; create table heyf_t10 […more]

kafka broker advertised.listeners属性的设置

Nov 19, 2016 | | Say something

kafka 0.9.x以后的版本,有一个配置属性叫advertised.listeners,在server.properties中,该属性默认是注释掉的,解释如下【注:本文由javacoder.cn整理】 #Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for “listeners” if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://192.168.14.140:9092 “PLAINTEXT”表示协议,可选的值有PLAINTEXT和SSL,hostname可以指定IP地址,也可以用”0.0.0.0″表示对所有的网络接口有效,如果hostname为空表示只对默认的网络接口有效 也就是说如果你没有配置advertised.listeners,就使用listeners的配置通告给消息的生产者和消费者,这个过程是在生产者和消费者获取源数据(metadata)。如果都没配置,那么就使用java.net.InetAddress.getCanonicalHostName()返回的值,对于ipv4,基本就是localhost了 然后你的生产者就会一直卡死,没有反应,如果添加slf4j 桥接 log4j,将日志级别调到debug,发现如下的日志输出 Updated cluster metadata version 2 to Cluster(nodes = [Node(0, 127.0.0.1, 9092)], partitions = []) […more]

centos7 haproxy 日志配置

Nov 17, 2016 | | Say something

vi /etc/haproxy/haproxy.cfg 可以看到如下行 log 127.0.0.1 local2 没有指定端口,默认为udp 514 vi /etc/rsyslog.conf #启用在udp 514端口接收日子消息 $ModLoad imudp $UDPServerRun 514 #在rules(规则)节中添加如下信息 local2.* /var/log/haproxy.log #表示将发往facility local2的消息写入haproxy.log文件中,”local2.* “前面的local2表示facility,预定义的。*表示所有等级的消息 #重启 rsyslog systemctl restart rsyslog #重启haproxy systemctl restart rsyslog 查看/var/log/haproxy.log文件应该能看到日志信息

SHELL 历史命令扩展(History Expansion)

Nov 15, 2016 | | Say something

SHELL 历史命令扩展(History Expansion) 在命令行执行如下命令 sed -n “2!p” /etc/passwd -bash: !p”: event not found 提示这个错误是因为在历史列表中没有以p开头的记录,”!”是历史命令扩展的标记,为了让你的脚本能在交互式环境下如你所愿地执行,最好是禁用历史扩展,命令如下 set +H 或者set +o histexpand 启用历史扩展 set -H 或者set -o histexpand 查看当前shell的选项 set -o 或者set |grep SHELLOPTS

Bash 数组

Nov 14, 2016 | | Say something