#CA产生自己的私钥(存于ca-key文件)和证书(存于ca-cert文件),在当前目录(本文由javacoder.cn博主整理,转载请注明出处)
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
#在kafka服务器上执行,如果有多台服务器,请在每台上都执行。-alias选项用例标识jks文件中每个条目的。
#产生key
keytool -keystore server.keystore.jks -alias 192.168.14.140 -validity 365 -keyalg RSA -genkey
#导出key对应的证书
keytool -keystore server.keystore.jks -alias 192.168.14.140 -certreq -file cert-file
#在client端执行,如果不要求客户端认证(服务器没有配置ssl.client.auth=required),那么这步可以省略。
#产生key
keytool -keystore client.keystore.jks -alias 192.168.14.1 -validity 365 -keyalg RSA -genkey
#导出key对应的证书
keytool -keystore client.keystore.jks -alias 192.168.14.1 -certreq -file cert-file-client
#使用CA证书(ca-key,ca-cert)分别对服务器(cert-file)和客户端(cert-file-client)的证书进行签名。产生的证书分别为(cert-signed,cert-signed-client)
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-client -out cert-signed-client -days 365 -CAcreateserial -passin pass:test1234
#将CA证书(ca-cert),签名后的服务器证书(cert-signed),签名后的客户端证书(cert-signed-client)导入服务器的信任的秘钥库(-keystore server.truststore.jks)
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.truststore.jks -alias 192.168.14.140 -import -file cert-signed
keytool -keystore server.truststore.jks -alias 192.168.14.1 -import -file cert-signed-client
#将CA证书(ca-cert),签名后的服务器证书(cert-signed),签名后的客户端证书(cert-signed-client)导入客户端信任的秘钥库(-keystore server.truststore.jks)
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias 192.168.14.140 -import -file cert-signed
keytool -keystore client.truststore.jks -alias 192.168.14.1 -import -file cert-signed-client
修改KAFKA_HOME/conf/server.properties
#在9093端口接收ssl连接请求
listeners=PLAINTEXT://192.168.14.140:9092,SSL://192.168.14.140:9093
#指定秘钥位置,ssl.keystore.location存自己的私钥,server.truststore.jks存放信任的证书
ssl.keystore.location=/opt/kafka_2.10-0.10.0.0/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/opt/kafka_2.10-0.10.0.0/ssl/server.truststore.jks
ssl.truststore.password=test1234
#broker之间也使用ssl通信
security.inter.broker.protocol=SSL
#客户端也需要认证
ssl.client.auth=required
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public static void main(String[] args) throws UnknownHostException { System.out.println(java.net.InetAddress.getLocalHost().getCanonicalHostName()); Properties props = new Properties(); props.put("bootstrap.servers", "192.168.14.140:9093"); //使用ssl端口 props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("security.protocol", "SSL"); props.put("ssl.truststore.location", "D:\\Ted\\TEST\\test-kafka\\client.truststore.jks"); props.put("ssl.truststore.password", "test1234"); props.put("ssl.keystore.location", "D:\\Ted\\TEST\\test-kafka\\client.keystore.jks"); props.put("ssl.keystore.password", "test1234"); props.put("ssl.key.password", "test1234"); Producer<String, String> producer = new KafkaProducer<String, String>(props); for(int i = 50; i > 0; i--) //发送50个消息 producer.send(new ProducerRecord<String, String>("VST_F_API_TIME_PRICE", Integer.toString(i), Integer.toString(i))); producer.close(); } |
可以kafka-console-consumer.sh 验证是否成功写入到kafka broker中
Posted in: Kafka
Comments are closed.