简介
Kafka是最初由Linkedin公司开发,Kafka是一个高吞吐的分布式消息队列,也是一个订阅/发布系统。常见可以用于web/nginx日志、访问日志,消息服务等等。
Kafka集群中每个节点都有一个被称为broker
的实例,负责缓存数据。Kafka有两类客户端,Producer
(消息生产者的)和Consumer
(消息消费者)。Kafka中不同业务系统的消息可通过topic
进行区分,每个消息都会被分区,用以分担消息读写负载,每个分区又可以有多个副本来防止数据丢失。消费者在具体消费某个topic消息时,指定起始偏移量。Kafka通过Zero-Copy、Exactly Once等技术语义保证了消息传输的实时、高效、可靠以及容错性。
相关角色
Broker
Kafka 集群包含一个或多个服务器,服务器节点称为broker。
broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
类似于数据库的表名.
Partition
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
Producer
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
Consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
Leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
安装配置
安装前准备:
- 准备3个节点。
- 3个节点安装好Zookeeper,见前面安装HBase文章。
1、下载安装包
Kafka官网下载安装包 http://kafka.apache.org/downloads.html,
我们下载第二种(已经被编译过的),版本为:kafka_2.12-2.2.0
2、解压安装包
tar -zxvf kafka_2.12-2.2.0.tgz
3、修改配置文件
配置文件在/kafka_2.12-2.2.0/config/server.properties
broker.id=0 # broker的全局唯一编号,不能重复
zookeeper.connect=hadoop-master:2181,hadoop-slave:2181,hadoop-slave2:2181 # broker需要使用zookeeper保存meta数据
4、复制kafka_2.12-2.2.0
到其它两个节点
scp -r kafka_2.12-2.2.0 fwj@hadoop-slave:~/
scp -r kafka_2.12-2.2.0 fwj@hadoop-slave2:~/
到这里基本就完成配置。
启动关闭
后台启动kafka:
nohup bin/kafka-server-start.sh config/server.properties &
- 启动提示
Cannot allocate memory
:修改bin/kafka-server-start.sh
,改成KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
- 输入
jps
可以看到Kafka
关闭:
bin/kafka-server-stop.sh
测试
1、创建Topic
bin/kafka-topics.sh --create --topic topic_1 --zookeeper localhost:2
181 --partitions 1 --replication-factor 3
- partitions:表示分区数
- replication-factor:表示副本数
2、查看Topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
3、模拟客户端发送消息
bin/kafka-console-producer.sh --topic topic_1 --broker-list hadoop-ma
ster:9092,hadoop-slave:9092,hadoop-slave2:9092
之后可以输入消息。
4、启动一个消费者
bin/kafka-console-consumer.sh --topic topic_1 --bootstrap-server loc
alhost:2181 --from-beginning
如果你上面的命令是在不同的终端运行,那么你可以在生产者终端输入消息然后在消费者终端看到。
kafka集群管理工具
为了简化开发者和服务工程师维护Kafka集群的工作,yahoo构建了一个叫做Kafka管理器的基于Web工具,叫做 Kafka Manager
。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。
1、下载kafka-manager
下载地址:https://github.com/yahoo/kafka-manager/releases
2、解压
3、修改配置
编辑配置文件application.conf
kafka-manager.zkhosts="hadoop-master:2181,hadoop-slave:2181,hadoop-slave2:2181"
4、编译
./sbt clean dist
- sbt 默认下载库文件很慢, 还时不时被打断,我们可以在用户目录下创建
touch ~/.sbt/repositories
, 填上阿里云的镜像# vi ~/.sbt/repositories
[repositories]
#local
public: http://maven.aliyun.com/nexus/content/groups/public/#这个maven
typesafe:http://dl.bintray.com/typesafe/ivy-releases/ , [organization]/[module]/(scala_[scalaVersion]/)(sb
t_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly#这个ivyivy-sbt-plugin:http://dl.bintray.com/sbt/sbt-plugin-releases/, [organization]/[module]/(scala_[scalaVersio
n]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]#这个ivysonatype-oss-releases
sonatype-oss-snapshots
注意:后面不能有空格。
看到打印这个消息 Getting org.scala-sbt sbt 0.13.9 (this may take some time)...
就慢慢等吧。
命令执行完成后,在 target/universal 目录中会生产一个zip压缩包kafka-manager-1.3.3.7.zip。将压缩包拷贝到要部署的目录下解压。
编译失败的话可以下载已经编译好的kafka-manager,百度网盘地址: https://pan.baidu.com/s/1VIcqou8e8P55s65RCG_bCg
提取码: 8vvj
5、启动
bin/kafka-manager
kafka-manager 默认的端口是9000,可通过 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:
nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080 &
6、查看Web UI
访问:http://192.168.241.140:8080
,出现下面界面则启动成功。