Kafka入门(一)

Posted by Kaka Blog on March 27, 2019

简介

Kafka是最初由Linkedin公司开发,Kafka是一个高吞吐的分布式消息队列,也是一个订阅/发布系统。常见可以用于web/nginx日志、访问日志,消息服务等等。

Kafka集群中每个节点都有一个被称为broker的实例,负责缓存数据。Kafka有两类客户端,Producer(消息生产者的)和Consumer(消息消费者)。Kafka中不同业务系统的消息可通过topic进行区分,每个消息都会被分区,用以分担消息读写负载,每个分区又可以有多个副本来防止数据丢失。消费者在具体消费某个topic消息时,指定起始偏移量。Kafka通过Zero-Copy、Exactly Once等技术语义保证了消息传输的实时、高效、可靠以及容错性。

相关角色

img

Broker

Kafka 集群包含一个或多个服务器,服务器节点称为broker。

broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。

如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。

如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

类似于数据库的表名.

Partition

topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

Producer

生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。

Consumer

消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。

Consumer Group

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

Leader

每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。

Follower

Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

安装配置

安装前准备:

  • 准备3个节点。
  • 3个节点安装好Zookeeper,见前面安装HBase文章。

1、下载安装包

Kafka官网下载安装包 http://kafka.apache.org/downloads.html, 我们下载第二种(已经被编译过的),版本为:kafka_2.12-2.2.0

2、解压安装包

tar -zxvf kafka_2.12-2.2.0.tgz

3、修改配置文件

配置文件在/kafka_2.12-2.2.0/config/server.properties

broker.id=0 # broker的全局唯一编号,不能重复
zookeeper.connect=hadoop-master:2181,hadoop-slave:2181,hadoop-slave2:2181 # broker需要使用zookeeper保存meta数据

4、复制kafka_2.12-2.2.0到其它两个节点

scp -r kafka_2.12-2.2.0 fwj@hadoop-slave:~/
scp -r kafka_2.12-2.2.0 fwj@hadoop-slave2:~/

到这里基本就完成配置。

启动关闭

后台启动kafka:

nohup bin/kafka-server-start.sh config/server.properties &
  • 启动提示Cannot allocate memory:修改bin/kafka-server-start.sh,改成KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
  • 输入jps可以看到Kafka

关闭:

bin/kafka-server-stop.sh

测试

1、创建Topic

bin/kafka-topics.sh --create --topic topic_1 --zookeeper localhost:2
181 --partitions 1 --replication-factor 3
  • partitions:表示分区数
  • replication-factor:表示副本数

2、查看Topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

3、模拟客户端发送消息

bin/kafka-console-producer.sh --topic topic_1 --broker-list hadoop-ma
ster:9092,hadoop-slave:9092,hadoop-slave2:9092

之后可以输入消息。

4、启动一个消费者

bin/kafka-console-consumer.sh --topic topic_1 --bootstrap-server loc
alhost:2181 --from-beginning

如果你上面的命令是在不同的终端运行,那么你可以在生产者终端输入消息然后在消费者终端看到。

kafka集群管理工具

为了简化开发者和服务工程师维护Kafka集群的工作,yahoo构建了一个叫做Kafka管理器的基于Web工具,叫做 Kafka Manager。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。

1、下载kafka-manager

下载地址:https://github.com/yahoo/kafka-manager/releases

2、解压

3、修改配置

编辑配置文件application.conf

kafka-manager.zkhosts="hadoop-master:2181,hadoop-slave:2181,hadoop-slave2:2181"

4、编译

./sbt clean dist
  • sbt 默认下载库文件很慢, 还时不时被打断,我们可以在用户目录下创建 touch ~/.sbt/repositories, 填上阿里云的镜像 # vi ~/.sbt/repositories
[repositories]
#local
public: http://maven.aliyun.com/nexus/content/groups/public/#这个maven
typesafe:http://dl.bintray.com/typesafe/ivy-releases/ , [organization]/[module]/(scala_[scalaVersion]/)(sb
t_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly#这个ivyivy-sbt-plugin:http://dl.bintray.com/sbt/sbt-plugin-releases/, [organization]/[module]/(scala_[scalaVersio
n]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]#这个ivysonatype-oss-releases

sonatype-oss-snapshots

注意:后面不能有空格。

看到打印这个消息 Getting org.scala-sbt sbt 0.13.9 (this may take some time)... 就慢慢等吧。

命令执行完成后,在 target/universal 目录中会生产一个zip压缩包kafka-manager-1.3.3.7.zip。将压缩包拷贝到要部署的目录下解压。

编译失败的话可以下载已经编译好的kafka-manager,百度网盘地址: https://pan.baidu.com/s/1VIcqou8e8P55s65RCG_bCg

提取码: 8vvj

5、启动

bin/kafka-manager
kafka-manager 默认的端口是9000,可通过 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:

nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080 &

6、查看Web UI

访问:http://192.168.241.140:8080,出现下面界面则启动成功。

img

参考