如何构建安全的Kafka集群

如题所述

第1个回答  2016-05-30
Kafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成,其中Spark Streaming作为后端流引擎配合Kafka作为前端消息系统正成为当前流处理系统的主流架构之一。
然而,当下越来越多的安全漏洞、数据泄露等问题的爆发,安全正成为系统选型不得不考虑的问题,Kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在严重的安全隐患。本文将围绕Kafka,先介绍其整体架构和关键概念,再深入分析其架构之中存在的安全问题,最后分享下Transwarp在Kafka安全性上所做的工作及其使用方法。
Kafka架构与安全
首先,我们来了解下有关Kafka的几个基本概念:
Topic:Kafka把接收的消息按种类划分,每个种类都称之为Topic,由唯一的Topic Name标识。
Producer:向Topic发布消息的进程称为Producer。
Consumer:从Topic订阅消息的进程称为Consumer。
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。
Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Producer,一组管理Topic的Broker,和一组订阅消息的Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Producer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录Consumer的消费消息的偏移量,以及在Consumer Group发生变化时进行relalance. Broker接收和发送消息是被动的:由Producer主动发送消息,Consumer主动拉取消息。

然而,分析Kafka框架,我们会发现以下严重的安全问题:
1.网络中的任何一台主机,都可以通过启动Broker进程而加入Kafka集群,能够接收Producer的消息,能够篡改消息并发送给Consumer。
2.网络中的任何一台主机,都可以启动恶意的Producer/Consumer连接到Broker,发送非法消息或拉取隐私消息数据。
3.Broker不支持连接到启用Kerberos认证的ZooKeeper集群,没有对存放在ZooKeeper上的数据设置权限。任意用户都能够直接访问ZooKeeper集群,对这些数据进行修改或删除。
4.Kafka中的Topic不支持设置访问控制列表,任意连接到Kafka集群的Consumer(或Producer)都能对任意Topic读取(或发送)消息。
随着Kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时炸弹,一旦内网被黑客入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破Broker所在的服务器。
Kafka安全设计
基于上述分析,Transwarp从以下两个方面增强Kafka的安全性:
身份认证(Authentication):设计并实现了基于Kerberos和基于IP的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于IP地址可信的网络环境,相比于前者部署更为简便。
权限控制(Authorization):设计并实现了Topic级别的权限模型。Topic的权限分为READ(从Topic拉取数据)、WRITE(向Topic中生产数据)、CREATE(创建Topic)和DELETE(删除Topic)。
基于Kerberos的身份机制如下图所示:

Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。
Producer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接:
1.Producer向KDC认证身份,通过则得到TGT(票证请求票证),否则报错退出
2.Producer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Producer返回SessionKey(会话密钥)和ServiceTicket(服务票证)
3.Producer使用SessionKey和ServiceTicket与Broker建立连接,Broker使用自身的密钥解密ServiceTicket,获得与Producer通信的SessionKey,然后使用SessionKey验证Producer的身份,通过则建立连接,否则拒绝连接。
ZooKeeper需要启用Kerberos认证模式,保证Broker或Consumer与其的连接是安全的。
Topic的访问控制列表(ACL)存储于ZooKeeper中,存储节点的路径为/acl/<topic>/<user>,节点数据为R(ead)、W(rite)、C(reate)、D(elete)权限的集合,如/acl/transaction/jack节点的数据为RW,则表示用户jack能够对transaction这个topic进行读和写。
另外,kafka为特权用户,只有kafka用户能够赋予/取消权限。因此,ACL相关的ZooKeeper节点权限为kafka具有所有权限,其他用户不具有任何权限。
构建安全的Kafka服务
首先,我们为Broker启用Kerberos认证模式,配置文件为/etc/kafka/conf/server.properties,安全相关的参数如下所示:

