Hadoop(2.x)云计算生态系统

实施细则

下面给出了一些在上一节所描述的低层相关的实现系统的某些部分的细节的简要说明。

API 设计

生产者 APIs

生产者 API 是给两个底层生产者的再封装

-kafka.producer.SyncProducerandkafka.producer.async.AsyncProducer.

    class Producer {  

      /* Sends the data, partitioned by key to the topic using either the */  
      /* synchronous or the asynchronous producer */  
      public void send(kafka.javaapi.producer.ProducerData producerData);  

      /* Sends a list of data, partitioned by key to the topic using either */  
      /* the synchronous or the asynchronous producer */  
      public void send(java.util.List< kafka.javaapi.producer.ProducerData> producerData);  

      /* Closes the producer and cleans up */     
      public void close();  

    }

该API的目的是将生产者的所有功能通过一个单个的API公开给其使用者(client)。新建的生产者可以:

  • 对多个生产者请求进行排队/缓冲并异步发送批量数据 —— kafka.producer.Producer提供了在将多个生产请求序列化并发送给适当的Kafka代理分区之前,对这些生产请求进行批量处理的能力(producer.type=async)。批量的大小可以通过一些配置参数进行控制。当事件进入队列时会先放入队列进行缓冲,直到时间到了queue.time或者批量大小到达batch.size为止,后台线程(kafka.producer.async.ProducerSendThread)会将这批数据从队列中取出,交给kafka.producer.EventHandler进行序列化并发送给适当的kafka代理分区。通过event.handler这个配置参数,可以在系统中插入一个自定义的事件处理器。在该生产者队列管道中的各个不同阶段,为了插入自定义的日志/跟踪代码或者自定义的监视逻辑,如能注入回调函数会非常有用。通过实现kafka.producer.asyn.CallbackHandler接口并将配置参数callback.handler设置为实现类就能够实现注入。

  • 使用用户指定的Encoder处理数据的序列化(serialization)。Encoder的缺省值是一个什么活都不干的kafka.serializer.DefaultEncoder。

  • 提供基于zookeeper的代理自动发现功能 —— 通过使用zk.connect配置参数指定zookeeper的连接url,就能够使用基于zookeeper的代理发现和负载均衡功能。在有些应用场合,可能不太适合于依赖zookeeper。在这种情况下,生产者可以从broker.list这个配置参数中获得一个代理的静态列表,每个生产请求会被随即的分配给各代理分区。如果相应的代理宕机,那么生产请求就会失败。

  • 通过使用一个可选性的、由用户指定的Partitioner,提供由软件实现的负载均衡功能 —— 数据发送路径选择决策受kafka.producer.Partitioner的影响。分区API根据相关的键值以及系统中具有的代理分区的数量返回一个分区id。将该id用作索引,在broker_id和partition组成的经过排序的列表中为相应的生产者请求找出一个代理分区。缺省的分区策略是hash(key)%numPartitions。如果key为null,那就进行随机选择。使用partitioner.class这个配置参数也可以插入自定义的分区策略。

使用者API

我们有两个层次的使用者API。底层比较简单的API维护了一个同单个代理建立的连接,完全同发送给服务器的网络请求相吻合。该API完全是无状态的,每个请求都带有一个偏移量作为参数,从而允许用户以自己选择的任意方式维护该元数据。

高层API对使用者隐藏了代理的具体细节,让使用者可运行于集群中的机器之上而无需关心底层的拓扑结构。它还维护着数据使用的状态。高层API还提供了订阅同一个过滤表达式(例如,白名单或黑名单的正则表达式)相匹配的多个话题的能力。

底层API
class SimpleConsumer {

  /* Send fetch request to a broker and get back a set of messages. */
  public ByteBufferMessageSet fetch(FetchRequest request);

  /* Send a list of fetch requests to a broker and get back a response set. */
  public MultiFetchResponse multifetch(List<FetchRequest> fetches);

  /**
   * Get a list of valid offsets (up to maxSize) before the given time.
   * The result is a list of offsets, in descending order.
   * @param time: time in millisecs,
   *              if set to OffsetRequest$.MODULE$.LATIEST_TIME(), get from the latest offset available.
   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
   */
  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

底层API不但用于实现高层API,而且还直接用于我们的离线使用者(比如Hadoop这个使用者),这些使用者还对状态的维护有比较特定的需求。

高层API
/* create a connection to the cluster */
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {

  /**
   * This method is used to get a list of KafkaStreams, which are iterators over
   * MessageAndMetadata objects from which you can obtain messages and their
   * associated metadata (currently only topic).
   *  Input: a map of <topic, #streams>
   *  Output: a map of <topic, list of message streams>
   */
  public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);

