读书笔记-Kafka权威指南

一二章部分内容

producer发送流程:

  1. producer发送message到broker。
  2. Broker commit到本地存储。确认模式取决于request.required.acks=0/1/all参数设置。
  3. 等broker确认后,producer继续发下一条。

硬件选择:

Kafka clusters:一般是指多个broker,主要为了分担负载。 Replication:指partition的replication,主要为了数据的容灾。

集群大小的选择:

  1. 需要多少磁盘容量。若需要保留10TB数据,单个broker可以保留2TB,则显然至少需要5个broker。
  2. 需要什么程度的集群处理能力,主要是考虑吞吐率。

第三章 Kafka Producers

第三方客户端:kafka第三方提供了C++、Python、Go等多种语言实现的客户端,实现了二进制线协议。虽然Kafka是用Java实现的,但通过这些第三方客户端可以在多种语言里享用Kafka。

生产者的多种不同需求:能否忍受消息丢失?能否忍受消息重复?是否有严格的latency要求?即常见的At most once, At least once, Exactly once 三种需求。

kafka producer架构.jpg

发送步骤:

  1. 创建ProducerRecord,指定topic。key/partition可选。
  2. 序列化,key and value对象 → ByteArrays,以便在网络中进行传输。
  3. 数据被发送到partitioner。如果在创建ProducerRecord时指定了partition,partitioner不做事情;否则会选择一个partition。这时producer确定了topic+partition。
  4. 一个单独的线程负责将topic+partition中的记录以batch方式发送给kafka broker。(猜测,在发送给broker前是放内存的)
  5. Kafka broker收到消息后回ack。如果消息成功写入,返回RecordMeatadata对象,包括topic、partition、记录在partition中的偏移;如果消息写入失败,返回error。
  6. 当producer收到ack时,会发下一条;当收到error时,根据配置决定是否重试、重试次数,若失败则返回error。

创建kafka producer的三个强制属性:

  1. bootstrap.servers,以host:port的列表形式指定kafka cluster地址。
  2. key.serializer,用于序列化的类。java对象 → 字节数组(byte arrays)。
  3. value.serializer,跟key.serializer差不多。

3种消息发送模式:

  1. 发送后就不管。由于kafka的高可用,这种模式下,大多数消息都会成功到达,但的确会丢失消息。
  2. 同步发送。会阻塞。send()方法返回一个Future对象,我们使用get()方法在future上等待来看send()是否调用成功。
  3. 异步发送。send()方法调用时带一个callback,当从kafka broker收到response时callback会被触发。

3种消息发送模式的写法:

  1. producer.send(record)。“发送后就不管”的发送模式。
    1. 调用send()方法时,消息被放在buffer中,然后由上述的一个单独线程发送。
    2. 序列化是调用send()方法时执行的操作,而不是构建ProducerRecord时执行的。所以在调用send()方法时,可能出现的异常有:SerializationException,BufferExhaustedException/TimeoutException (buffer满时),InterruptedException(负责发送的线程被中断)。
  2. producer.send(record).get()。同步发送模式的写法。会阻塞直到Future Obj有了返回结果。
  3. producer.send(record, new ProducerCallback())。异步发送模式的写法,不会阻塞,且能拿到发送结果。ProducerCallback类需要实现CallBack接口。

一系列producer配置参数中,除acks还有比较重要的是max.in.flights.requests.per.session=1,这可以保证重试时数据的顺序还是和发送顺序一致的。避免 batch1(失败)- batch2(成功)- 1(重试,成功)这种情况导致顺序错乱。

序列化一般使用StringSerializer和现成的Serializer,实在有必要再自己构建。

Parition的策略也可以自己定制。

第四章 Kafka consumers

Rebalance,重新调整的是一个consumer group中的哪些consumer消费哪些partition。

consumers and partitions.jpg

  1. 对于一个topic,其里面有N个partition,当consumer group中consumer的数量>N时,会有consumer处于空闲状态,得不到数据。所以当producer速率>consumer时,增加consumer数量是好办法,但是consumer数量上限不要超过partition分区数量,不然纯属浪费。意味着,一个partition不会被一个consumer group中的多个consumers同时消费。
  2. 划分consumer group的概念是为了将应用程序-consumer group对应起来,让每个应用程序可以获取topic中的所有信息,即从所有partition而非一个子集中获取信息。
  3. 分区的rebalance:partition随着consumer group中consumer的加入/离开而发生被消费变化:当一个consumer新加入时,会消费之前由其它consumer消费的partition;当一个consumer离开时,会把曾消费的partition给其它consumer消费。在rebalance期间,consumer不能消费。(应该是抛error)
  4. Consumer通过给kafka broker发心跳来看是加入/离开的。这个收心跳的broker称为group coordinator。

创建kafka consumer的三个强制属性:

  1. bootstrap.servers
  2. key.deserializer
  3. value.deserializer。这三个跟producer很相似,除了做serialize/deserialize的相反功能。

非强制但重要属性:

  1. group.id,所属的consumer group。会频繁用到。

commit an offset

