Blog

kafka日志清理过程

Dec 23, 2016 | | Say something

总所周知,kafka将topic分成不同的partitions,每个partition的日志分成不同的segments,最后以segment为单位将陈旧的日志从文件系统删除,本文由javacoder.cn整理,转载请注明出处。 假设kafka的日志目录为tmp/kafka-logs,对于名为test_perf的topic。假设两个partitions,那么我们可以在tmp/kafka-logs目录下看到目录VST_TOPIC-0,VST_TOPIC-1。也就是说kafka使用目录表示topic 分区。 VST_TOPIC-0目录下下,可以看到后缀名为.log和.index的文件,如下 [root@kafka kafka-logs]# ls test_perf-0/ 00000000000003417135.index.deleted 00000000000003518540.index 00000000000003619945.index 00000000000003417135.log.deleted 00000000000003518540.log 00000000000003619945.log 如果所有待删除的陈旧日志都清理了,那么是看不到后缀名为.deleted的文件的。 kafka默认的清理策略为: log.retention.hours=168 //7d log.retention.check.interval.ms=300000 //5min log.segment.bytes=1073741824 //1G log.cleaner.delete.retention.ms=86400000 // 1d log.cleaner.backoff.ms=15000 //15s 每个segment的大小为1GB,每5分钟检查一次是否有segment已经查过了7d,如果有将其标记为deleted。标记为deleted的segment默认会保留1天,清理线程会每隔15秒检查一次,是否有标记为deleted的segment的保留时间超过一天了,如果有将其从文件系统删除。 大家注意,kafka清理时是不管该segment中的消息是否被消费过,它清理的依据为是否超过了指定的保留时间,仅此而已。 kafka还提供基于日志的大小的清理策略。 log.segment.bytes参数默认没有指定。 你可以同时指定log.segment.bytes和log.retention.hours来混合指定保留规则。一旦日志的大小超过了log.segment.bytes就清除老的segment,一旦某个segment的保留时间超过了规定的值同样将其清除。 log.cleanup.policy属性指定清理策略,默认策略为delete,可选的为compact

Kafka自带的性能测试脚本

Dec 23, 2016 | | Say something

压测写入消息 ./kafka-producer-perf-test.sh –topic test_perf –num-records 1000000 –record-size 1000 –throughput 20000 –producer-props bootstrap.servers=localhost:9092 88126 records sent, 17625.2 records/sec (16.81 MB/sec), 968.5 ms avg latency, 1409.0 max latency. 113096 records sent, 22587.6 records/sec (21.54 MB/sec), 249.2 ms avg latency, 681.0 max latency. 63205 records sent, 12615.8 records/sec (12.03 MB/sec), 1200.1 ms avg latency, 2560.0 max latency. 137255 […more]

CountDownLatch VS CyclicBarrier

Nov 30, 2016 | | Say something

CountDownLatch和CyclicBarrier都是线程同步辅助工具。 CountDownLatch的经典用法为两种 场景1: 将count初始化为1,当一个线程调用countDown()后其他等待的线程继续执行 场景2: 将count初始化为n,协调线程(coordinate thread)等待其他工作线程(work thread)都执行结束后再执行某些动作。过程为协调线程调用await()等待count变为0,其他的线程调用countDown()表示其执行完毕。 CyclicBarrier原理: 不是等待别的线程调用countDown()来减少count,当count变为0时阻塞在await()的线程继续执行。而是当await()的线程数等于初始化值N时执行CyclicBarrier初始化时提供的回调函数,然后所有这些在await()调用处阻塞的线程继续执行

mysql实现分组排序[转]

Nov 21, 2016 | | Say something

MySQL-to achieve ORACLE-ROW_NUMBER () over (partition by) packet sorting capabilities. Not provide a similar MYSQL ORACLE in OVER () such a wealth of analysis functions. Need to implement such a function in MySQL, we can only use some flexible approach: First of all, we create instance data: drop table if exists heyf_t10; create table heyf_t10 […more]

kafka broker advertised.listeners属性的设置

Nov 19, 2016 | | Say something

kafka 0.9.x以后的版本,有一个配置属性叫advertised.listeners,在server.properties中,该属性默认是注释掉的,解释如下【注:本文由javacoder.cn整理】 #Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for “listeners” if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://192.168.14.140:9092 “PLAINTEXT”表示协议,可选的值有PLAINTEXT和SSL,hostname可以指定IP地址,也可以用”0.0.0.0″表示对所有的网络接口有效,如果hostname为空表示只对默认的网络接口有效 也就是说如果你没有配置advertised.listeners,就使用listeners的配置通告给消息的生产者和消费者,这个过程是在生产者和消费者获取源数据(metadata)。如果都没配置,那么就使用java.net.InetAddress.getCanonicalHostName()返回的值,对于ipv4,基本就是localhost了 然后你的生产者就会一直卡死,没有反应,如果添加slf4j 桥接 log4j,将日志级别调到debug,发现如下的日志输出 Updated cluster metadata version 2 to Cluster(nodes = [Node(0, 127.0.0.1, 9092)], partitions = []) […more]