其中,authentication参数表示认证模式,可选配置项为simple, kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab.
认证模式为ipaddress时,Producer和Consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用API进行登录,样例代码如下所示:
public class SecureProducer extends Thread {
private final kafka.javaapi.producer.Producer<Integer, String> producer;
private final String topic;
private final Properties props = new Properties();
public SecureProducer(String topic) {
AuthenticationManager.setAuthMethod(“kerberos”);
AuthenticationManager.login(“producer1″, “/etc/producer1.keytab”);
props.put(“serializer.class”, “kafka.serializer.StringEncoder”);
props.put(“metadata.broker.list”,
“172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092″);
// Use random partitioner. Don’t need the key type. Just set it to Integer.
// The message is of type String.
producer = new kafka.javaapi.producer.Producer<Integer, String>(
new ProducerConfig(props));
this.topic = topic;
}
. . .
Topic权限管理
Topic的权限管理主要是通过AuthorizationManager这个类来完成的,其类结构如下图所示:

其中,resetPermission(user, Permissions, topic) 为重置user对topic的权限。
grant(user, Permissions, topic) 为赋予user对topic权限。
revoke(user, Permissions, topic) 为取消user对topic权限。
isPermitted(user, Permissions, topic) 为检查user对topic是否具有指定权限。
调用grant或revoke进行权限设置完成后,需要commit命令提交修改到ZooKeeper
Kerberos模式下,AuthorizationManager需要先使用AuthenticationManager.login方法登录,与ZooKeeper建立安全的连接,再进行权限设置。示例代码如下所示:
public class AuthzTest {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(“authentication”, “kerberos”);
props.setProperty(“zookeeper.connect”, “172.16.2.116:2181,172.16.2.117:2181,172.16.2.118:2181″);
props.setProperty(“principal”, “kafka/host1@TDH”);
props.setProperty(“keytab”, “/usr/lib/kafka/config/kafka.keytab”);
ZKConfig config = new ZKConfig(props);
AuthenticationManager.setAuthMethod(config.authentication());
AuthenticationManager.login(config.principal(), config.keytab());
AuthorizationManager authzManager = new AuthorizationManager(config);
// reset permission READ and WRITE to ip 172.16.1.87 on topic test
authzManager.resetPermission(“172.16.1.87″,
new Permissions(Permissions.READ, Permissions.WRITE), “test”);
// grant permission WRITE to ip 172.16.1.87 on topic test
authzManager.grant(“172.16.1.87″, new Permissions(Permissions.CREATE), “test”);
// revoke permission READ from ip 172.16.1.87 on topic test
authzManager.revoke(“172.16.1.87″, new Permissions(Permissions.READ), “test”);
// commit the permission settings
authzManager.commit();
authzManager.close();
}
}

ipaddress认证模式下,取消和赋予权限的操作如下所示:
public class AuthzTest {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(“authentication”, “ipaddress”);
props.setProperty(“zookeeper.connect”,
“172.16.1.87:2181,172.16.1.88:2181,172.16.1.89:2181″);
ZKConfig config = new ZKConfig(props);
// new authorization manager
AuthorizationManager authzManager = new AuthorizationManager(config);
// reset permission READ and WRITE to ip 172.16.1.87 on topic test
authzManager.resetPermission(“172.16.1.87″,
new Permissions(Permissions.READ, Permissions.WRITE), “test”);
// grant permission WRITE to ip 172.16.1.87 on topic test
authzManager.grant(“172.16.1.87″, new Permissions(Permissions.CREATE), “test”);
// revoke permission READ from ip 172.16.1.87 on topic test
authzManager.revoke(“172.16.1.87″, new Permissions(Permissions.READ), “test”);
// commit the permission settings
authzManager.commit();
authzManager.close();
}
}
总结与展望
本文通过介绍Kafka现有架构,深入挖掘其中存在的安全问题,并给出Transwarp在Kafka安全上所做的工作及其使用方式。然而,纵观Hadoop & Spark生态系统,安全功能还存在很多问题,各组件的权限系统独立混乱,缺少集中易用的账户管理系统。某些组件的权限管理还很不成熟,如Spark的调度器缺少用户的概念,不能限制具体用户使用资源的多少。Transwarp基于开源版本,在安全方面已有相当多的积累,并持续改进开发,致力于为企业用户提供一个易用、高效、安全和稳定的基础数据平台。
第2个回答  2016-05-30
扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成,其中Spark
Streaming作为后端流引擎配合Kafka作为前端消息系统正成为当前流处理系统的主流架构之一。
  然而,当下越来越多的安全漏
