How Flink Can Support Automatic Kafka Disaster-Recovery Failover
Background
When Flink consumes messages from Kafka, we specify the Kafka broker addresses, the starting offsets, and other connection details up front. Once these are fixed, if the Kafka cluster goes down, the Flink job fails because it can no longer reach the brokers. The problem we want to solve is this: when the Kafka cluster in datacenter A goes down, and datacenter B hosts a disaster-recovery mirror of the same topics, how can Flink automatically switch to consuming from datacenter B? Likewise, once datacenter A recovers, how can Flink automatically switch back to consuming from A? The whole process should happen automatically, without anyone manually editing the Kafka addresses.
Technical Implementation
The class that implements Kafka consumption in Flink is FlinkKafkaConsumerBase. This class has a built-in feature: it can automatically discover, and start consuming, Kafka topics that match a given pattern. The key code is as follows:
private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
    discoveryLoopThread =
            new Thread(
                    () -> {
                        try {
                            // --------------------- partition discovery loop ---------------------

                            // throughout the loop, we always eagerly check if we are still
                            // running before performing the next operation, so that we can
                            // escape the loop as soon as possible

                            while (running) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug(
                                            "Consumer subtask {} is trying to discover new partitions ...",
                                            getRuntimeContext().getIndexOfThisSubtask());
                                }

                                final List<KafkaTopicPartition> discoveredPartitions;
                                try {
                                    discoveredPartitions =
                                            partitionDiscoverer.discoverPartitions();
                                } catch (AbstractPartitionDiscoverer.WakeupException
                                        | AbstractPartitionDiscoverer.ClosedException e) {
                                    // the partition discoverer may have been closed or woken up
                                    // before or during the discovery;
                                    // this would only happen if the consumer was canceled;
                                    // simply escape the loop
                                    break;
                                }

                                // no need to add the discovered partitions if we were closed
                                // during the meantime
                                if (running && !discoveredPartitions.isEmpty()) {
                                    kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                                }

                                // do not waste any time sleeping if we're not running anymore
                                if (running && discoveryIntervalMillis != 0) {
                                    try {
                                        Thread.sleep(discoveryIntervalMillis);
                                    } catch (InterruptedException iex) {
                                        // may be interrupted if the consumer was canceled
                                        // midway; simply escape the loop
                                        break;
                                    }
                                }
                            }
                        } catch (Exception e) {
                            discoveryLoopErrorRef.set(e);
                        } finally {
                            // calling cancel will also let the fetcher loop escape
                            // (if not running, cancel() was already called)
                            if (running) {
                                cancel();
                            }
                        }
                    },
                    "Kafka Partition Discovery for "
                            + getRuntimeContext().getTaskNameWithSubtasks());

    discoveryLoopThread.start();
}
As shown above, it starts a dedicated thread that periodically checks whether any new topics match the subscription rule, and if so, adds them to the set of consumed partitions. You may be wondering: we are discussing how Flink fails over between Kafka clusters, so why bring up topic auto-discovery at all?
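For reference, pattern-based subscription on FlinkKafkaConsumer is enabled by passing a java.util.regex.Pattern and setting flink.partition-discovery.interval-millis in the consumer properties. At the topic level, discovery boils down to matching the broker's topic list against that pattern. The sketch below illustrates only this matching step; the class, the topic names, and the pattern are made up for demonstration and are not Flink APIs.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustrative sketch of the topic-level part of discoverPartitions():
// keep only the topics whose names match the subscription pattern.
// TopicPatternDemo and the sample topic names are hypothetical.
public class TopicPatternDemo {

    static List<String> matchTopics(Pattern subscriptionPattern, List<String> topicsOnBroker) {
        return topicsOnBroker.stream()
                .filter(t -> subscriptionPattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("orders-.*");
        List<String> topics = List.of("orders-2023", "orders-2024", "payments");
        // prints [orders-2023, orders-2024]
        System.out.println(matchTopics(pattern, topics));
    }
}
```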
The point is that the same idea applies. We can implement Kafka failover the same way: start a thread that continuously checks the connectivity of the Kafka cluster currently being consumed. If the cluster becomes unreachable, that indicates a datacenter-level Kafka failure, and Flink should switch to consuming from the cluster in datacenter B; symmetrically, when the check sees datacenter A come back, it should switch consumption back to A. The only remaining problem is how to determine the offsets at which to resume consumption.
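Mirroring the discovery loop above, such a check would run in its own thread at a fixed interval. A minimal sketch follows; everything in it is hypothetical (the cluster addresses, probeBroker, and chooseCluster are illustrative names, not Flink or Kafka APIs), and a production check would more likely use Kafka's AdminClient than a raw TCP probe.

```java
import java.net.InetSocketAddress;
import java.net.Socket;

// Hedged sketch of the proposed failover check: a reachability probe plus a
// cluster-selection policy. All names and addresses here are made up.
public class KafkaFailoverMonitor {

    static final String CLUSTER_A = "kafka-a.example.com:9092"; // primary, datacenter A
    static final String CLUSTER_B = "kafka-b.example.com:9092"; // DR mirror, datacenter B

    // TCP-level reachability probe against a single "host:port" bootstrap address.
    static boolean probeBroker(String bootstrap, int timeoutMillis) {
        String[] hostPort = bootstrap.split(":");
        try (Socket socket = new Socket()) {
            socket.connect(
                    new InetSocketAddress(hostPort[0], Integer.parseInt(hostPort[1])),
                    timeoutMillis);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    // Failover policy: prefer A whenever it is reachable (this also covers the
    // automatic switch back once datacenter A recovers); fall back to B while A
    // is down; if neither answers, stay on A and let the next check retry.
    static String chooseCluster(boolean aReachable, boolean bReachable) {
        if (aReachable) {
            return CLUSTER_A;
        }
        return bReachable ? CLUSTER_B : CLUSTER_A;
    }
}
```

When chooseCluster returns a different address than the one currently in use, the consumer would be rebuilt against the new cluster, which is exactly where the remaining offset question comes in.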