Hadoop(2.x)云计算生态系统

Streams

samza 容器 是通过 SystemConsumer 与 SystemProducer 接口进行消息得读取与写入。你可以通过这两个接口实现对任何消息系统得整合。

public interface SystemConsumer {
  void start();

  void stop();

  void register(
      SystemStreamPartition systemStreamPartition,
      String lastReadOffset);

  List<IncomingMessageEnvelope> poll(
      Map<SystemStreamPartition, Integer> systemStreamPartitions,
      long timeout)
    throws InterruptedException;
}

public class IncomingMessageEnvelope {
  public Object getMessage() { ... }

  public Object getKey() { ... }

  public SystemStreamPartition getSystemStreamPartition() { ... }
}

public interface SystemProducer {
  void start();

  void stop();

  void register(String source);

  void send(String source, OutgoingMessageEnvelope envelope);

  void flush(String source);
}

public class OutgoingMessageEnvelope {
  ...
  public Object getKey() { ... }

  public Object getMessage() { ... }
}

开箱即用,Samza实现了对kafka得支持(KafkaSystemConsumer和KafkaSystemProducer)。然而,任何消息总线系统可以被整合,只要它能提供由Samza所需的接口,可以参考Javadoc说明。

SystemConsumers和SystemProducers可以读取和写入任何数据类型的消息。默认可以支持支持 字节数组- Samza具体得转换是通过应用程序代码使用对象单独序列化层。 Samza没有规定任何具体的数据模型或序列化格式,具体形式可以由开发人员实现。

流处理

如果job从多个输入数据流处处理消息,那么所有输入流有可用的消息,缺省情况下将被被在一个循环的方式逐个处理。例如,如果一个job需要处理AdImpressionEvent和AdClickEvent得消息,任务实例的process()方法被调用,来处理AdImpressionEvent消息,再处理AdClickEvent消息,然后从AdImpressionEvent处理另一个消息,...,并继续在这两者之间交替处理。

如果流处理系统处理中如果一个输入流没有消息,那么流处理系统将跳过这个输入流进行处理另外一个输入流。同时,还会继续检查空输入流是否有新的消息进来。

MessageChooser

当Samza容器对不同的流分区传入的消息进行处理得时候,它是如何决定要首先处理拿一个?该行为是由一个MessageChooser决定。默认选择器是RoundRobinChooser,但你可以自己实现自定义选择器覆盖它。

实现自己的MessageChooser,你需要实现MessageChooserFactory接口,并在配置文件中设置“task.chooser.class”,并配置您实现的完全类名:

task.chooser.class=com.example.samza.YourMessageChooserFactory

优先输入流

在一定得时间窗口内,可以让一个输入流得处理比另一个输入流得处理有更高得优先级别,我们可以通过设置输入流得优先级别。例如:samza得job需要处理2个输入流,其中一个输入流由实时系统提供消息,另外一个由批处理系统提供处理消息。在这个案例中,我们将给实时输入流提供更高得优先级。可以使实时输入数据流系统不会产生突然变慢得情况。

例如,我们可以再配置文件中设置如下:

systems.kafka.streams.my-real-time-stream.samza.priority=2
systems.kafka.streams.my-batch-stream.samza.priority=1

引导顺序

批处理

在某些情况下,可以提高每次从多个分区消费多个消息,从而提高消息得处理能力。 Samza支持这种操作模式,被称为批处理。

例如,如果你想读的100条信息从每个流分区行,你可以使用这个配置参数:

task.consumer.batch.size=100