centos7 haproxy 日志配置

Nov 17, 2016 | | Say something

vi /etc/haproxy/haproxy.cfg 可以看到如下行 log 127.0.0.1 local2 没有指定端口,默认为udp 514 vi /etc/rsyslog.conf #启用在udp 514端口接收日子消息 $ModLoad imudp $UDPServerRun 514 #在rules(规则)节中添加如下信息 local2.* /var/log/haproxy.log #表示将发往facility local2的消息写入haproxy.log文件中,”local2.* “前面的local2表示facility,预定义的。*表示所有等级的消息 #重启 rsyslog systemctl restart rsyslog #重启haproxy systemctl restart rsyslog 查看/var/log/haproxy.log文件应该能看到日志信息

SHELL 历史命令扩展(History Expansion)

Nov 15, 2016 | | Say something

SHELL 历史命令扩展(History Expansion) 在命令行执行如下命令 sed -n “2!p” /etc/passwd -bash: !p”: event not found 提示这个错误是因为在历史列表中没有以p开头的记录,”!”是历史命令扩展的标记,为了让你的脚本能在交互式环境下如你所愿地执行,最好是禁用历史扩展,命令如下 set +H 或者set +o histexpand 启用历史扩展 set -H 或者set -o histexpand 查看当前shell的选项 set -o 或者set |grep SHELLOPTS

Bash 数组

Nov 14, 2016 | | Say something

mycat初体验

Nov 2, 2016 | | Say something

三台虚拟机的IP如下 MySQL1:192.168.14.145 MySQL2:192.168.14.146 MyCat: 192.168.14.147 MySQL的安装过程不在本文的讨论范围,略过本文由javacoder.cn整理,转载请注明出处。 注意:由于我配置MyCat使用native驱动,所以MyCat服务器需要先安装MySQL客户端,由于我使用的centos7,所以直接执行”yum install mysql”安装。 修改MyCat_HOME/conf/server.xml

配置root用户的用户名和密码,以及默认的数据库 修改MyCat_HOME/conf/rule.xml

取模的节点数为2,因为我只有两台测试MySQL服务器 修改MyCat_HOME/conf/schema.xml

配置了一个名为test_db的数据库,声明了t_user表的负载均衡策略为mod-long,对id进行取模操作。由于驱动使用的是native,所以需要事先在MyCat对应的服务器上安装mysql客户端 测试: 启动mysql1,mysql2 server上的MySQL服务,注意selinux和firewalld的配置。 启动MyCat服务, ./startup_nowrap.sh 注意selinux和firewalld的配置,防止客户端连接不上, 我使用MySQLWorkbench连接的,MyCat的连接信息为:192.168.14.147:8066 root:123456 执行如下脚本: use test_db; create table t_user (id int not null primary key,name varchar(100)); insert into t_user(id,name) values(1,’hello mycat’); insert into t_user(id,name) values(2,’javacoder.cn’); select * from t_user where id=2; 操作成功,验证了MyCat的建表,插入,主键查询功能。select […more]

centos7实践LVS+keepalived

Nov 1, 2016 | | Say something

网上有很多关于这个主题的配置,要么是陈旧的,要么是错误,胡乱转载,有时参考这些文章,反而给你的工作带来更大的麻烦。经过实践,博主javacoder.cn将其整理成文,转载请注明出处,希望对你有所帮助。 其实keepalived集成了LVS的功能,所以理论上只要安装keepalived就好了,可选安装ipvsadm,ipvsadm命令能查看你通过keepalived配置的LVS是否正确, 本文使用NAT模式,NAT简介如下 用过vmware的人都知道连接方式之一就是NAT模式,NAT用于内网和外网的互联,当内网请求外网的资源的时候,在路由处,将数据包的源地址改为路由器的地址,并记录真正的源地址;当请求的资源返回到路由处时,将目的地址改为保存的源地址,并将包转给源地址。 实践使用vmware虚拟机测试的,本示例使用nat模式,拓扑图如下: 可以看到Router1和Router2谁成为master时,谁占有内网VIP和外网VIP,real_server的网管指向内网VIP,这点至关重要。 实践步骤: 准备步骤 防火墙的配置不在本文的范围,请关闭selinux和firewall systemctl disable firewalld systemctl stop firewalld setenforce 0 vi /etc/selinux/config 改如下行SELINUX=disabled 安装keepalived,ipvsadm yum install keepalived yum install ipvsadm 配置keepalived 修改/etc/keepalived/keepalived.conf global_defs { @唯一标识,每台服务器不一样 router_id LVS_DEVEL } vrrp_sync_group VRRP1 { #组的作用是当组内的一个vrrp_instance故障时,将整个组的vrrp_instance都按照故障处理 group { VI_1 VI_2 } } #外网的VIP配置 vrrp_instance VI_1 { state MASTER interface eno16777736 virtual_router_id 51 […more]