  /**
   * You can also obtain a list of KafkaStreams, that iterate over messages
   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
   * whitelist or a blacklist which is a standard Java regex.)
   */
  public List<KafkaStream> createMessageStreamsByFilter(
      TopicFilter topicFilter, int numStreams);

  /* Commit the offsets of all messages consumed so far. */
  public commitOffsets()

  /* Shut down the connector */
  public shutdown()
}

该API的中心是一个由KafkaStream这个类实现的迭代器(iterator)。每个KafkaStream都代表着一个从一个或多个分区到一个或多个服务器的消息流。每个流都是使用单个线程进行处理的,所以,该API的使用者在该API的创建调用中可以提供所需的任意个数的流。这样,一个流可能会代表多个服务器分区的合并(同处理线程的数目相同),但每个分区只会把数据发送给一个流中。

createMessageStreams方法为使用者注册到相应的话题之上,这将导致需要对使用者/代理的分配情况进行重新平衡。为了将重新平衡操作减少到最小。该API鼓励在一次调用中就创建多个话题流。createMessageStreamsByFilter方法为发现同其过滤条件想匹配的话题(额外地)注册了多个监视器(watchers)。应该注意,createMessageStreamsByFilter方法所返回的每个流都可能会对多个话题进行迭代(比如,在满足过滤条件的话题有多个的情况下)。

网络层

网络层就是一个特别直截了当的NIO服务器,在此就不进行过于细致的讨论了。sendfile是通过给MessageSet接口添加了一个writeTo方法实现的。这样就可以让基于文件的消息更加高效地利用transferTo实现,而不是使用线程内缓冲区读写方式。线程模型用的是一个单个的接收器(acceptor)线程和每个可以处理固定数量网络连接的N个处理器线程。这种设计方案在别处已经经过了非常彻底的检验,发现其实现起来简单、运行起来很快。其中使用的协议一直都非常简单,将来还可以用其它语言实现其客户端。

消息

消息由一个固定大小的消息头和一个变长不透明字节数字的有效载荷构成(opaque byte array payload)。消息头包含格式的版本信息和一个用于探测出坏数据和不完整数据的CRC32校验。让有效载荷保持不透明是个非常正确的决策:在用于序列化的代码库方面现在正在取得非常大的进展,任何特定的选择都不可能适用于所有的使用情况。都不用说,在Kafka的某特定应用中很有可能在它的使用中需要采用某种特殊的序列化类型。MessageSet接口就是一个使用特殊的方法对NIOChannel进行大宗数据读写(bulk reading and writing to an NIOChannel)的消息迭代器。

消息的格式

/**
     * A message. The format of an N byte message is the following:
     *
     * If magic byte is 0
     *
     * 1. 1 byte "magic" identifier to allow format changes
     *
     * 2. 4 byte CRC32 of the payload
     *
     * 3. N - 5 byte payload
     *
     * If magic byte is 1
     *
     * 1. 1 byte "magic" identifier to allow format changes
     *
     * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
     *
     * 3. 4 byte CRC32 of the payload
     *
     * 4. N - 6 byte payload
     *
     */

日志

具有两个分区的、名称为"my_topic"的话题的日志由两个目录组成(即:my_topic_0和my_topic_1),目录中存储的是内容为该话题的消息的数据文件。日志的文件格式是一系列的“日志项”;每条日志项包含一个表示消息长度的4字节整数N,其后接着保存的是N字节的消息。每条消息用一个64位的整数偏移量进行唯一性标示,该偏移量表示了该消息在那个分区中的那个话题下发送的所有消息组成的消息流中所处的字节位置。每条消息在磁盘上的格式如下文所示。每个日志文件的以它所包含的第一条消息的偏移量来命名。因此,第一个创建出来的文件的名字将为00000000000.kafka,随后每个后加的文件的名字将是前一个文件的文件名大约再加S个字节所得的整数,其中,S是配置文件中指定的最大日志文件的大小。

消息的确切的二进制格式都有版本,它保持为一个标准的接口,让消息集可以根据需要在生产者、代理、和使用者直接进行自由传输而无须重新拷贝或转换。其格式如下所示: On-disk format of a messagemessage length : 4 bytes (value: 1+4+n) "magic" value : 1 bytecrc : 4 bytespayload : n bytes

