前言
在现代的微服务架构中,消息队列已经成为了一个不可或缺的组件。
它能够帮助我们在不同的服务之间传递消息,并且能够确保这些消息不会丢失。
在众多的消息队列中,kafka 是一个非常出色的选择。
它能够处理大量的实时数据,并且提供了强大的持久化能力。
在本文中,我们将会探讨如何在 SpringBoot 中整合 Kafka。
什么是Kafka?
Apache Kafka 是一个开源的流处理平台,由 LinkedIn 团队开发并于 2011 年贡献给 Apache 基金会。Kafka 以其高吞吐量、可扩展性和容错性而闻名。它是一个基于发布/订阅模式的消息系统,通常用于大型实时数据流处理应用。
Kafka 的主要组件包括:
- Producer:负责发布消息到 Kafka 服务器。
- Broker:是 Kafka 服务器实例,负责消息的存储、接收和发送。
- Consumer:从 Kafka 服务器读取消息。
- Topic:消息的类别或者说是消息的标签,Producer 将消息发布到特定的 Topic,Consumer 从特定的 Topic 读取消息。
Kafka 可以在分布式系统中用于构建实时流数据管道,它可以在系统或应用之间可靠地获取数据。此外,Kafka 可以和 Apache Storm、Apache Hadoop、Apache Spark 等进行集成,用于大数据处理和分析。
Kafka的应用场景?
日志收集:
一个公司可能有很多服务器,每个服务器上运行着很多服务,Kafka 可以用来实现这些服务的日志收集功能。各服务的日志分别发送到 Kafka 的不同 Topic 中。
消息系统:
Kafka 能够作为一个大规模的消息处理系统,各生产者将消息发送到 Kafka,消费者从 Kafka 中读取消息进行处理。
用户活动跟踪:
Kafka 也常用于用户活动跟踪和实时分析。例如,用户的点击、搜索等行为可以实时写入到 Kafka,然后进行实时或者离线分析。
在 Kafka 上可以进行实时的流处理。例如,使用 Apache Storm 集成 Kafka 来进行实时的数据处理。
指标和日志聚合:
统计数据和监控数据也是 Kafka 的一个重要应用场景。例如,通过 Kafka 可以收集各种分布式应用的数据,然后进行统一的处理和分析。
事件源:
Kafka 可以作为大规模事件处理的源头,例如,用户的行为、系统的状态等都可以作为事件,通过 Kafka 进行分发处理。
示例
版本依赖
模块 | 版本 |
---|---|
SpringBoot | 3.1.0 |
JDK | 17 |
代码
KafkaConfig
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public KafkaReceiver listener() {
return new KafkaReceiver();
}
}
KafkaSender
@***ponent
@Slf4j
public class KafkaSender {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
public void send(String topic, String key, String data) {
//发送消息
***pletableFuture<SendResult<String, Object>> ***pletable = kafkaTemplate.send(topic, key, data);
***pletable.when***pleteAsync((result, ex) -> {
if (null == ex) {
log.info(topic + "生产者发送消息成功:" + result.toString());
} else {
log.info(topic + "生产者发送消息失败:" + ex.getMessage());
}
});
}
}
KafkaReceiver
@***ponent
@Slf4j
public class KafkaReceiver {
/**
* 下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用","隔开
*/
@KafkaListener(topics = {"testTopic"})
public void receive(ConsumerRecord<?, ?> record){
log.info("消费者收到的消息key: " + record.key());
log.info("消费者收到的消息value: " + record.value().toString());
}
}
KafkaController
/**
* kafka 测试接口
*/
@RestController
public class KafkaController {
@Autowired
private KafkaSender kafkaSender;
@GetMapping("/sendMessageToKafka")
public String sendMessageToKafka() {
Map<String, String> messageMap = new HashMap();
messageMap.put("message", "hello world!");
ObjectMapper objectMapper = new ObjectMapper();
String data = null;
try {
data = objectMapper.writeValueAsString(messageMap);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
String key = String.valueOf(UUID.randomUUID());
//kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null)
kafkaSender.send("testTopic", key, data);
return "ok";
}
}
测试
http://127.0.0.1:8080/sendMessageToKafka
遇见问题
Error connecting to node xxxxxx:9092 (id: 0 rack: null)
Error connecting to node iZbp127a9vpra4v3kmkkmzZ:9092 (id: 0 rack: null)
解决方案
修改本地物理机hosts文件。文件目录:C:\Windows\System32\drivers\etc
新增 xx.xx.xx.xx iZbp127a9vpra4v3kmkkmzZ
如果没生效,则需要重启系统
总结
通过上述的步骤,我们已经成功地在 SpringBoot 中整合了 Kafka。
这使得我们的应用程序能够在不同的服务之间传递消息,而不需要担心消息的丢失。
我们也看到,通过使用 SpringBoot,我们可以非常轻松地完成这个过程。
希望这篇文章能够帮助你在自己的项目中更好地使用 Kafka。
源码获取
如果需要完整源码请关注公众号"架构殿堂" ,回复 "SpringBoot+Kafka"即可获得
写在最后
感谢您的支持和鼓励! 😊🙏
如果大家对相关文章感兴趣,可以关注公众号"架构殿堂",会持续更新AIGC,java基础面试题, ***ty, spring boot, spring cloud等系列文章,一系列干货随时送达!