Hadoop(2.x)云计算生态系统

SamzaContainer

SamzaContainer负责管理一个或多个StreamTask实例的启动,执行和关闭。每个SamzaContainer通常在独立的Java虚拟机上运行。一个Samza job可以由几个SamzaContainers组成,并可以在在不同的机器上运行。

当SamzaContainer启动时,它按照顺序执行以下操作:

1)获取每个输入流的分区的最后一个检查点的偏移量,并从此位置开始处理消息
2)作为消费者,对于每一个输入流分区创建一个“reader”线程
3)启动度量监控器,监控度量指标
4)启动一个定时器检查站,每隔一段时间,用来保存您的任务的输入流中的偏移量
5)如果定义了任务的windows方法,将启动一个窗口定时器,触发任务的windows方法
6)按照kafka队列的每个输入流分区,实例化和初始化StreamTask。
7)启动事件循环

这些步骤的每一个过程,将通知生命周期监听器。

让我们先从步骤的中间开始,以StreamTask的实例。其他步骤将在文档的其他部分阐述。

任务与分区(Tasks and Partitions)

当container 启动的时候,首选创建 task class的实例对象。如果 task class实现了InitableTask的接口,SamzaContainer将调用其 init() 方法。

/** Implement this if you want a callback when your task starts up. */
public interface InitableTask {
  void init(Config config, TaskContext context);
}

默认情况下,有多少个samza job的partition,就会创建多少个samza task 的实例。例如:如果 samza job 有10个partition,那么将创建 10个task 实例,每个实例用来接收每个parttion的消息。如下图:

数据流的partition数量,取决于使用的系统。例如,如果的数据流是通过kafka系统来实现的,那么数据流的partition数量是你创建kafka队列的时候的cmd决定的。如果cmd中没有指定,那么默认是通过 kafka的服务器端配置中的num.partitions这个阐述。

如果samza job多于一个input stream,那么,对应到Samza job 的task实例,将是所有input stream中partition的最大值。例如:如果一个Samza job从PageViewEvent(12 partition)和ServiceMetricEvent(14 partition),那么,将会产生14个task实例(从0到13)。task实例12和13将只用来读取和ServiceMetricEvent事件,因为PageViewEvent没有对应的partition。

容器和资源分配

虽然task实例的数量是固定的( 由输入分区的数量决定 ),那 你可以配置多少容器为你工作,是否什么决定的呢?如果您使用的yarn,容器的数量是由分配给你的CPU和内存资源决定的。

每一个SamzaContainer被设计成只是用一个CPU,因此他用了单线程事件循环的模式。因此,你在开发程序的时候,不要再SamzaContainer创建自己的多线程。如果你需要进行并行,你需要配置一下你的job利用多SamzaContainer。