[TOC]
Kafka简介 消息队列(Message Queue) 消息 Message 网络中的两台计算机或者两个通讯设备之间传递的数据。例如说:文本、音乐、视频等内容。 队列 Queue 一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部删除元素和在尾部追加元素。入队、出队。 消息队列 MQ 消息+队列,保存消息的队列。消息的传输过程中的容器;主要提供生产、消费接口供外部调用做数据的存储和获取。 MQ分类 MQ主要分为两类:点对点(p2p)、发布订阅(Pub/Sub) 共同点: 消息生产者生产消息发送到queue中,然后消息消费者从queue中读取并且消费消息。 不同点: p2p模型包括:消息队列(Queue)、发送者(Sender)、接收者(Receiver) 一个生产者生产的消息只有一个消费者(Consumer)(即一旦被消费,消息就不在消息队列中)。比如说打电话。 Pub/Sub包含:消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber) 每个消息可以有多个消费者,彼此互不影响。比如我发布一个微博:关注我的人都能够看到。 那么在大数据领域呢,为了满足日益增长的数据量,也有一款可以满足百万级别消息的生成和消费,分布式、持久稳定的产品——Kafka。 Kafka简介 Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级项目。 Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统。 它主要用于处理活跃的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据)。 三大特点: 高吞吐量 可以满足每秒百万级别消息的生产和消费——生产消费。QPS 持久性 有一套完善的消息存储机制,确保数据的高效安全的持久化——中间存储。 分布式 基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体健壮性。 Kafka组件 一个MQ需要哪些部分?生产、消费、消息类别、存储等等。 对于kafka而言,kafka服务就像是一个大的水池。不断的生产、存储、消费着各种类别的消息。那么kafka由何组成呢? > Kafka服务: > Topic:主题,Kafka处理的消息的不同分类。 > Broker:消息代理,Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据。存在硬盘中。每个topic都是有分区的。 > Partition:Topic物理上的分组,一个topic在broker中被分为1个或者多个partition,分区在创建topic的时候指定。 > Message:消息,是通信的基本单位,每个消息都属于一个partition > Kafka服务相关 > Producer:消息和数据的生产者,向Kafka的一个topic发布消息。 > Consumer:消息和数据的消费者,定于topic并处理其发布的消息。 > Zookeeper:协调kafka的正常运行。 Broker Broker:配置文件server.properties 1、为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数达到一定阀值或者过了一定的时间间隔时,再flush到磁盘,这样减少了磁盘IO调用的次数。 配置:Log Flush Policy #log.flush.interval.messages=10000 一个分区的消息数阀值 #log.flush.interval.ms=1000 2、kafka的消息保存一定时间(通常为7天)后会被删除。 配置:Log Retention Policy log.retention.hours=168 #log.retention.bytes=1073741824 log.retention.check.interval.ms=300000 Producer Producer:配置文件:producer.properties 1、自定义partition Producer也根据用户设置的算法来根据消息的key来计算输入哪个partition:partitioner.class 2、异步或者同步发送 配置项:producer.type 异步或者同步发送 同步是指:发送方发出数据后,等接收方发回响应以后才发下一个数据的通讯方式。 异步是指:发送方发出数据后,不等接收方发回响应,接着发送下个数据的通讯方式。 3、批量发送可以很有效的提高发送效率。 Kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去。 具体配置queue.buffering.max.ms、queue.buffering.max.messages。 默认值分别为5000和10000 Consumer consumers:配置文件:consumer.properties 1、每个consumer属于一个consumer group,可以指定组id。group.id 2、消费形式: 组内:组内的消费者消费同一份数据;同时只能有一个consumer消费一个Topic中的1个partition;一个consumer可以消费多个partitions中的消息。 所以,对于一个topic,同一个group中推荐不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。 组间:每个消费组消费相同的数据,互不影响。 3、在一个consumer多个线程的情况下,一个线程相当于一个消费者。 例如:partition为3,一个consumer起了3个线程消费,另一个后来的consumer就无法消费。 (这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。 一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。 要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。) topic、partition、message 1、每个partition在存储层面是append log文件。新消息都会被直接追加到log文件的尾部,每条消息在log文件中的位置称为offset(偏移量)。 2、每条Message包含了以下三个属性: 1°、offset 对应类型:long 此消息在一个partition中序号。可以认为offset是partition中Message的id 2°、MessageSize 对应类型:int32 此消息的字节大小。 3°、data 是message的具体内容。 3、越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力。 4、总之:业务区分增加topic、数据量大增加partition。 Kafka安装配置 解压: [uplooking@uplooking01 ~]$ tar -zxvf soft/kafka_2.10-0.10.0.1.tgz -C app/ 重命名:[uplooking@uplooking01 ~]$ mv app/kafka_2.10-0.10.0.1/ app/kafka 添加KAFKA_HOME至环境变量:~/.bash_profile export KAFKA_HOME=/home/uplooking/app/kafka export PATH=$PATH:$KAFKA_HOME/bin source ~/.bash_profile 配置相关参数:$KAFKA_HOME/config/server.properties 主要参数:broker.id、log.dirs、zookeeper.connect broker.id=10 log.dirs=/home/uplooking/data/kafka [kafka数据的存放目录] zookeeper.connect=uplooking01:2181,uplooking02:2181,uplooking03:2181 kafka实例broker监听默认端口9092,配置listeners=PLAINTEXT://:9092 启动: $KAFKA_HOME/bin/kafka-server-start.sh [-daemon] $KAFKA_HOME/config/server.properties -daemon 可选,表示后台启动kafka服务郑重声明:本文版权归原作者所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。