洞、数据泄露等问题的爆发,安全正成为系统选型不得不考虑的问题,Kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在严重的安全隐患。本
文将围绕Kafka,先介绍其整体架构和关键概念,再深入分析其架构之中存在的安全问题,最后分享下Transwarp在Kafka安全性上所做的工作及
其使用方法。
Kafka架构与安全
  首先,我们来了解下有关Kafka的几个基本概念:
Topic:Kafka把接收的消息按种类划分,每个种类都称之为Topic,由唯一的Topic Name标识。
Producer:向Topic发布消息的进程称为Producer。
Consumer:从Topic订阅消息的进程称为Consumer。
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。
 
 Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Producer,一组管理Topic的Broker,和一组订阅消息的
Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Producer可以按照一定的策略将消息划分给指定的分区,如简单的
轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录
Consumer的消费消息的偏移量,以及在Consumer Group发生变化时进行relalance.
Broker接收和发送消息是被动的:由Producer主动发送消息,Consumer主动拉取消息。

  然而,分析Kafka框架,我们会发现以下严重的安全问题:
网络中的任何一台主机,都可以通过启动Broker进程而加入Kafka集群,能够接收Producer的消息,能够篡改消息并发送给Consumer。
网络中的任何一台主机,都可以启动恶意的Producer/Consumer连接到Broker,发送非法消息或拉取隐私消息数据。
Broker不支持连接到启用Kerberos认证的ZooKeeper集群,没有对存放在ZooKeeper上的数据设置权限。任意用户都能够直接访问ZooKeeper集群,对这些数据进行修改或删除。
Kafka中的Topic不支持设置访问控制列表,任意连接到Kafka集群的Consumer(或Producer)都能对任意Topic读取(或发送)消息。
 
 随着Kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时炸弹,一旦内网被黑客
入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破Broker所在的服务器。
Kafka安全设计
  基于上述分析,Transwarp从以下两个方面增强Kafka的安全性:
  身份认证(Authentication):设计并实现了基于Kerberos和基于IP的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于IP地址可信的网络环境,相比于前者部署更为简便。
  权限控制(Authorization):设计并实现了Topic级别的权限模型。Topic的权限分为READ(从Topic拉取数据)、WRITE(向Topic中生产数据)、CREATE(创建Topic)和DELETE(删除Topic)。

  Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。
  Producer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接:
Producer向KDC认证身份,通过则得到TGT(票证请求票证),否则报错退出
Producer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Producer返回SessionKey(会话密钥)和ServiceTicket(服务票证)
Producer
使用SessionKey和ServiceTicket与Broker建立连接,Broker使用自身的密钥解密ServiceTicket,获得与
Producer通信的SessionKey,然后使用SessionKey验证Producer的身份,通过则建立连接,否则拒绝连接。
  ZooKeeper需要启用Kerberos认证模式,保证Broker或Consumer与其的连接是安全的。
 
 Topic的访问控制列表(ACL)存储于ZooKeeper中,存储节点的路径为/acl/<topic>/<user>,
节点数据为R(ead)、W(rite)、C(reate)、D(elete)权限的集合,如/acl/transaction/jack节点的数据为
RW,则表示用户jack能够对transaction这个topic进行读和写。
  另外,kafka为特权用户,只有kafka用户能够赋予/取消权限。因此,ACL相关的ZooKeeper节点权限为kafka具有所有权限,其他用户不具有任何权限。
