本文首要从Kafka消费、堆积、安稳性、预案、本钱操控等视点等最佳实践。
要确保Kafka在运用进程中的安稳性,需求从kafka在事务中的运用周期进行顺次确保。首要能够分为:事前防备(经过标准的运用、开发,防备问题产生)、运行时监控(确保集群安稳,出问题能及时发现)、毛病时处理(有完好的应急预案)这三阶段。
事前防备即经过标准的运用、开发,防备问题产生。首要包含集群/出产端/消费端的一些最佳实践、上线前测验以及一些针对紧急状况(如音讯积压等)的暂时开关功用。Kafka调优准则:
依据详细场景(是否答应必定推迟、实时音讯、守时周期使命等)区别kafka topic,防止抢占或堵塞实时事务音讯的处理。
假如下流音讯消费存在瓶颈或许集群负载过高级,需求在出产端(或音讯网关)施行流量出产速率的操控或许延时/暂定音讯发送等战略,防止短时刻内发送很多音讯。
手动去查询丢掉的那部分数据,然后将音讯从头发送到mq里边,把丢掉的数据从头补回来。
假如需求在确保Kafka在分区内严厉有序的话(即需求确保两个音讯是有严厉的先后次序),需求设置key,让某类音讯依据指定规矩路由到同一个topic的同一个分区中(能处理大部分消费次序的问题)。 可是,需求防止分区内音讯歪斜的问题(例如,依照店肆Id进行路由,简单导致音讯不均衡的问题)。
:音讯发送指定key,确保相同key的音讯发送到同一个partition。
:单线程消费或许写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后关于 N 个线程,每个线程别离消费一个内存 queue ;
:kafka先将音讯缓存在内存中的双端行列(buffer)中,当音讯量到达batch size指定巨细时进行批量发送,减少了网络传输频次,进步了传输功率;
: 将一批音讯打包后进行紧缩,发送给 Broker 服务器后,但频频的紧缩和解压也会下降功用,终究仍是以紧缩的办法传递到顾客的手上,在 Consumer 端进行解压;
:将出产者改造为异步的办法,能够进步发送功率,可是假如音讯异步产生过快,会导致挂起线程过多,内存不足,终究导致音讯丢掉;
:当一个时刻相对长的使命在履行时,它会占用该音讯地点索引分区被确认,后边的使命不能及时派发给闲暇的客户端处理,若服务端假如启用索引分区并行消费的特性,就能够及时的把后边的使命派发给其他的客户端去履行,一起也不需求调整索引的分区数(但此类音讯仅适用于无需确保音讯次序联系的音讯)
的api进行发送,并设置 acks、retries、factor等等些参数来确保Producer发送的音讯不丢掉。
:kafka为了得到更高的功用和吞吐量,将数据异步批量的存储在磁盘中,并采用了批量刷盘的做法,假如对数据牢靠性要求很高的话,能够修正为
吞吐量:调整partition 数、OS page cache(分配满足的内存来缓存数据);
offset topic(__consumer_offsets):c.replication.factor(默以为3)、offsets.retention.minutes(默以为1440,即 1day);
offset commit较慢:异步 commit 或 手动 commit
max.poll.interval.ms:调用 poll() 之后推迟的最大时刻,超越这个时刻没有调用 poll() 的话,就会以为这个 consumer 挂掉了,将会进行 rebalance
max.poll.records:当调用 poll() 之后回来最大的 record 数,默以为500
Consumer Rebalance:check timeouts、check processing times/logic、GC Issues
简而言之,即经过Redis做前置处理 + DB仅有索引做终究确保来完成幂等性。
在音讯量十分大的状况下,实时和离线顾客一起消费一个集群,离线数据深重的磁盘 IO 操作会直接影响实时事务的实时性和集群的安稳性。
:对数据实时性要求较高;在实时消费的场景下,Kafka 会运用体系的 page cache 缓存,直接从内存转发给实时顾客(热读),磁盘压力为零,适宜广告、引荐等事务场景。
:一般是消费数分钟前或是数小时前的音讯,这类音讯一般存储在磁盘中,消费时会触发磁盘的 IO 操作(冷读),适宜报表核算、批量核算等周期性履行的事务场景。
出产速度大于消费速度,这样能够恰当添加分区,添加consumer数量,进步消费TPS;
防止很重的消费逻辑,优化consumer TPS:是否有很多DB操作;下流/外部服务接口调用超时;是否有lock操作(导致线程堵塞);需求特别重视kafka异步链路中的触及音讯扩展的逻辑;
假如有较重的消费逻辑,需求调整xx参数,防止音讯没消费完时,消费组退出,形成reblance等问题
注:批量拉取处理时,需留意下kafka版别,spring-kafka 2.2.11.RELEASE版别以下,假如装备kafka.batchListener=true,可是将音讯接纳的元素设置为单个元素(非批量List),或许会导致kafka在拉取一批音讯后,只是消费了头部的第一个音讯。
B. 怎么防止非必要rebalance(顾客下线、顾客自动退出消费组导致的reblance):
一般状况下,仍是client 消费 broker 丢音讯的场景比较多,想client端消费数据不能丢,肯定是不能运用autoCommit的,所以有必要是手动提交的。
Consumer自动提交的机制是依据必定的时刻距离,将收到的音讯进行commit。commit进程和消费音讯的进程是异步的。也便是说,或许存在消费进程未成功(比方抛出反常),commit音讯现已提交了,则此刻音讯就丢掉了。
不同topic(乱序音讯):假如支授予订单生成对应不同的topic,只能在consumer层面去向理了。
同一个topic(乱序音讯):一个topic能够对应多个分区,别离对应了多个consumer,与“不同topic”没什么本质上的不同。(能够理解为咱们的服务有多个pod,出产者次序发送音讯,但被路由到不同分区,就或许变得乱序了,服务消费的便是无序的音讯)
同一个topic,同一个分区(次序音讯):Kafka的音讯在分区内是严厉有序的,例如把同一笔订单的一切音讯,依照生成的次序一个个发送到同一个topic的同一个分区。
例如:订单和付出别离封装了各自的音讯,可是消费端的事务场景需求按订单音讯-付出音讯的次序顺次消费音讯。
宽表(事务主题相关的目标、维度、特色相关在一起的一张数据库表):消费音讯时,只更新对应的字段就好,音讯只会存在时刻短的状况不共同问题,可是状况终究是共同的。例如订单,付出有自己的状况字段,订单有自己的状况字段,售后有自己的状况字段,就不需求确保付出、订单、售后音讯的有序,即便音讯无序,也只会更新自己的状况字段,不会影响到其他状况;
音讯补偿机制:将音讯与DB进行比照,假如发现数据不共同,再从头发送音讯至主进程处理,确保终究共同性;
两者都是经过将音讯绑定到定向的分区或许行列来确保次序性,经过添加分区或许线程来进步消费才能。
出产者在发送音讯时,已确保音讯在分区内有序,一个分区对应了一个顾客,确保了音讯消费的次序性。
单线程次序消费的扩展才能很差。为了进步顾客的处理速度,除了横向扩展分区数,添加顾客外,还能够运用多线程次序消费。
将接纳到的kafka数据进行hash取模(留意:假如kafka分区承受音讯现已是取模的了,这儿必定要对id做一次hash再取模)发送到不同的行列,然后敞开多个线程去消费对应行列里边的数据。
此外,这儿经过装备中心进行开关、动态扩容/缩容线) 处理Consumer的事务
经过事务音讯,能够很好的确保一些事务场景的事务逻辑,不会因为网络不可用等原因呈现体系之间状况不共同。
当更新任何一个服务呈现毛病时就抛出反常,事务音讯不会被提交或回滚,音讯服务器会回调发送端的事务查询接口,确认事务状况,发送端程序能够依据音讯的内容对未做完的使命从头履行,然后告知音讯服务器该事务的状况。
Broker 评价:每个 Broker 的 Partition 数不应该超越2k、操控 partition 巨细(不要超越25GB);
集群评价(Broker 的数量依据以下条件装备):数据保存时刻、集群的流量巨细;
集群扩容:磁盘运用率应该在 60% 以下、网络运用率应该在 75% 以下;
集群监控:坚持负载均衡、确保 topic 的 partition 均匀分布在一切 Broker 上、确保集群的阶段没有耗尽磁盘或带宽
Partition 数:Partition 数应该至少与最大 consumer group 中 consumer 线程数共同;关于运用频频的 topic,应该设置更多的 partition;操控 partition 的巨细(25GB 左右);考虑运用未来的添加(能够运用一种机制进行自动扩容);
partition 扩容:当 partition 的数据量超越一个阈值时应该自动扩容(实际上还应该考虑网络流量)。
设置多个分区在必定程度上是能够进步顾客消费的并发度,可是分区数量过多时或许会带来:句柄开支过大、出产端占用内存过大、或许添加端到端的推迟、影响体系可用性、毛病康复时刻较长等问题。
自上而下分为运用程序层、结构层、JVM层和操作体系层,层级越靠上,调优的作用越显着。
kafka的安稳性测验首要在事务上线前针对Kafka实例/集群健康性、高可用性的测验。
(1) 查看实例:查看Kafka 实例目标中拿到一切的信息(例如 IP、端口等);
运行时监控首要包含集群安稳性装备与Kafka监控的最佳实践,旨在及时发现Kafka在运行时产生的相关问题与反常。
音讯即便被消费,也会耐久化到磁盘存储保存时长的时刻。该设置会占用磁盘空间,假如每天音讯量很大的话,可恰当缩短保存时刻。
当磁盘容量到达阈值,则删去最早的音讯,最多删去到保底时长规模外的音讯(筛选战略),能够很大程度防止磁盘被打满的状况。但有调整时不会自动告诉,但咱们能够经过装备告警感知磁盘容量的改变。
处理:Broker等级物理阻隔:创立Topic、搬迁Topic、宕机康复流程
Kafka RPC 行列短少阻隔,一旦某个 topic 处理慢,会导致一切恳求 hang 住。
处理:需求依照操控流、数据流别离,且数据流要能够依照 topic 做阻隔。
将 call 行列依照拆解成多个,并且为每个 call 行列都分配一个线程池。
一个行列独自处理 controller 恳求的行列(阻隔操控流),其他多个行列依照 topic 做 hash 的分散开(数据流之距离离)。
假如一个 topic 呈现问题,则只会堵塞其间的一个 RPC 处理线程池,以及 call 行列,能够确保其他的处理链路是疏通的。
整个限速逻辑完成在 RPC 作业线程处理的结尾,一旦 RPC 处理结束,则经过限速操控模块进行限速检测。
针对CKafka,需求装备告警(此类告警一般为音讯积压、可用性、集群/机器健康性等查看)。
如:实例健康状况、节点数量、健康节点数量、问题分区数、出产音讯数、消费恳求数、jvm内存运用率、均匀出产呼应时刻、分区消费偏移量等。
一般会对当时服务本身的kafka集群做告警装备,可是假如是依靠本身音讯的下流服务呈现消费问题,咱们是感知不到了;并且针对消费端服务不共用同一个集群的状况,呈现音讯重复发送的问题,服务本身是很难发现的。
在事务上线前,最好整理下本身服务所触及的topic音讯(上游出产端和下流消费端),并细化告警装备,假如呈现上游kafka反常或许下流kafka音讯堆积能够及时感知。特别需求把或许有瞬时很多音讯的场景(如批量数据导入、守时全量数据同步等)做必定的告警或许预案,防止服务不可用或许影响正常事务音讯。
经过自建告警渠道装备对服务本身的反常告警,其间包含对结构在运用kafka组件时抛出与kafka消费逻辑进程中抛出的事务反常。
其间,或许需求反常晋级的状况(因为)独自做下处理(针对spring kafka):
自界说kafka反常处理器:完成KafkaListenerErrorHandler接口的办法,注册自界说反常,区别事务反常并抛出;
消费Kafka音讯时,将@KafkaListener的errorHandler参数设置为界说的Kafka反常处理器;
尔后,指定的事务反常会被抛出,而不会被封装成Spring kafka的结构反常,导致不能明晰地了解详细反常信息。
Kafka Manager:应该算是最有名的专属 Kafka 监控结构了,是独立的监控体系。
Kafka Monitor:LinkedIn 开源的免费结构,支撑对集群进行体系测验,并实时监控测验成果。
CruiseControl:也是 LinkedIn 公司开源的监控结构,用于实时监测资源运用率,以及供给常用运维操作等。无 UI 界面,只供给 REST API。
JMX 监控:因为 Kafka 供给的监控目标都是依据 JMX 的,因而,市面上任何能够集成 JMX 的结构都能够运用,比方 Zabbix 和 Prometheus。已有大数据渠道自己的监控体系:像 Cloudera 供给的 CDH 这类大数据渠道,天然就供给 Kafka 监控计划。
JMXTool:社区供给的命令行东西,能够实时监控 JMX 目标。答上这一条,归于肯定的加分项,因为知道的人很少,并且会给人一种你对 Kafka 东西十分了解的感觉。假如你暂时不了解它的用法,能够在命令行以无参数办法履行一下kafka-run-class.sh kafka.tools.JmxTool,学习下它的用法。
其间,Kafka Monitor经过模仿客户端行为,出产和消费数据并收集音讯的推迟、错误率和重复率等功用和可用性目标,能够很好地发现下流的音讯消费状况然后能够动态地调整音讯的发送。(运用进程中需留意对样本掩盖率、功用掩盖率、流量、数据阻隔、时延的操控)
经过为每个 Partition 发动独自的出产使命,确保监控掩盖一切 Partition。
在出产的音讯中包含了时刻戳、序列号,Kafka Monitor 能够依据这些数据对音讯的推迟、丢掉率和重复率进行计算。
出产的音讯在序列化时指定为一个可装备的巨细(验证对不同巨细数据的处理才能、相同音讯巨细的功用比较)
经过设定独自的 Topic 和 Producer ID 来操作 Kafka 集群,可防止污染线上数据,做到必定程度上的数据阻隔。
依据Kafka Monitor的规划思维,能够针对事务特色引进对音讯的推迟、错误率和重复率等功用的监控告警目标。
消费端产生音讯积压,导致依靠该音讯的服务不能及时感知事务改变,导致一些事务逻辑、数据处理呈现推迟,简单产生事务堵塞和数据共同性问题。
问题排查、扩容升配战略、音讯Topic转化战略、可装备多线 问题排查遇到音讯积压时,详细能够从以下几个视点去定位问题原因:
查看出产端消费发送状况(首要查看是否持续有音讯产生、是否存在逻辑缺点、是否有重复音讯发送)
若为出产端问题,则评价是否能够经过添加分区数、调整偏移量、删去topic(需求评价影响面)等处理
简而言之,即线程池消费+动态线程池装备战略:将接纳到的kafka数据进行hash取模(假如kafka分区承受音讯现已是取模的了,这儿必定要对id做一次hash再取模)发送到不同的行列,然后敞开多个线程去消费对应行列里边的数据。
在运用发动时初始化对应事务的次序消费线程池(demo中为订单消费线程池)
别的,能够依据事务流量调整的线程装备与pod的装备,如高峰期设置一个相对较高的并发等级数用来快速处理音讯,平峰期设置一个较小的并发等级数来让出体系资源。这儿,能够参阅美团供给的一种装备中心修正装备动态设置线程池参数的思路,完成动态的扩容或许缩容。
经过set办法修正concurrent的值时,先修正stopped的值去中止当时正在履行的线程池。
履行结束后经过新的并发等级数新建一个新的线程池,完成了动态扩容与缩容。
此外,还能够新增开关,它设置为true是能够中止发动中的线程池,毛病时进行功用开关。
留意: 假如触及数据共同性问题,需求经过数据比对、对账等功用进行校验。
当音讯积压是产生在一切的partition仍是一切的partition都有积压状况时,只能操作暂时扩容,以更快的速度去消费数据了。
然后写一个暂时分发音讯的consumer程序,这个程序布置上去消费积压的音讯,消费之后不做耗时处理,直接均匀轮询写入暂时建好分10数量的queue里边。
紧接着征用10倍的机器来布置consumer,每一批consumer消费一个暂时queue的音讯。
这种做法相当于暂时将queue资源和consumer资源扩展10倍,以正常速度的10倍来消费音讯。
等快速消费完了之后,康复本来的布置架构,从头用本来的consumer机器来消费音讯。
Broker丢掉音讯:Kafka为了得到更高的功用和吞吐量,将数据异步批量的存储在磁盘中,异步刷盘有肯能形成源头数据丢掉;
例如,创立用户的kafka音讯,或许价格中心和促销服务误用了一个消费组,导致每个服务都是消费了部分音讯,导致一些问题呈现偶现的状况。
处理:修正装备,重启服务,各种树立的消费组;事前需求有查看是否有多个服务共用一个消费的状况(检测+比对);
需求从头评价你的实例类型决议计划:你的集群是否饱满?在什么状况下饱满?是否存在其他实例类型,或许比你第一次创立集群时挑选的类型更适宜?EBS 优化实例与 GP2/3 或 IO2 驱动器的混合是否线en 机器(及其带来的优势)有更好的性价比?1.2 存储与网络
紧缩在 Kafka 中并不新鲜,大多数用户现已知道了自己能够在 GZIP、Snappy 和 LZ4 之间做出挑选。但自从KIP-110被合并进 Kafka,并添加了用于 Zstandard 紧缩的紧缩器后,它已完成了明显的功用改善,并且是下降网络本钱的完美办法。
以出产者端略高的 CPU 运用率为价值,你将取得更高的紧缩率并在线上“挤进”更多信息。
Amplitude在他们的帖子中介绍,在切换到 Zstandard 后,他们的带宽运用量减少了三分之二,仅在处理管道上就能够节约每月数万美元的数据传输本钱。
不平衡的集群或许会危害集群功用,导致某些 borker 比其他 broker 的负载更大,让呼应推迟更高,并且在某些状况下会导致这些 broker 的资源饱满,然后导致不用要的扩容,然后会影响集群本钱。
此外,不平衡集群还面对一个危险:在一个 broker 出毛病后呈现更高的 MTTR(例如当该 broker 不用要地持有更多分区时),以及更高的数据丢掉危险(幻想一个仿制因子为 2 的主题,其间一个节点因为发动时要加载的 segment 过多,所以难以发动)。
所谓幂等性,数学概念便是: f(f(x)) = f(x) 。f函数表明对音讯的处理。浅显点来讲便是,在顾客收到重复音讯进行重复处理时,也要确保终究成果的共同性。
将数据库中的多个字段联合,创立一个仅有束缚,即便屡次操作也能确保表里至多存在一条记载(如创立订单、创立账单、创立流水等)。此外,只要是支撑相似“INSERT IF NOT EXIST”语义的存储类体系(如Redis的SETNX)都能够用于完成幂等消费。
给数据改变设置一个前置条件(版别号version、updateTime);
在更新数据的时分,一起改变前置条件中的数据(版别号+1、更新updateTime)。