commit offset的几种方式:

  1. 自动commit。配置enable.auto.commit=true,每固定时间(默认5s) consumer会提交从poll()方法中返回的最大offset。后果:某些窗口内的事件会被消费2次,通过缩小commit间隔可以减少重复消费事件的窗口大小。越小间隔,越小数量的重复消费。但越小间隔意味着性能开销越大,需要有trade-off。
  2. 手动显式commit。设置auto.commit.offset=false,然后程序中调用consumer.commitSync()显式提交。commitSync()会提交poll()返回的最新offset,失败会重试。后果:重复消费。
  3. 异步commit。调用方法:consumer.commitAsync()或 consumer.commitAsync(callback)。commitAsync()没有失败重试,是因为由于异步会立即返回,后续可能有成功的commitAsync,再重试成功的话会写回旧值。一个简单可行的异步重试机制是使用递增ID,commit时比较callback返回的ID与当前ID。
  4. 使用同步+异步commit。日常使用consumer.commitAsync(),但在finally中进行consumer.close()之前,执行一次consumer.commitSync()。
  5. Commit特定偏移量。上述的commit只能提交最后返回的偏移量。

Rebalance listeners: 目的:在partition rebalance之前做一些offset commit或清理工作。譬如在失去对这个partition的ownership前进行offset提交和文件handler关闭等。

指定消费的offset:可以从头(seekToBeginning)、从尾(seekToEnd)或中间一个特定的地方开始。

consumer.poll()放在一个无限循环中,会不断消费。关于如何退出:在另一个线程中调用consumer.wakeup().

第五章 Kafka Internals

kafka使用zookeeper做集群管理。每个broker有个唯一的ID。ZK中:/brokers/ids

The Controller:是一个特殊的kafka broker,额外负责选举partition leader。会在ZK中创建/controller临时节点。broker在控制节点中创建zookeeper watch以便收到此节点变更的通知。当controller停止/失去与ZK的连接时,/controller节点会消失,其它brokers通过zookeeper watch捕捉到此信息然后都尝试建立/controller节点,第一个创建成功的会变成新的controller。

利用ZK临时节点的特性选举controller,controller负责选举partition leaders。

Replication

Request Processing

物理存储 存储的基本单位是partition,partition不可再分割。

第六章 Reliable Data Delivery

kafka提供的保证:

  1. 顺序保证。同一partition中先被写入的先被消费。
  2. 当生成的消息在所有同步副本中被写入分区后,就被视为“已提交”。
  3. 只有至少有一个同步副本存活,被提交的消息就不会丢失。
  4. 消费者只能消费 已提交的消息。

in-sync和out-of-sync。注:如果看到一个或多个副本频繁的在in-sync和out-of-sync状态间切换,那么这基本可以说明集群配置有问题。其中常见的问题之一是java GC的错误配置。错误的配置可能导致GC期间broker停顿几秒,然后就变成out-of-sync。

broker reliable broker配置的三个参数:

  1. 复制因子。replication.factor,推荐replication.factor=3。以及建议通过broker.rack给每个broker配置机架名,以保证partition的副本可以跨机架分布。
  2. 不洁leader选举(是否允许out-of-sync中的副本成为Leader)。unclean.leader.election.enable,默认为true。场景是,当in-sync副本中只有leader且leader挂了时,要在”不选新Leader,partition则会一直为离线状态(可用性)” 和 “从out-of-sync副本中选新leader,则下游可能各种不一致数据(一致性)” 中作出艰难选择。 如果对数据一致性有非常高要求,这个参数就应该设为false。
  3. 最小in-sync副本数。min.insync.replicas=N,当消息被至少N个in-sync副本写入后,才被认为已提交。也就是说,如果in-sync中副本数小于N,消息肯定无法被接收。(这个参数的配置主要是为了防止在不洁leader选举中做出艰难选择)

producer reliable

  1. 发送ack。ack=0/1/all。0:只要发出就OK;1:leader收到就OK;all:所有in-sync副本都收到。
  2. 配置重试次数。可以配置为无限尝试,例如kafka mirrormaker工具。
  3. 额外的错误处理。producer自己会有错误处理,除此之外,开发者要关注程序中的错误处理。如:不可重试的broker错误、在消息发送给broker前的错误、重试次数或内存耗尽时的错误。

consumer reliable consumer几个重要的参数配置:

  1. group.id,标识consumer group。
  2. auto.offset.reset=earliest/latest,此参数控制消费者未提交offset或要求broker中不存在的offset时将执行的操作。
  3. enable.auto.commit,自己手动提交还是自动提交。设为true的主要好处是实现消费者时无需担心提交这件事。
  4. auto.commit.interval.ms,和第3个参数绑定,默认是5s。

第七章 构建数据管道

跳过。

第八章 跨集群数据镜像

kafka内置的跨集群复制工具:MirrorMaker。

hub-and-spokes架构.jpg 双活架构.jpg 主备架构.jpg

  1. hub-and-spokes架构。适用场景:一个中心kafka集群对应多个本地kafka集群。不足:一个DC的程序无法访问另一个DC的数据;本地DC的程序无法访问到全局数据。
  2. 双活架构。适用场景:两个+的DC需要共享数据。优势:为就近用户提供服务,性能优势;冗余和弹性。不足:异步读取和异步更新时的冲突问题需要开发人员考虑周全;循环镜像问题。
  3. 主备架构。适用场景:对kafka集群的失效备援。不足:只是为了灾备,是对集群的浪费。
  4. 延展集群。适用场景:当整个DC(而非kafka集群)发生故障时,避免kafka集群失效。

kafka的MirrorMaker

本质上是由一个生产者线程和多个消费者线程组成的进程。MirrorMaker是高度可配置的。

其它跨集群镜像方案 ⁃ uber的uReplicator ⁃ confluent的Replicator