Code前端首页关于Code前端联系我们

Apache Kafka大型应用的20个最佳实践 如何在1秒内处理1500万条消息?

terry 2年前 (2023-09-26) 阅读数 44 #数据库

Apache Kafka 是一个流行的分布式数据流平台,被 New Relic(数据智能平台)、Uber、Square(移动支付公司)等大公司广泛使用。以及高度可靠的实时数据流系统。

例如,在New Relic的生产环境中,Kafka集群每秒能够处理超过1500万条消息,数据聚合速率接近1Tbps。

可见Kafka极大地简化了数据流的处理,因此也赢得了很多应用开发者和数据管理专家的青睐。

不过Kafka在大型系统中的应用会更加复杂。如果您的消费者无法跟上数据流,消息通常会在查看之前就消失。

同时,它在自动数据保留、高流量发布订阅(Pub/Sub)模型等方面的局限性可能会影响您系统的性能。

毫不夸张地说,如果存储数据流的系统不能按需扩展,或者稳定性不可靠,你经常会寝食难安。

为了降低上述复杂度,我分享New Relic的Kafka集群处理高吞吐量的20个最佳实践。

我会从以下四个方面展开:

  • partitions(分区)
  • consumers(消费者)
  • Producers(生产者) ♿
  • ♿ Broker概念和架构

    Kafka是一个高效的分布式消息系统。在性能方面,它具有内置的数据冗余和弹性,以及高吞吐能力和可扩展性。

    功能上,支持自动化的数据存储限制,能够以“流”的方式为应用程序提供数据转换,并根据“键值(key-value)”建模关系“压缩”数据流。

    要了解各种最佳实践,您必须首先熟悉以下关键术语: 消息

    Kafka 中的记录或数据单元。每条消息都有一个键和相应的值,有时还有可选的标头。 生产者(Producer)

    生产者发布关于 Kafka 主题的消息。生产者决定如何在主题分区上发布,例如随机轮询方法或基于消息键的分区算法。 Broker

    Kafka 作为分布式系统或集群工作。那么集群中的每个节点就称为broker。 主题(Topic)

    主题是发布的数据记录或消息的类别。消费者通过订阅主题来读取写入的数据。 主题分区(Topic Partition)

    不同的主题被划分为不同的分区,每条消息被分配一个偏移量。通常,每个分区至少被复制一次或两次。

    每个分区都有一个领导者和每个跟随者上存储的一个或多个副本(即:数据副本)。这种方法可以防止特定经纪人的失败。

    集群中的所有broker都可以作为leader和follower,但一个broker最多可以拥有一份主题分区的副本。 Leader可以用来执行所有的读写操作。 偏移量(Offset)

    单个分区中的每条消息都分配有一个偏移量,该偏移量是一个单调递增的整数,可以用作该分区中消息的唯一标识符。 Consumer(消费者)

    Consumer通过订阅主题分区来读取Kafka的各种主题消息。然后,消费应用程序处理接收到的消息以完成指定的工作。 消费者群体

    可以根据消费者群体对消费者进行逻辑划分。主题分区均匀分布在组中的所有消费者之间。

    因此,在同一个消费者组中,所有消费者以负载均衡的方式运行。

    也就是说,同一个组中的每个消费者都可以看到每条消息。如果某个消费者“离线”,则该分区将分配给同一组中的另一个消费者。这称为“再平衡”。

    当然,如果组中消费者的数量多于分区的数量,就会有一些消费者闲置。

    相反,如果组中消费者的数量少于分区的数量,则一些消费者会收到来自多个分区的消息。 Lag(延迟)

    如果消费者的速度跟不上消息生成的速度,消费者就会因为无法从分区读取消息而被延迟。

    延迟表示为分区标头之后的偏移量。从延迟状态恢复正常(“赶上”)所需的时间取决于消费者每秒可以处理的消息速率。

    计算公式如下: 时间=消息数/(每秒消耗率-每秒生产率)分区最佳实践

    ①了解所提供的相应分区的数据速率节省空间

    这里所谓的“分区的数据速率”是指数据产生的速率。换句话说,数据速率(平均消息大小乘以每秒消息数)决定了保证在给定时间内保存数据的空间量(以字节为单位)。

    如果不知道数据配额,就无法根据一定的时间跨度正确计算出需要多少空间来保存数据。

    同时,数据速率还可以识别单个消费者在不造成延迟的情况下必须支持的最低性能值。

    除非您有其他架构需求,否则请使用编写主题时随机分区

    在执行大型操作时,每个分区的数据速率不均匀非常难以管理。

    原因来自于以下三个方面:

    • 首先,“热”(吞吐量较高)分区上的消费者必然会比同组中的其他消费者处理更多的消息,因此很可能会导致处理和网络费用。
    • 其次,为数据速率最高的分区配置的最大保留空间会导致主体中其他分区的磁盘使用率相应增加。
    • 第三,基于分区的Leader关系实现的最优平衡方案比简单地将Leader关系分配给每个Broker要复杂。对于同一主题,“热”分区将“承载”其他分区 10 倍的重量。

    有关使用主题分区的信息,请参阅在《Kafka Topic Partition的各种有效策略》https://blog.newrelic.com/engineering/ effective-strategies-kafka-topic-partitioning/。 消费者最佳实践

    ③如果消费者运行的版本早于Kafka 0.10,请升级。直接很多已知的bug导致其长时间处于再平衡状态,或者直接导致再平衡算法失败(我们称之为“再平衡风暴”)。

    因此在重新平衡期间,一个或多个分区被分配给同一组中的每个消费者。

    在再平衡风暴中,分区所有权将继续在消费者之间流动,从而阻止任何消费者真正获得分区所有权。

    ④ 调整consumer的socket缓冲区以应对数据的高速流入

    在Kafka 0.10.x版本中,receive.buffer.bytes参数的默认值为64KB。在Kafka 0.8.x版本中,参数socket.receive.buffer.bytes默认值为100KB。

    对于高吞吐量环境来说,两个默认值都太小,特别是当代理和消费者之间的带宽延迟乘积大于局域网(LAN)时。

    对于延迟为 1 毫秒或更长的高带宽网络(例如 10Gbps 或更高),请考虑将套接字缓冲区设置为 8 或 16MB。

    如果内存不足,还可以考虑设置至少 1MB。当然,你也可以将其设置为-1,这允许底层操作系统根据当前网络情况调整缓冲区大小。

    但是,对于需要启动“热”分区的消费者来说,自动缩放可能不会那么快。

    ⑤设计高吞吐量的消费者,实现按需背压

    一般情况下,我们应该保证系统只处理能力范围内的数据,不会超载“消费”。这可能会“挂起”进程或导致使用组溢出。

    在 Java 虚拟机 (JVM) 中运行时,使用者应使用固定大小的缓冲区,最好是堆外缓冲区。请参阅Disruptor模式:http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf

    固定大小的缓冲区可以防止消费者将太多数据拉入堆栈,因此JVM会花费所有是时候执行垃圾收集了,无法完成处理消息的基本工作。

    ⑥如果您在JVM上运行多个消费者,请调整垃圾收集对它们可能产生的影响

    例如,垃圾收集的长期停滞可能会导致ZooKeeper会话被丢弃,或者消费者组处于重新平衡状态。

    这同样适用于经纪人。如果垃圾收集停止时间过长,则存在集群离线的风险。 产品最佳实践 在 Kafka 0.10.x 版本上,设置为 Acks;在版本 0.8.x 上,它 request.required.acks。

    Kafka通过复制提供容错能力,这样单个节点的故障或者分区Leader关系的变化不会影响系统的可用性。

    如果您没有为生产者配置 ack(或“即发即忘”),消息可能会悄无声息地丢失。

    ⑧为每个生产者配置重试

    默认值为3,自然很低。但是,正确的设置取决于您的应用程序,即对于数据丢失零容忍的应用程序,请考虑将其设置为 Integer.MAX_VALUE(有效和最大值)。

    这将能够处理broker的leader分区无法立即响应生产请求的情况。

    对于高吞吐量生产者,调整缓冲区大小

    指定buffer.memory 和batch.size(以字节为单位)。由于batch.size是在分区之后设置的,因此生产者的性能和内存使用情况可以与主题中的分区数量相关。

    因此这里的设置取决于几个因素:

    • 生产者数据速率(消息的大小和数量)
    • 生成缓冲区的分区数量更大并不总是一件好事。如果生产者由于某种原因失败(例如,领导者的反应比确认慢),那么堆上内存也会失败。缓冲区中的数据越多,需要回收的垃圾就越多。

      ⑩ 检测应用程序以跟踪生成的消息数量、平均消息大小和消耗的消息数量等指标 代理的最佳实践

      ⑪ 在各个代理上,压缩所需内存和 CPU 的所有问题资源。

      日志压缩(请参阅https://kafka.apache.org/documentation/#compaction)需要每个代理上的堆栈(内存)和CPU周期才能成功实现。如果允许失败的日志压缩数据继续增长,就会给brokers分区带来风险。

      您可以调整代理上的 log.cleaner.dedupe.buffer.size 和 log.cleaner.threads 参数,但请记住,这两个值都会影响各个代理上的堆栈使用情况。

      如果代理抛出 OutOfMemoryError 异常,它将被关闭并且数据可能会丢失。

      缓冲区大小和线程数量取决于需要删除的主题分区的数量,以及这些分区中消息的数据速率和键的大小。

      使用 Kafka 0.10.2.1 版本,监视日志文件中的 ERROR 条目是检测其线程可能出现问题的最可靠方法。

      ⑫通过网络吞吐量监控代理

      请监控。 (传输,TX)和入站(接收,RX)的流量,以及磁盘 I/O、磁盘空间和 CPU 使用情况,以及容量规划是维持集群整体性能的关键步骤。

      ⑬在集群中的不同broker之间分配分裂的leader关系

      Leader通常需要大量的网络I/O资源。例如,如果我们将复制因子配置为3并运行。

      Leader首先要从分区获取数据,然后发送两组副本给另外两个follower,再传输给需要数据的不同消费者。

      所以在这个例子中,单个leader使用的网络I/O至少是follower的四倍。此外,Leader 还必须对磁盘执行读操作,而 Follower 必须只执行写操作。

      ⑭不要忽视监控代理的同步副本 (ISR) 收缩、复制不足的分区和不受欢迎的领导者

      这些都是集群中潜在问题的迹象。例如,单个分区中频繁的ISR收缩意味着该分区的数据速率超出了领导者的能力,并且不再能够为消费者和其他副本线程提供服务。

      ⑮根据需要更改Apache Log4j的各种属性

      详情请参见at:https://github.com/apache/kafka/blob/trunk/config/log4j.properties

      Kafka的日志记录占用了大量空间,但我们无法完全关闭它。

      因为有时在事故发生后我们必须重建事件的顺序,代理日志是我们最好的,甚至是唯一的方法。

      ⑯ 禁用自动创建主题,或者对未使用的主题设置清理策略

      例如,如果在固定的 x 天内没有出现新消息,则应考虑该主题是否已过期,并将其从星团这可以防止您花费时间管理在集群中创建的其他元数据。

      对于持续高吞吐量的经纪商,请输入足够的内存以避免从磁盘子系统读取

      如果可能的话,我们应该直接从操作系统的缓存中获取分区数据。然而,这意味着你必须确保你的消费者能够跟上“节奏”,而对于那些落后的消费者,你只能强制broker从磁盘读取。

      ⑱ 对于具有高吞吐量服务级别目标 (SLO) 的大型集群,请考虑将不同的问题隔离到代理子集

      如何确定需要隔离哪些问题完全取决于您和您自己的业务需求。例如,您有多个使用同一集群的在线事务处理 (OLTP) 系统。

      隔离不同代理子集中的任何系统问题有助于限制潜在事件的影响半径。

      ⑲ 在旧客户端上使用新的主题消息格式。而不是客户端,应该在每个broker上加载额外的格式转换服务

      当然,最好尽可能避免这种情况。

      ⑳不要错误地认为与大多数生产环境不同,在本地主机上测试broker就可以代表生产环境中的真实性能。

      loopback接口上的网络延迟几乎可以忽略不计,但如果不涉及复制,获得leader确认所需的时间也会产生很大的差异。 总结

      希望以上建议能够帮助您更有效地使用Kafka。如果您想提高 Kafka 的专业知识,请仔细查看 Kafka 配套文档的操作部分,其中包含有关集群操作的有用信息。

版权声明

本文仅代表作者观点,不代表Code前端网立场。
本文系作者Code前端网发表,如需转载,请注明页面地址。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

热门