Kafka支持多种安全认证机制,包括SASL和SSL。以下是配置安全认证的步骤和示例代码:
producer或consumer的properties对象中,设置security.protocol为sasl_plaintext或sasl_ssl,并设置sasl.mechanism为相应的认证机制(如plain、scram-sha-256、scram-sha-512或oauthbearer)。示例代码(使用SASL PLAINTEXT认证):
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("security.protocol", "sasl_plaintext"); props.put("sasl.mechanism", "plain"); // 添加用户名和密码 props.put("sasl.username", "admin"); props.put("sasl.password", "password"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); 示例JAAS配置文件(kafka_server_jaas.conf):
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret"; }; 示例Zookeeper的zoo.cfg配置:
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/var/lib/zookeeper clientPort=2181 admin.serverPort=8888 maxClientCnxns=0 authProvider.1.class=org.apache.zookeeper.server.auth.SASLAuthenticationProvider authProvider.1.仰慕loginRenew=3600000 authProvider.1.requireClientAuthScheme=sasl producer或consumer的properties对象中,设置security.protocol为ssl,并指定ssl.truststore.location和ssl.truststore.password以使用SSL加密。示例代码(使用SSL):
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("security.protocol", "ssl"); props.put("ssl.truststore.location", "/path/to/truststore/file"); props.put("ssl.truststore.password", "password"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); 通过上述配置,Kafka集群和客户端之间将建立安全的通信通道,有效防止未授权访问和数据泄露。具体的配置步骤和参数可能会根据Kafka版本和具体环境有所不同,建议参考Kafka官方文档以获取最准确的配置指南。