将消息的偏移量作为消息的可不常见。我们原先的想法是使用由生产者产生的GUID作为消息id,然后在每个代理上作一个从GUID到偏移量的映射。但是,既然使用者必须为每个服务器维护一个ID,那么GUID所具有的全局唯一性就失去了价值。更有甚者,维护将从一个随机数到偏移量的映射关系带来的复杂性,使得我们必须使用一种重量级的索引结构,而且这种结构还必须与磁盘保持同步,这样我们还就必须使用一种完全持久化的、需随机访问的数据结构。如此一来,为了简化查询结构,我们就决定使用一个简单的依分区的原子计数器(atomic counter),这个计数器可以同分区id以及节点id结合起来唯一的指定一条消息;这种方法使得查询结构简化不少,尽管每次在处理使用者请求时仍有可能会涉及多次磁盘寻道操作。然而,一旦我们决定使用计数器,跳向直接使用偏移量作为id就非常自然了,毕竟两者都是分区内具有唯一性的、单调增加的整数。既然偏移量是在使用者API中并不会体现出来,所以这个决策最终还是属于一个实现细节,进而我们就选择了这种更加高效的方式。

写操作

日志可以顺序添加,添加的内容总是保存到最后一个文件。当大小超过配置中指定的大小(比如说1G)后,该文件就会换成另外一个新文件。有关日志的配置参数有两个,一个是M,用于指出写入多少条消息之后就要强制OS将文件刷新到磁盘;另一个是S,用来指定过多少秒就要强制进行一次刷新。这样就可以保证一旦发生系统崩溃,最多会有M条消息丢失,或者最长会有S秒的数据丢失,

读操作

可以通过给出消息的64位逻辑偏移量和S字节的数据块最大的字节数对日志文件进行读取。读取操作返回的是这S个字节中包含的消息的迭代器。S应该要比最长的单条消息的字节数大,但在出现特别长的消息情况下,可以重复进行多次读取,每次的缓冲区大小都加倍,直到能成功读取出这样长的一条消息。也可以指定一个最大的消息和缓冲区大小并让服务器拒绝接收比这个大小大一些的消息,这样也能给客户端一个能够读取一条完整消息所需缓冲区的大小的上限。很有可能会出现读取缓冲区以一个不完整的消息结尾的情况,这个情况用大小界定(size delimiting)很容易就能探知。

从某偏移量开始进行日志读取的实际过程需要先找出存储所需数据的日志段文件,从全局偏移量计算出文件内偏移量,然后再从该文件偏移量处开始读取。搜索过程通过对每个文件保存在内存中的范围值进行一种变化后的二分查找完成。

日志提供了获取最新写入的消息的功能,从而允许从“当下”开始消息订阅。这个功能在使用者在SLA规定的天数内没能正常使用数据的情况下也很有用。当使用者企图从一个并不存在的偏移量开始使用数据时就会出现这种情况,此时使用者会得到一个OutOfRangeException异常,它可以根据具体的使用情况对自己进行重启或者仅仅失败而退出。

以下是发送给数据使用者(consumer)的结果的格式。 MessageSetSend (fetch result)total length : 4 byteserror code : 2 bytesmessage 1 : x bytes...message n : x bytesMultiMessageSetSend (multiFetch result)total length : 4 byteserror code : 2 bytesmessageSetSend 1...messageSetSend n

删除

一次只能删除一个日志段的数据。 日志管理器允许通过可加载的删除策略设定删除的文件。 当前策略删除修改事件超过N 天以上的文件,也可以选择保留最后 N GB 的数据。 为了避免删除时的读取锁定冲突,我们可以使用副本写入模式,以便在进行删除的同时对日志段的一个不变的静态快照进行二进制搜索。

数据正确性保证

日志功能里有一个配置参数M,可对在强制进行磁盘刷新之前可写入的消息的最大条目数进行控制。在系统启动时会运行一个日志恢复过程,对最新的日志段内所有消息进行迭代,以对每条消息项的有效性进行验证。一条消息项是合法的,仅当其大小加偏移量小于文件的大小并且该消息中有效载荷的CRC32值同该消息中存储的CRC值相等。在探测出有数据损坏的情况下,就要将文件按照最后一个有效的偏移量进行截断。

要注意,这里有两种必需处理的数据损坏情况:由于系统崩溃造成的未被正常写入的数据块(block)因而需要截断的情况以及由于文件中被加入了毫无意义的数据块而造成的数据损坏情况。造成数据损坏的原因是,一般来说OS并不能保证文件索引节点(inode)和实际数据块这两者的写入顺序,因此,除了可能会丢失未刷新的已写入数据之外,在索引节点已经用新的文件大小更新了但在将数据块写入磁盘块之前发生了系统崩溃的情况下,文件就可能会获得一些毫无意义的数据。CRC值就是用于这种极端情况,避免由此造成整个日志文件的损坏(尽管未得到保存的消息当然是真的找不回来了)。