一个异步架构设计:批量消费RabbitMQ,批量写入Elasticsearch(golang实现)

       在数仓团队,异步任务设计非常常见,主要原因就是数据量太大,不适合做成同步,在自动驾驶这个业务上,数据大到什么程度呢,单模块每天标签的上报数量就能达到5000W,如果算上车端挖掘、云端挖掘、标注、数据生产、仿真等,每天标签上亿,那是太简单的事了,因此,为了高效完成数据的入库,一个稳定、高吞吐量的异步架构设计显得非常之重要。

配图:北魏家宴饭店  摄影 by 棉花糖

在之前的一篇文章中介绍过异步设计的思想:一个优秀的rabbitmq消费者(consumer)设计,可直接上线使用。这篇文章就是上一篇文章思想的具体落地:

这次就是把上面架构图中消费者部分的批量处理具体为批量写入elasticsearch,这是一种常见的行为,具体场景就是为自动驾驶入库数据创建索引,以支持检索,方便各业务低成本获取所需的数据。

1、RabbitMQ消费者实现

rabbitmq消费者实现的一个核心设计思想:数据消费与业务逻辑处理解耦,即接收数据只从mq中获取数据,不做任何其他事宜,业务逻辑处理放到其他线程中执行。

业务逻辑的处理放到主线程中进行,而且要批量化处理,这样才能实现高吞吐量。

上面数据接收,是把mq的消息放到了一个buffer中,业务获取数据就可以直接从该buffer中读取:

批量读取到数据后,就可以批量处理,比如建索引,写es。

2、Elasticsearch的批量写入

实现es批量写入的核心代码如下:

func (b *EsBatchInsert) BatchInSert(docs []Document) error {
	var json = jsoniter.Config***patibleWithStandardLibrary
	buf := bytes.Buffer{}
	for _, doc := range docs {
		meta := map[string]any{
			"index": map[string]any{
				"_index": doc.Index,
				"_id":    doc.ID,
			},
		}
		if err := json.Ne
转载请说明出处内容投诉
CSS教程网 » 一个异步架构设计:批量消费RabbitMQ,批量写入Elasticsearch(golang实现)

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买