在apach kafka框架,提交offset,若某个消费组下的某个消费者消费数据顺序分别为1,2,3,如若消费到2时,出现错误,但是错误捕捉了,只是不提交offset的这种情况: 伪代码如下
这种情况下,出现错误时虽然没有提交到offset,但是捕捉了错误,这种情况,后面的消息提交的offset会把之前消费失败的offset覆盖掉,offset提交其实都是累加类似的,(record.offset+1);故即使中间的那个记录未提交offset,后面也不会再消费到了。
但是在spring-kafka框架中,帮我们封装了一些处理逻辑,我们可以方便的实现当出现错误而重试,或者再次消费,或者进入死信队列等,我们需要明白一些情况是如何触发的,以及注意什么。
默认情况下,当我们未主动设置errorHander,系统会有一个默认的
CommonErrorHandler 的 实现
DefaultErrorHandler
其中对发生错误(非格式转换等异常错误)时,会在等待固定间隔发起重试,重试的次数与间隔如下:
默认不配置的重试次数与间隔时间:
具体怎么重试呢,会把失败的这个记录的offset, 重新把offset seek到这个记录这里:
当重试次数完了,会调用recoverer
而当我们不配置时,默认实现为:
仅仅只打印信息就返回了,后续kafka将跳过这个错误的记录,继续消费后面的。
当然我们可以自定义处理重试失败后的操作,官方也提供了很多写好的工具类供我们配置,例如放到死信队列中:
它的处理如下:
画红线的就是发送数据到死信队列中,那么私信队列的topic命名是如下代码实现的:
在原来的topic后面加上.DLT