开源框架

Kafka初始offset的确定

Dec 26, 2016 | | Say something

如果kafka的topic中已经累积了很多消息,这时新组的第一个消费者开始消费该topic中的消息,这时消费者是从topic的头开始处理,还是从topic的的尾开始处理呢,分三种情况: 使用老的消费者接口且自动确定初始offset: auto.offset.reset属性决定初始offset。默认值为largest,表示从topic的的尾开始处理,可选的值还有smallest,表示从topic的头开始处理,设置为其他值时报异常。 使用新的消费者接口且自动确定初始offset: auto.offset.reset属性决定初始offset。默认值为latest,表示从topic的的尾开始处理,可选的值还有earliest,表示从topic的头开始处理,,设置为none且以前没有offset时报异常。设置为其他值时直接报异常 手动指定offset: KafkaConsumer.seek方法可以将offset置为任意的值

kafka SSL配置

Dec 26, 2016 | | Say something

一、证书产生 #CA产生自己的私钥(存于ca-key文件)和证书(存于ca-cert文件),在当前目录(本文由javacoder.cn博主整理,转载请注明出处) openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 #在kafka服务器上执行,如果有多台服务器,请在每台上都执行。-alias选项用例标识jks文件中每个条目的。 #产生key keytool -keystore server.keystore.jks -alias 192.168.14.140 -validity 365 -keyalg RSA -genkey #导出key对应的证书 keytool -keystore server.keystore.jks -alias 192.168.14.140 -certreq -file cert-file #在client端执行,如果不要求客户端认证(服务器没有配置ssl.client.auth=required),那么这步可以省略。 #产生key keytool -keystore client.keystore.jks -alias 192.168.14.1 -validity 365 -keyalg RSA -genkey #导出key对应的证书 keytool -keystore client.keystore.jks -alias 192.168.14.1 -certreq -file cert-file-client #使用CA证书(ca-key,ca-cert)分别对服务器(cert-file)和客户端(cert-file-client)的证书进行签名。产生的证书分别为(cert-signed,cert-signed-client) […more]

kafka限流(quota)设置

Dec 24, 2016 | | Say something

如果kafka客户端是认证的。那么可以使用userId和clientId两种认证方式。如果没有认证只能使用clientId限流。 bin/kafka-configs.sh –zookeeper localhost:2181 –alter –add-config ‘producer_byte_rate=1048576,consumer_byte_rate=1024′ –entity-type clients –entity-name clientA 对clientId=clientA的客户端添加限流设置。producer_byte_rate表示每秒最多能写入到消息量,单位为byte/sec。consumer_byte_rate表示每秒最多能消费的消息了,单位也为byte/sec。 可以使用bin/kafka-producer-perf-test.sh压力测试脚本来验证你的限流配置是否生效。 bin/kafka-producer-perf-test.sh –topic test_perf –num-records 10000 –record-size 100 –throughput 150 –producer-props bootstrap.servers=localhost:9092 client.id=clientA 注意”client.id=clientA”属性值。 当然,你会简单的zookeeper命令行操作,也可以登录zookeeper执行如下的命令 [zk: localhost:2181(CONNECTED) 16] get /config/clients/clientA {“version”:1,”config”:{“producer_byte_rate”:”20480″,”consumer_byte_rate”:”1024″}} 发现kafka将我们的限流设置以json的格式写入了zookeeper中。

kafka日志清理过程

Dec 23, 2016 | | Say something

