/*************
*Installing and deploying Kafka 0.8.1.1
**************/
Upload the kafka tarball to the hadoopdn2 machine and extract it under /opt/hadoop/kafka.
Edit the configuration file /opt/hadoop/kafka/kafka_2.9.2-0.8.1.1/config/server.properties:
broker.id=0                           (default, no change needed)
log.dirs=/opt/hadoop/kafka/kafka-logs
log.flush.interval.messages=10000     (default, no change needed)
log.flush.interval.ms=1000            (default, no change needed)
zookeeper.connect=hadoopdn2:2181
Start the kafka broker:
bin/kafka-server-start.sh config/server.properties
A normal startup looks like this:
[2014-11-18 10:36:32,196] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:os.version=2.6.32-220.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:user.name=hadoop (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:user.home=/home/hadoop (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:user.dir=/opt/hadoop/kafka/kafka_2.9.2-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,197] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@c2f8b5a (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,231] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)
[2014-11-18 10:36:32,238] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2014-11-18 10:36:32,262] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x349c07dcd7a0002, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-11-18 10:36:32,266] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-11-18 10:36:32,415] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2014-11-18 10:36:32,422] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-11-18 10:36:32,502] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2014-11-18 10:36:32,503] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2014-11-18 10:36:32,634] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-11-18 10:36:32,716] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-11-18 10:36:32,887] INFO Registered broker 0 at path /brokers/ids/0 with address JobTracker:9092. (kafka.utils.ZkUtils$)
[2014-11-18 10:36:32,941] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-11-18 10:36:33,034] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
Create a topic:
bin/kafka-topics.sh --create --zookeeper hadoopdn2:2181 --replication-factor 1 --partitions 1 --topic test
List topics:
bin/kafka-topics.sh --list --zookeeper hadoopdn2:2181
Describe a topic:
bin/kafka-topics.sh --describe --zookeeper hadoopdn2:2181 --topic test
Topic[topic name]:test PartitionCount[number of partitions]:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
The first line is a summary of all partitions; each subsequent line describes one partition.
bin/kafka-topics.sh --help   lists all topic-related commands
Create a server config file for the second broker:
cp server.properties server1.properties
Edit server1.properties:
broker.id=1
port=9093
log.dirs=/opt/hadoop/kafka/kafka-logs-server1
zookeeper.connect=hadoopdn2:2181
Start the second kafka broker:
nohup bin/kafka-server-start.sh config/server1.properties &
The current brokers can be inspected through the zookeeper client:
[zk: hadoopdn2:2181(CONNECTED) 7] ls /
[zookeeper, admin, consumers, config, controller, brokers, controller_epoch]
[zk: hadoopdn2:2181(CONNECTED) 8] ls /brokers
[topics, ids]
[zk: hadoopdn2:2181(CONNECTED) 9] ls /brokers/ids
[1, 0]
bin/kafka-topics.sh --describe --topic test --zookeeper hadoopdn2:2181
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Alter the test topic's parameters (increase it to 3 partitions):
bin/kafka-topics.sh --zookeeper hadoopdn2:2181 --partitions 3 --topic test --alter
bin/kafka-topics.sh --describe --zookeeper hadoopdn2:2181 --topic test
Topic:test PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 [on broker0] Isr: 0
Topic: test Partition: 1 Leader: 1 Replicas: 1 [on broker1] Isr: 1
Topic: test Partition: 2 Leader: 0 Replicas: 0 [on broker0] Isr: 0
partition: the partition id. leader: the broker id currently handling reads and writes for this partition. replicas: the full list of brokers holding a replica of this partition. isr: the subset of replicas that are currently alive and in sync.
The replication (Replicas) mechanism:
kafka replicates each partition's data across multiple servers. Every partition has one leader and zero or more followers; the number of replicas is set in the broker configuration. The leader handles all read/write requests, and the followers keep themselves in sync with it. A follower behaves like a consumer: it fetches messages and appends them to its local log. The leader tracks the state of all followers; if a follower falls too far behind or fails, the leader removes it from the in-sync replica list. A message is considered "committed" only after every follower has saved it, and only then may consumers read it. Even if just one replica survives, messages can still be produced and consumed, as long as the zookeeper ensemble is alive (unlike some other distributed stores, e.g. hbase, which require a "majority" of nodes to survive).
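The "committed" rule above can be illustrated with a toy Python sketch (not Kafka code; the class and names here are invented for illustration): an offset becomes readable by consumers only once every in-sync replica has saved it.

```python
# Toy model of the commit rule: an offset is "committed" (visible to
# consumers) only when every broker in the in-sync replica list has saved it.

class Partition:
    def __init__(self, isr):
        self.isr = set(isr)      # ids of in-sync replica brokers
        self.saved = {}          # offset -> set of brokers that saved it

    def follower_saved(self, broker_id, offset):
        if broker_id in self.isr:
            self.saved.setdefault(offset, set()).add(broker_id)

    def is_committed(self, offset):
        # readable only when the whole ISR has acknowledged the offset
        return self.saved.get(offset, set()) >= self.isr

p = Partition(isr=[0, 1, 2])
p.follower_saved(0, 5)
p.follower_saved(1, 5)
print(p.is_committed(5))   # False: broker 2 has not saved offset 5 yet
p.follower_saved(2, 5)
print(p.is_committed(5))   # True: all three replicas have it
```

This also shows why a single surviving replica suffices: if the ISR shrinks to one live broker, that broker's save alone commits the message.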
When the leader fails, a new leader must be elected from the followers. Since followers may lag behind the leader, an "up-to-date" follower should be chosen. Election also has to weigh how many partition leaderships the candidate server already carries: a server leading too many partitions bears more IO pressure, so electing a new leader should take "load balancing" into account.
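A minimal sketch of that balancing concern (hypothetical code, not Kafka's actual election algorithm): among the up-to-date candidates, prefer the broker that already leads the fewest partitions.

```python
# Hypothetical leader pick: choose the in-sync candidate currently
# leading the fewest partitions, spreading IO load across brokers.

def pick_new_leader(candidates, leader_counts):
    """candidates: up-to-date follower broker ids;
    leader_counts: broker id -> number of partitions it already leads."""
    return min(candidates, key=lambda b: leader_counts.get(b, 0))

# broker 1 already leads 5 partitions, broker 2 only 1 -> pick broker 2
print(pick_new_leader([1, 2], {1: 5, 2: 1}))   # 2
```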
kafka stores metadata in zookeeper and uses zookeeper's watch mechanism to detect metadata changes and react to them (for example, a consumer failing triggers a rebalance).
Broker Node Registry: an ephemeral znode per broker. Format: /brokers/ids/[0...N] -> host:port, where [0..N] is the broker id; every broker's configuration must specify a numeric id (globally unique), and the znode's value is that broker's host:port.
Broker Topic Registry: when a broker starts, it registers the topics and partitions it holds with zookeeper, again as an ephemeral znode.
Format: /brokers/topics/[topic]/[0...N], where [0..N] is the partition index.
Consumer and Consumer group: each consumer client registers itself with zookeeper when it is created; this mainly serves "load balancing" (rebalancing).
Multiple consumers in one group consume a topic's partitions in an interleaved fashion. In short, the group as a whole is guaranteed to cover all partitions of the topic, and for performance the partitions are spread across the group's consumers as evenly as possible.
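A rough Python sketch of that spreading (illustrative only; Kafka's own range/round-robin assignment differs in detail): hand partitions out to the group's consumers as evenly as possible, so every partition is owned by exactly one consumer.

```python
# Illustrative round-robin spread of a topic's partitions over a
# consumer group: each partition goes to exactly one consumer.

def assign(partitions, consumers):
    consumers = sorted(consumers)
    plan = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        plan[consumers[i % len(consumers)]].append(p)
    return plan

print(assign([0, 1, 2], ["c1", "c2"]))   # {'c1': [0, 2], 'c2': [1]}
```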
Consumer id Registry: each consumer has a unique id (host:uuid, either set in the configuration file or generated by the system) that identifies the consumer.
Format: /consumers/[group_id]/ids/[consumer_id]
Again an ephemeral znode; its value is {"topic_name": #streams...}, i.e. the list of topics + partitions this consumer currently consumes.
Consumer offset Tracking: tracks the largest offset this consumer has so far consumed in each partition.
Format: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] -> offset_value
This znode is persistent. Note that the offset is keyed by group_id, so if one consumer in the group fails, the remaining consumers can continue consuming from where it left off.
Partition Owner Registry: records which consumer currently owns (consumes) each partition; an ephemeral znode.
Format: /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] -> consumer_node_id
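To make the znode layout concrete, here is a small helper sketch that builds the registry paths listed above (the helper names are made up; the path formats follow the text):

```python
# Build the 0.8.x-style zookeeper registry paths described above.

def consumer_id_path(group_id, consumer_id):
    return "/consumers/%s/ids/%s" % (group_id, consumer_id)

def offset_path(group_id, topic, broker_id, partition_id):
    return "/consumers/%s/offsets/%s/%s-%s" % (group_id, topic, broker_id, partition_id)

def owner_path(group_id, topic, broker_id, partition_id):
    return "/consumers/%s/owners/%s/%s-%s" % (group_id, topic, broker_id, partition_id)

print(offset_path("group1", "test", 0, 0))   # /consumers/group1/offsets/test/0-0
print(owner_path("group1", "test", 1, 2))    # /consumers/group1/owners/test/1-2
```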
To summarize how each role uses zookeeper:
1. The producer uses zookeeper to "discover" the broker list and to open a socket connection to the leader of each of the topic's partitions and send messages.
2. The broker uses zookeeper to register its own information and to monitor the liveness of partition leaders.
3. The consumer uses zookeeper to register its own information (including the list of partitions it consumes), to discover the broker list, and to connect to partition leaders and fetch messages.
Upload the kafka tarball to the hadoopdn3 machine and extract it under /opt/hadoop/kafka.
Edit server.properties under /opt/hadoop/kafka/kafka_2.9.2-0.8.1.1/config:
broker.id=2                           (must be changed; every broker's id has to be unique)
log.dirs=/opt/hadoop/kafka/kafka-logs
log.flush.interval.messages=10000     (default, no change needed)
log.flush.interval.ms=1000            (default, no change needed)
zookeeper.connect=hadoopdn2:2181
Check through the zookeeper client:
[zk: hadoopdn2:2181(CONNECTED) 10] ls /brokers/ids
[2, 1, 0]
The broker with id 2 is now registered in zookeeper.
4. Deleting a kafka topic [note: the kafka cluster must be restarted afterwards; testing revealed a bug here]
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test_kafka --zookeeper hadoopdn2:2181
5. Create a multi-replica topic:
bin/kafka-topics.sh --create --zookeeper hadoopdn2:2181 --replication-factor 3 --partitions 3 --topic test_kafka
6. Describe the multi-replica topic:
$ bin/kafka-topics.sh --describe --zookeeper hadoopdn2:2181 --topic test_kafka
Topic:test_kafka PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test_kafka Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: test_kafka Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: test_kafka Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
//—————————————————————————
$ bin/kafka-console-producer.sh --broker-list JobTracker:9092 --topic test
$ bin/kafka-console-consumer.sh --zookeeper hadoopdn2:2181 --topic test --from-beginning