构建安全的Kafka服务
  首先,我们为Broker启用Kerberos认证模式,配置文件为/etc/kafka/conf/server.properties,安全相关的参数如下所示:

  其中,authentication参数表示认证模式,可选配置项为simple, kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab.
  认证模式为ipaddress时,Producer和Consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用API进行登录,样例代码如下所示:
public class SecureProducer extends Thread {
private final kafka.javaapi.producer.Producer<Integer, String> producer;
private final String topic;
private final Properties props = new Properties();

public SecureProducer(String topic) {
AuthenticationManager.setAuthMethod("kerberos");
AuthenticationManager.login("producer1", "/etc/producer1.keytab");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list",
"172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092");
// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type String.
producer = new kafka.javaapi.producer.Producer<Integer, String>(
new ProducerConfig(props));
this.topic = topic;
}
. . .

Topic权限管理
  Topic的权限管理主要是通过AuthorizationManager这个类来完成的,其类结构如下图所示:

  其中,resetPermission(user, Permissions, topic) 为重置user对topic的权限。
  grant(user, Permissions, topic) 为赋予user对topic权限。
  revoke(user, Permissions, topic) 为取消user对topic权限。
  isPermitted(user, Permissions, topic) 为检查user对topic是否具有指定权限。
  调用grant或revoke进行权限设置完成后,需要commit命令提交修改到ZooKeeper
  Kerberos模式下,AuthorizationManager需要先使用AuthenticationManager.login方法登录,与ZooKeeper建立安全的连接,再进行权限设置。示例代码如下所示:
public class AuthzTest {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("authentication", "kerberos");
props.setProperty("zookeeper.connect", "172.16.2.116:2181,172.16.2.117:2181,172.16.2.118:2181");
props.setProperty("principal", "kafka/host1@TDH");
props.setProperty("keytab", "/usr/lib/kafka/config/kafka.keytab");
ZKConfig config = new ZKConfig(props);
AuthenticationManager.setAuthMethod(config.authentication());
AuthenticationManager.login(config.principal(), config.keytab());
AuthorizationManager authzManager = new AuthorizationManager(config);
// reset permission READ and WRITE to ip 172.16.1.87 on topic test
authzManager.resetPermission("172.16.1.87",
new Permissions(Permissions.READ, Permissions.WRITE), "test");
// grant permission WRITE to ip 172.16.1.87 on topic test
authzManager.grant("172.16.1.87", new Permissions(Permissions.CREATE), "test");
// revoke permission READ from ip 172.16.1.87 on topic test
authzManager.revoke("172.16.1.87", new Permissions(Permissions.READ), "test");
// commit the permission settings
authzManager.commit();
authzManager.close();
}
}
  ipaddress认证模式下,取消和赋予权限的操作如下所示:
public class AuthzTest {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("authentication", "ipaddress");
props.setProperty("zookeeper.connect",
"172.16.1.87:2181,172.16.1.88:2181,172.16.1.89:2181");
ZKConfig config = new ZKConfig(props);
// new authorization manager
AuthorizationManager authzManager = new AuthorizationManager(config);
// reset permission READ and WRITE to ip 172.16.1.87 on topic test
authzManager.resetPermission("172.16.1.87",
new Permissions(Permissions.READ, Permissions.WRITE), "test");
// grant permission WRITE to ip 172.16.1.87 on topic test
authzManager.grant("172.16.1.87", new Permissions(Permissions.CREATE), "test");
// revoke permission READ from ip 172.16.1.87 on topic test
authzManager.revoke("172.16.1.87", new Permissions(Permissions.READ), "test");
// commit the permission settings
authzManager.commit();
authzManager.close();
}
}