总所周知,kafka将topic分成不同的partitions,每个partition的日志分成不同的segments,最后以segment为单位将陈旧的日志从文件系统删除,本文由javacoder.cn整理,转载请注明出处。 假设kafka的日志目录为tmp/kafka-logs,对于名为test_perf的topic。假设两个partitions,那么我们可以在tmp/kafka-logs目录下看到目录VST_TOPIC-0,VST_TOPIC-1。也就是说kafka使用目录表示topic 分区。 VST_TOPIC-0目录下下,可以看到后缀名为.log和.index的文件,如下 [root@kafka kafka-logs]# ls test_perf-0/ 00000000000003417135.index.deleted 00000000000003518540.index 00000000000003619945.index 00000000000003417135.log.deleted 00000000000003518540.log 00000000000003619945.log 如果所有待删除的陈旧日志都清理了,那么是看不到后缀名为.deleted的文件的。 kafka默认的清理策略为: log.retention.hours=168 //7d log.retention.check.interval.ms=300000 //5min log.segment.bytes=1073741824 //1G log.cleaner.delete.retention.ms=86400000 // 1d log.cleaner.backoff.ms=15000 //15s 每个segment的大小为1GB,每5分钟检查一次是否有segment已经查过了7d,如果有将其标记为deleted。标记为deleted的segment默认会保留1天,清理线程会每隔15秒检查一次,是否有标记为deleted的segment的保留时间超过一天了,如果有将其从文件系统删除。 大家注意,kafka清理时是不管该segment中的消息是否被消费过,它清理的依据为是否超过了指定的保留时间,仅此而已。 kafka还提供基于日志的大小的清理策略。 log.segment.bytes参数默认没有指定。 你可以同时指定log.segment.bytes和log.retention.hours来混合指定保留规则。一旦日志的大小超过了log.segment.bytes就清除老的segment,一旦某个segment的保留时间超过了规定的值同样将其清除。 log.cleanup.policy属性指定清理策略,默认策略为delete,可选的为compact

Kafka自带的性能测试脚本

Dec 23, 2016 | | Say something

压测写入消息 ./kafka-producer-perf-test.sh –topic test_perf –num-records 1000000 –record-size 1000 –throughput 20000 –producer-props bootstrap.servers=localhost:9092 88126 records sent, 17625.2 records/sec (16.81 MB/sec), 968.5 ms avg latency, 1409.0 max latency. 113096 records sent, 22587.6 records/sec (21.54 MB/sec), 249.2 ms avg latency, 681.0 max latency. 63205 records sent, 12615.8 records/sec (12.03 MB/sec), 1200.1 ms avg latency, 2560.0 max latency. 137255 […more]

kafka broker advertised.listeners属性的设置

Nov 19, 2016 | | Say something

kafka 0.9.x以后的版本,有一个配置属性叫advertised.listeners,在server.properties中,该属性默认是注释掉的,解释如下【注:本文由javacoder.cn整理】 #Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for “listeners” if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://192.168.14.140:9092 “PLAINTEXT”表示协议,可选的值有PLAINTEXT和SSL,hostname可以指定IP地址,也可以用”0.0.0.0″表示对所有的网络接口有效,如果hostname为空表示只对默认的网络接口有效 也就是说如果你没有配置advertised.listeners,就使用listeners的配置通告给消息的生产者和消费者,这个过程是在生产者和消费者获取源数据(metadata)。如果都没配置,那么就使用java.net.InetAddress.getCanonicalHostName()返回的值,对于ipv4,基本就是localhost了 然后你的生产者就会一直卡死,没有反应,如果添加slf4j 桥接 log4j,将日志级别调到debug,发现如下的日志输出 Updated cluster metadata version 2 to Cluster(nodes = [Node(0, 127.0.0.1, 9092)], partitions = []) […more]

Activiti简单demo,基于spring mvc

Dec 30, 2015 | | Say something

工程下载:test-activiti.zip, 解压密码:javacoder.cn 部署步骤: 1、配置applicationContext.xml中的mailServerUsername和mailServerPassword为你的邮箱用户名和密码 2、执行maven编译。生成war, 3、部署 4、用浏览器访问http://localhost:8080/test-activiti 该demo具有的功能 基于内存数据库h2,便于部署 配置了h2数据库的web控制台,路径为http://localhost:8043/ jdbcURL为jdbc:h2:mem:activiti 本demo基于报销场景,实现了文件的文件的上传 本demo用到的工作流知识有user task, mail task, 和exclusive gateway 任务的高亮 上传的文件从tomcat的根目录分离出来 在tomcat的server.xml的host节点下添加如下配置 <Context docBase=”E:/pic/upload” path=”/test-activiti/static/images”/> 如果你不想处理这个,修改相应的图片路径(将上传的文件直接放在”/test-activiti/static/images”目录下) activiti示例,能运行的,基于spring mvc

