Hadoop (2.x) Cloud Computing Ecosystem

Serialization

Every message that is read from or written to a stream, or kept in a persistent state store, eventually needs to be serialized to bytes (to be sent over the network or written to local disk). There are several places where that serialization and deserialization can happen:

1. In the client library: for example, the library for publishing to and consuming from Kafka supports pluggable serialization.
2. In the task implementation: your code can work with the raw input and output messages itself and do its own parsing and serialization.
3. Between the two: Samza provides a dedicated serialization/deserialization layer, known as the serde layer (see the sketch after this list).
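
As an illustration of how the serde layer can be extended, here is a minimal sketch of a custom serde, assuming Samza's SerdeFactory and Serde interfaces. The class name Utf8StringSerdeFactory is a hypothetical example only; Samza already ships its own string, integer, and JSON serdes.

package example;

import java.nio.charset.StandardCharsets;

import org.apache.samza.config.Config;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;

// Hypothetical custom serde that encodes strings as UTF-8 bytes.
public class Utf8StringSerdeFactory implements SerdeFactory<String> {
  @Override
  public Serde<String> getSerde(String name, Config config) {
    return new Serde<String>() {
      @Override
      public byte[] toBytes(String object) {
        // Serialize: encode the string as UTF-8 bytes.
        return object == null ? null : object.getBytes(StandardCharsets.UTF_8);
      }

      @Override
      public String fromBytes(byte[] bytes) {
        // Deserialize: decode the UTF-8 bytes back into a string.
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
      }
    };
  }
}

Such a factory would be registered in the job configuration the same way as the built-in ones, e.g. serializers.registry.utf8string.class=example.Utf8StringSerdeFactory.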

You are free to handle this in whatever way makes sense for your job; Samza does not impose any particular data model or serialization scheme on you. However, the cleanest approach is usually to use Samza's serde layer. The following configuration example shows how to use it.

# Define a system called "kafka"
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# The job is going to consume a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent

# Define a serde called "json" which parses/serializes JSON objects
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Define a serde called "integer" which encodes an integer as 4 binary bytes (big-endian)
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory

# For messages in the "PageViewEvent" topic, the key (the ID of the user viewing the page)
# is encoded as a binary integer, and the message is encoded as JSON.
systems.kafka.streams.PageViewEvent.samza.key.serde=integer
systems.kafka.streams.PageViewEvent.samza.msg.serde=json

# Define a key-value store which stores the most recent page view for each user ID.
# Again, the key is an integer user ID, and the value is JSON.
stores.LastPageViewPerUser.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
stores.LastPageViewPerUser.changelog=kafka.last-page-view-per-user
stores.LastPageViewPerUser.key.serde=integer
stores.LastPageViewPerUser.msg.serde=json

All of Samza's APIs for sending and receiving messages are typed as Object. This means you have to cast messages to the correct type before you can use them. It requires a little more code, but it has the advantage that Samza is not restricted to any particular data model.
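
For example, a task consuming the PageViewEvent stream configured above might cast the incoming key and message and update the key-value store. This is a minimal sketch assuming Samza's older low-level StreamTask/InitableTask API; the class name LastPageViewTask is made up for illustration.

package example;

import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class LastPageViewTask implements StreamTask, InitableTask {
  private KeyValueStore<Integer, Map<String, Object>> store;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    // getStore() returns an untyped store; cast it to the key/value types
    // that match the configured serdes (integer key, JSON message).
    store = (KeyValueStore<Integer, Map<String, Object>>) context.getStore("LastPageViewPerUser");
  }

  @Override
  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // The key and message arrive as Object; cast them to the types produced
    // by the "integer" and "json" serdes configured for the stream.
    Integer userId = (Integer) envelope.getKey();
    Map<String, Object> pageViewEvent = (Map<String, Object>) envelope.getMessage();

    // Remember the most recent page view for this user.
    store.put(userId, pageViewEvent);
  }
}

Because the "integer" and "json" serdes are configured for both the stream and the store, Samza hands the task already-deserialized objects and serializes them again when writing to the store's changelog topic.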