Kafka集群搭建及必知必会
首先,搭建Zookeeper。其作为Kafka的分布式协调底层,依赖于JDK环境。通过下载、解压缩、配置、启动Zookeeper,并开放其端口,完成Zookeeper节点部署。其次,安装Kafka。遵循Kafka的部署规范,通过下载、解压、配置等步骤安装。在此过程中,需对配置文件进行必要调整,如更改brokerId及监听的ip地址等,并将Kafka b...

docker环境,搭建kafka集群
首先,构建bin\/jdk:8u221镜像,参考-- docker基础环境搭建,构建bin\/zookeeper:3.5.7镜像,参考-- docker环境,搭建zookeeper集群。kafka集群由zookeeper管理,创建kafka-net docker网络并启动zookeeper集群。这里使用单机zookeeper简化部署。下载kafka_2.12-2.3.1.tgz,构建bin\/kafka:2.3.1镜像,Dockerf...

大数据Kafka(三):Kafka的集群搭建以及shell启动命令脚本编写
首先,搭建Kafka集群的步骤如下:将Kafka安装包上传至虚拟机,解压并进行安装。修改`server.properties`文件以配置Kafka实例。将安装完成的Kafka复制至其他两台服务器上。设置`KAFKA_HOME`环境变量以方便访问Kafka相关目录。启动服务器。通过检查zookeeper中`brokers`节点目录下的三个ids,确认集群构建成功。接...

在Linux搭建Kafka集群
采用kafka_2.13-2.8.0版本搭建集群,所需架构包括四台服务器、四个Zookeeper节点(一个为主节点,两个为从节点,一个为观察节点)以及三个Kafka服务。具体步骤如下:首先,下载并解压安装包。接着,编辑配置文件,通常除了broker.id,其余配置应保持一致。利用scp命令将安装文件复制至其他服务器。随后...

如何估算 Kafka 集群规模和配置?
在规划一个Kafka集群时,需要考虑多个因素来估算规模和配置。首先,假设每个消息大小为1KB,日数据量为1亿条,且每个topic平均有3个副本,数据保留周期为7天。单个机器分区的最大数量限制为10个。对于服务器台数的选择,我们采用经验公式:服务器台数 ≈ 2 * (生产者峰值生产速率 * 副本 \/ 100) + ...

Kafka集群安装
为了构建Apache Kafka集群,需遵循以下步骤进行安装和部署。首先,访问官方下载页面获取Kafka安装包。将安装包上传至服务器并解压。修改解压后的文件名称,便于后续操作。进入\/opt\/module\/kafka目录,对配置文件进行修改。确保broker.id在集群中唯一,避免重复。分发安装包至集群中的各节点。在各节点上修改配置...

03、Kafka 集群安装
首先准备一台虚拟机,配置为centos7系统,然后在一台上进行Kafka集群安装,最后通过克隆方式扩展到多台机器。进行JDK安装:下载并解压JDK至\/root\/software目录,随后配置环境变量,编辑配置文件并添加JDK路径,通过命令使配置生效,最后验证配置路径。进行zookeeper安装:下载并解压至指定路径,修改配置文件,调整...

Kafka集群部署(Docker容器的方式)
文章主要介绍以docker容器的方式部署kafka集群。上述配置文件中的server.x,数字x对应到data\/myid文件中的值。三台机器x的值分别就是1,2,3。参数详细说明请参考 官网文档 。1.--net=host: 容器与主机共享同一Network Namespace,即容器与网络看到的是相同的网络视图(host模式存在一定的风险,对安全要求...

Kafka集群部署
Topic在kafka01上创建后也会同步到集群中另外两个Broker:kafka02、kafka03 这里我们向Broker(id=0)的Topic=test-kafka发送消息 在Kafka02上消费Broker03的消息 在Kafka03上消费Broker02的消息 然后均能收到消息 这是因为这两个消费消息的命令是建立了两个不同的Consumer。如果我们启动Consumer指定Consumer...

kafka问题求助
Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。Producer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接:1.Producer向KDC认证身份,通过则得到TGT(票证请求票证),否则报错退出2.Producer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Producer...

相似回答