Activiti Mail Task

Dec 22, 2015 | | Say something

1、在流程定义中添加一个代表邮件任务的ServiceTask <serviceTask id=”mailtask1″ name=”Mail Task” activiti:type=”mail”> <extensionElements> <activiti:field name=”to”> <activiti:string><![CDATA[接受者邮箱]]></activiti:string> </activiti:field> <activiti:field name=”from”> <activiti:string><![CDATA[发送者邮箱]]></activiti:string> </activiti:field> <activiti:field name=”subject”> <activiti:string><![CDATA[主题]]></activiti:string> </activiti:field> <activiti:field name=”cc”> <activiti:string><![CDATA[抄送人]]></activiti:string> </activiti:field> <activiti:field name=”html”> <activiti:expression><![CDATA[ 邮件内容 ]]></activiti:expression> </activiti:field> </extensionElements> </serviceTask> 注意需要将<![CDATA[]]>中的内容换成实际的信息。当然这些信息可以再Activiti的Properties view中可视化配置 2、processEngineConfiguration中添加邮箱服务器相关的信息 <bean id=”processEngineConfiguration” class=”org.activiti.spring.SpringProcessEngineConfiguration”> <…..> <property name=”mailServerHost” value=”smtp.126.com” /> <property name=”mailServerUsername” value=”邮箱用户名” /> <property name=”mailServerPassword” value=”邮箱密码” /> </bean> serviceTask中配置的是发送者是用户显示的,processEngineConfiguration中配置的才是用于认证的。

Activiti Timer Start Event

Dec 22, 2015 | | Say something

确保processEngineConfiguration.jobExecutorActivate=true或者没有配置,因为这个配置项的默认值为true 将开始事件转换为Timer Start Event。本例使用使用timeCycle配置一分钟执行一次,共四次,本例也添加了一个executionListener来捕获该事件 <startEvent id=”timerstartevent1″ name=”Start”> <extensionElements> <activiti:executionListener event=”start” class=”cn.javacoder.test.activiti.MyListener”></activiti:executionListener> </extensionElements> <timerEventDefinition> <timeCycle>R4/PT1M</timeCycle> </timerEventDefinition> </startEvent> 当然我们也可以配置自己的JobExecutor,如下: <!– spring task executor –> <bean id=”taskExecutor” class=”org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor”> <property name=”maxPoolSize” value=”5″/> <property name=”keepAliveSeconds” value=”30″ /> </bean> <bean id=”jobExecutor” class=”org.activiti.spring.SpringJobExecutor”> <constructor-arg ref=”taskExecutor”/> </bean> <bean id=”processEngineConfiguration” class=”org.activiti.spring.SpringProcessEngineConfiguration”> <……> <property name=”jobExecutorActivate” value=”true” /> <property name=”jobExecutor” ref=”jobExecutor” /> </bean> 5.17.0版本引入了AsyncExecutor,和原来的JobExecutor最大的区别是一次查询出多个带执行的job,减少数据库的操作,当这个job是排他的(exclusive),会锁住整个流程实例(process instance),配置可以参考User Guide. […more]

Lucene explain输出详解

Nov 25, 2015 | | Say something

JQuery in Action 1.3116325 = (MATCH) weight(name:jquery in 4) [DefaultSimilarity], result of: 1.3116325 = score(doc=4,freq=1.0 = termFreq=1.0 ), product of: 0.99999994 = queryWeight, product of: 2.0986123 = idf(docFreq=1, maxDocs=6) 0.47650534 = queryNorm 1.3116326 = fieldWeight in 4, product of: 1.0 = tf(freq=1.0), with freq of: 1.0 = termFreq=1.0 2.0986123 = idf(docFreq=1, maxDocs=6) 0.625 = fieldNorm(doc=4) […more]