kafka SSL配置

12月 26, 2016 |

一、证书产生

#CA产生自己的私钥(存于ca-key文件)和证书(存于ca-cert文件),在当前目录(本文由javacoder.cn博主整理,转载请注明出处)
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
#在kafka服务器上执行,如果有多台服务器,请在每台上都执行。-alias选项用例标识jks文件中每个条目的。
#产生key
keytool -keystore server.keystore.jks -alias 192.168.14.140 -validity 365 -keyalg RSA -genkey
#导出key对应的证书
keytool -keystore server.keystore.jks -alias 192.168.14.140 -certreq -file cert-file

#在client端执行,如果不要求客户端认证(服务器没有配置ssl.client.auth=required),那么这步可以省略。
#产生key
keytool -keystore client.keystore.jks -alias 192.168.14.1 -validity 365 -keyalg RSA -genkey
#导出key对应的证书
keytool -keystore client.keystore.jks -alias 192.168.14.1 -certreq -file cert-file-client

#使用CA证书(ca-key,ca-cert)分别对服务器(cert-file)和客户端(cert-file-client)的证书进行签名。产生的证书分别为(cert-signed,cert-signed-client)
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-client -out cert-signed-client -days 365 -CAcreateserial -passin pass:test1234

#将CA证书(ca-cert),签名后的服务器证书(cert-signed),签名后的客户端证书(cert-signed-client)导入服务器的信任的秘钥库(-keystore server.truststore.jks)
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.truststore.jks -alias 192.168.14.140 -import -file cert-signed
keytool -keystore server.truststore.jks -alias 192.168.14.1 -import -file cert-signed-client

#将CA证书(ca-cert),签名后的服务器证书(cert-signed),签名后的客户端证书(cert-signed-client)导入客户端信任的秘钥库(-keystore server.truststore.jks)
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias 192.168.14.140 -import -file cert-signed
keytool -keystore client.truststore.jks -alias 192.168.14.1 -import -file cert-signed-client

二、修改server.properties

修改KAFKA_HOME/conf/server.properties
#在9093端口接收ssl连接请求
listeners=PLAINTEXT://192.168.14.140:9092,SSL://192.168.14.140:9093

#指定秘钥位置,ssl.keystore.location存自己的私钥,server.truststore.jks存放信任的证书
ssl.keystore.location=/opt/kafka_2.10-0.10.0.0/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/opt/kafka_2.10-0.10.0.0/ssl/server.truststore.jks
ssl.truststore.password=test1234
#broker之间也使用ssl通信
security.inter.broker.protocol=SSL
#客户端也需要认证
ssl.client.auth=required

三、客户端代码实例

public static void main(String[] args) throws UnknownHostException {
System.out.println(java.net.InetAddress.getLocalHost().getCanonicalHostName());
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.14.140:9093"); //使用ssl端口
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "D:\\Ted\\TEST\\test-kafka\\client.truststore.jks");
props.put("ssl.truststore.password", "test1234");

props.put("ssl.keystore.location", "D:\\Ted\\TEST\\test-kafka\\client.keystore.jks");
props.put("ssl.keystore.password", "test1234");
props.put("ssl.key.password", "test1234");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 50; i > 0; i--) //发送50个消息
producer.send(new ProducerRecord<String, String>("VST_F_API_TIME_PRICE", Integer.toString(i), Integer.toString(i)));
producer.close();
}

可以kafka-console-consumer.sh 验证是否成功写入到kafka broker中

Posted in: Kafka

Comments are closed.