The Hadoop (2.x) Cloud Computing Ecosystem

Deployment Modes

/*************
* Installing and deploying Kafka 0.8.1.1
**************/

Kafka can be deployed in three modes:

  1. Single broker
  2. Multiple brokers on one machine (a pseudo-cluster)
  3. Multiple brokers on multiple machines (a true cluster)

Installing the first mode

  1. Upload the Kafka archive to the hadoopdn2 machine and extract it to /opt/hadoop/kafka

  2. Edit the /opt/hadoop/kafka/kafka_2.9.2-0.8.1.1/config/server.properties configuration file

    broker.id=0                          (default, no change needed)
    log.dirs=/opt/hadoop/kafka/kafka-logs
    log.flush.interval.messages=10000    (default, no change needed)
    log.flush.interval.ms=1000           (default, no change needed)
    zookeeper.connect=hadoopdn2:2181

  3. Start the Kafka broker

bin/kafka-server-start.sh config/server.properties

A successful startup looks like this:

[2014-11-18 10:36:32,196] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:os.version=2.6.32-220.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:user.name=hadoop (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:user.home=/home/hadoop (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,196] INFO Client environment:user.dir=/opt/hadoop/kafka/kafka_2.9.2-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,197] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@c2f8b5a (org.apache.zookeeper.ZooKeeper)
[2014-11-18 10:36:32,231] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)
[2014-11-18 10:36:32,238] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2014-11-18 10:36:32,262] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x349c07dcd7a0002, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-11-18 10:36:32,266] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-11-18 10:36:32,415] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2014-11-18 10:36:32,422] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-11-18 10:36:32,502] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2014-11-18 10:36:32,503] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2014-11-18 10:36:32,634] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-11-18 10:36:32,716] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-11-18 10:36:32,887] INFO Registered broker 0 at path /brokers/ids/0 with address JobTracker:9092. (kafka.utils.ZkUtils$)
[2014-11-18 10:36:32,941] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-11-18 10:36:33,034] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
  4. Create a topic

    bin/kafka-topics.sh --create --zookeeper hadoopdn2:2181 --replication-factor 1 --partitions 1 --topic test

  5. List topics

    bin/kafka-topics.sh --list --zookeeper hadoopdn2:2181

  6. Describe a topic

    bin/kafka-topics.sh --describe --zookeeper hadoopdn2:2181 --topic test

Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

The first line summarizes all partitions; each following line describes one partition.
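If you need this output in a program, a small parser can pull out the summary and per-partition fields. This is an illustrative Python sketch, not part of the Kafka distribution; it assumes exactly the whitespace-separated `Field: value` layout shown above.

```python
def parse_describe(output):
    """Parse `kafka-topics.sh --describe` output into a summary dict
    and a list of per-partition dicts (illustrative sketch only)."""
    lines = [l for l in output.strip().splitlines() if l.strip()]
    # First line: Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
    summary = dict(f.split(":", 1) for f in lines[0].split() if ":" in f)
    partitions = []
    for line in lines[1:]:
        tokens = line.split()
        # Tokens alternate between 'Key:' and its value.
        fields = dict(zip(tokens[::2], tokens[1::2]))
        partitions.append({k.rstrip(":"): v for k, v in fields.items()})
    return summary, partitions

out = """Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0"""
summary, parts = parse_describe(out)
```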

  7. View the help

    bin/kafka-topics.sh --help    (lists the topic-related commands)

Deploying the second mode:

  1. Create a server configuration file for the second broker

    cp server.properties server1.properties

  2. Edit server1.properties

    broker.id=1
    port=9093
    log.dirs=/opt/hadoop/kafka/kafka-logs-server1
    zookeeper.connect=hadoopdn2:2181
    
  3. Start the second Kafka broker

    nohup bin/kafka-server-start.sh config/server1.properties &

  4. The current brokers can be inspected through the ZooKeeper client

[zk: hadoopdn2:2181(CONNECTED) 7] ls /
[zookeeper, admin, consumers, config, controller, brokers, controller_epoch]
[zk: hadoopdn2:2181(CONNECTED) 8] ls /brokers
[topics, ids]
[zk: hadoopdn2:2181(CONNECTED) 9] ls /brokers/ids
[1, 0]
  • Check the topic

bin/kafka-topics.sh --describe --topic test --zookeeper hadoopdn2:2181

Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
  • Alter the parameters of the test topic

    bin/kafka-topics.sh --zookeeper hadoopdn2:2181 --partitions 3 --topic test --alter

    bin/kafka-topics.sh --describe --zookeeper hadoopdn2:2181 --topic test

    Topic:test    PartitionCount:3    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0 (on broker 0)    Isr: 0
    Topic: test    Partition: 1    Leader: 1    Replicas: 1 (on broker 1)    Isr: 1
    Topic: test    Partition: 2    Leader: 0    Replicas: 0 (on broker 0)    Isr: 0
    

    partition: the partition id
    leader: the broker id currently handling all reads and writes for this partition
    replicas: the full list of brokers holding a replica of this partition
    isr: the subset of replicas that are currently alive and in sync with the leader

    The replication (backup) mechanism:

    Kafka replicates each partition's data to multiple servers. Every partition has one leader and zero or more followers; the number of replicas is set in the broker configuration. The leader handles all read/write requests, and the followers must stay in sync with it. A follower behaves like a consumer: it fetches messages from the leader and appends them to its local log. The leader tracks the state of all followers; if a follower falls too far behind or fails, the leader removes it from the in-sync replica list. A message counts as "committed" only once every in-sync follower has saved it, and only then can consumers read it. As long as at least one replica survives and the ZooKeeper ensemble is up, messages can still be produced and consumed normally (unlike some distributed stores, such as HBase, which require a majority of nodes to be alive).

    When the leader fails, a new leader must be elected from the followers. A follower may lag behind the old leader, so an "up-to-date" follower should be chosen. Election must also weigh how many partition leaders the candidate broker already hosts: a broker leading too many partitions bears more I/O pressure, so electing a new leader should take load balancing into account.
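The load-balancing idea above can be sketched as picking, among the in-sync replicas, the broker that currently leads the fewest partitions. This is a hypothetical Python illustration, not Kafka's actual election code:

```python
def elect_leader(isr, leader_counts):
    """Pick a new leader from the in-sync replica list, preferring the
    broker that currently leads the fewest partitions (load-balancing
    sketch; `leader_counts` maps broker id -> partitions it leads)."""
    if not isr:
        raise RuntimeError("no in-sync replica available")
    return min(isr, key=lambda broker: leader_counts.get(broker, 0))

# Broker 1 already leads 3 partitions, broker 2 leads 1: broker 2 wins.
new_leader = elect_leader(isr=[1, 2], leader_counts={1: 3, 2: 1})
```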

Kafka's assignment mechanism:

Kafka stores metadata in ZooKeeper and uses ZooKeeper watches to detect metadata changes and react accordingly (for example, a consumer failing and triggering a rebalance).

  1. Broker node registry: when a Kafka broker starts, it first registers its node information in ZooKeeper as an ephemeral znode; when the broker disconnects from ZooKeeper, the znode is deleted.

    Format: /brokers/ids/[0...N] -> host:port, where [0..N] is the broker id. Each broker's configuration file must specify a numeric, globally unique id; the znode's value is that broker's host:port.

  2. Broker topic registry: when a broker starts, it registers the topics and partitions it holds in ZooKeeper, again as ephemeral znodes.

    Format: /brokers/topics/[topic]/[0...N], where [0..N] is the partition index.

  3. Consumer and consumer group: each consumer client registers its information in ZooKeeper when it is created, mainly for load balancing.

    Multiple consumers in one group can consume a topic's partitions in an interleaved fashion. In short, all of the topic's partitions are consumed by the group, and for performance the partitions are spread roughly evenly across the consumers.
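The "spread the partitions roughly evenly over the group" idea can be sketched as a simple range assignment. This is a hypothetical Python illustration, not Kafka's actual rebalance code:

```python
def assign_partitions(partitions, consumers):
    """Spread a topic's partitions roughly evenly over the consumers in a
    group (simplified range-assignment sketch). The first few consumers
    absorb any remainder when the counts don't divide evenly."""
    consumers = sorted(consumers)
    per, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, consumer in enumerate(consumers):
        count = per + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment

# 3 partitions over 2 consumers: consumer-0 gets two, consumer-1 gets one.
plan = assign_partitions([0, 1, 2], ["consumer-0", "consumer-1"])
```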

  4. Consumer id registry: each consumer has a unique id (host:uuid, either specified in the configuration or generated by the system) that identifies the consumer.

    Format: /consumers/[group_id]/ids/[consumer_id]

    This is again an ephemeral znode; its value is {"topic_name": #streams, ...}, i.e. the list of topics and partitions this consumer is currently consuming.

  5. Consumer offset tracking: tracks the largest offset this consumer has consumed in each partition.

    Format: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] -> offset_value

    This znode is persistent. Note that the offset is keyed by group_id, so when one consumer in the group fails, another consumer can continue consuming from where it left off.

  6. Partition owner registry: records which consumer is consuming each partition (an ephemeral znode).

    Format: /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] -> consumer_node_id

    When a consumer starts, it:

    1. First performs the "Consumer id registry" step;
    2. Then registers a watch under the "Consumer id registry" node to listen for other consumers in the group joining or leaving; whenever the list of children under this znode path changes, the group's consumers rebalance (for example, if a consumer fails, the others take over its partitions).
    3. Registers a watch under the "Broker id registry" node to monitor broker liveness; if the broker list changes, the consumers in every group rebalance.
       1. The producer uses ZooKeeper to discover the broker list and to open socket connections to each partition leader of a topic in order to send messages.
       2. The broker uses ZooKeeper to register its own information and to monitor partition-leader liveness.
       3. The consumer uses ZooKeeper to register its own information (including the list of partitions it consumes), to discover the broker list, and to open socket connections to partition leaders to fetch messages.
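The registry paths described above can be collected into small helper functions. These are hypothetical illustrations of the Kafka 0.8 znode layout, not an actual ZooKeeper client:

```python
# Illustrative helpers that build the ZooKeeper paths described above.
# They only format strings; a real client (e.g. the ZooKeeper CLI) would
# read or watch these znodes.

def broker_path(broker_id):
    """Ephemeral znode a broker registers at startup."""
    return "/brokers/ids/%d" % broker_id

def offset_path(group_id, topic, broker_id, partition_id):
    """Persistent znode tracking a group's consumed offset per partition."""
    return "/consumers/%s/offsets/%s/%d-%d" % (
        group_id, topic, broker_id, partition_id)

def owner_path(group_id, topic, broker_id, partition_id):
    """Ephemeral znode recording which consumer owns a partition."""
    return "/consumers/%s/owners/%s/%d-%d" % (
        group_id, topic, broker_id, partition_id)
```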
      

Deploying the third mode:

  1. Upload the Kafka archive to the hadoopdn3 machine and extract it to /opt/hadoop/kafka

  2. Edit the server.properties file under /opt/hadoop/kafka/kafka_2.9.2-0.8.1.1/config

    broker.id=2                          (must be changed; every broker id has to be unique)
    log.dirs=/opt/hadoop/kafka/kafka-logs
    log.flush.interval.messages=10000    (default, no change needed)
    log.flush.interval.ms=1000           (default, no change needed)
    zookeeper.connect=hadoopdn2:2181

  3. Check through the ZooKeeper client

[zk: hadoopdn2:2181(CONNECTED) 10] ls /brokers/ids

[2, 1, 0]

The broker with id 2 has now registered itself in ZooKeeper.

4. Delete a Kafka topic (note: the cluster must be restarted afterwards; testing revealed a bug here)

bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test_kafka --zookeeper hadoopdn2:2181

5. Create a multi-replica topic

bin/kafka-topics.sh --create --zookeeper hadoopdn2:2181 --replication-factor 3 --partitions 3 --topic test_kafka

6. Describe the multi-replica topic

$ bin/kafka-topics.sh --describe --zookeeper hadoopdn2:2181 --topic test_kafka

Topic:test_kafka    PartitionCount:3    ReplicationFactor:3    Configs:
Topic: test_kafka    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
Topic: test_kafka    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
Topic: test_kafka    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
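The describe output above shows each partition's replicas spread across the three brokers in a rotating pattern. A simplified sketch of that placement (an assumed round-robin, not Kafka's exact assignment algorithm) reproduces the layout when the rotation starts at broker 1:

```python
def assign_replicas(n_partitions, n_brokers, replication_factor, start=0):
    """Round-robin replica placement sketch: partition p's replicas are
    the consecutive brokers beginning at (start + p) % n_brokers. This
    matches the --describe output above for start=1, but it is only an
    illustration, not Kafka's actual assignment code."""
    return {
        p: [(start + p + i) % n_brokers for i in range(replication_factor)]
        for p in range(n_partitions)
    }

layout = assign_replicas(n_partitions=3, n_brokers=3,
                         replication_factor=3, start=1)
# partition 0 -> [1, 2, 0], partition 1 -> [2, 0, 1], partition 2 -> [0, 1, 2]
```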

//—————————————————————————

Testing the Kafka cluster

1) Produce messages

$ bin/kafka-console-producer.sh --broker-list JobTracker:9092 --topic test

2) Consume messages

$ bin/kafka-console-consumer.sh --zookeeper hadoopdn2:2181 --topic test --from-beginning