[Go类库分享]ants——协程池
在现代Go应用开发中,合理管理和复用goroutine对于提升程序性能至关重要。频繁创建销毁goroutine不仅消耗资源,还可能导致系统负载过高。今天分享的ants 是一个高性能的goroutine池库,能够有效地管理和复用goroutines,显著减少内存分配开销并提高并发处理效率。
ants 提供了简单易用的API来管理工作池,并支持多种配置选项,适用于各种并发场景。
官方Github地址:https://github.***/panjf2000/ants
本文全部代码地址:https://github.***/ziyifast/ziyifast-code_instruction/tree/main/go-demo/go-ants
欢迎大家star⭐️~
1. 介绍
ants 是一款用于 goroutine 复用的 Go 语言库,旨在降低 goroutine 创建和销毁带来的性能损耗。通过预先创建一定数量的 goroutine 并放入池中,在需要执行任务时从池中取出空闲 goroutine 来运行任务,从而避免重复创建销毁带来的开销。
核心特征
- 自动调度海量的 goroutines,复用 goroutines
- 定期清理过期的 goroutines,进一步节省资源
- 提供了大量实用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool 等
- 优雅处理 panic,防止程序崩溃
- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下甚至可能比 Go 语言的无限制 goroutine 并发具有更高的性能
- 非阻塞机制
- 预分配内存 (环形队列,可选)
运行流程图
应用场景
- 高并发任务处理
- 需要控制并发数量的场景
- 需要复用goroutine以减少资源消耗的场景
- 需要防止goroutine泄漏的场景
- 需要控制内存使用、保证服务稳定性的场景
2. 安装
//v1版本:
go get -u github.***/panjf2000/ants
//v2版本(需要开启GO111MODULE):
go get -u github.***/panjf2000/ants/v2
3. API介绍
ants.NewPool:创建工作池
创建一个指定容量的工作池,后续提交的任务将在池中的goroutine上异步执行
//Case 创建协程池:创建一个最大容量为10的协程池
pool, _ := ants.NewPool(10)
defer pool.Release() // 释放协程池
fmt.Println("pool=", pool)
pool.Submit:提交任务到工作池
将函数作为任务提交至工作池中执行
//Case 提交任务
task := func() {
fmt.Println("hello world...")
}
_ = pool.Submit(task)
ants.NewPoolWithFunc:带预定义函数的工作池
创建一个带有固定处理函数的工作池,每次只需要传入参数即可
//Case 创建带预定义函数的工作池。然后通过invoke去调用触发
handler := func(data interface{}) {
n := data.(int)
fmt.Printf("Processing number: %d\n", n)
}
p, _ := ants.NewPoolWithFunc(500, handler)
defer p.Release()
for i := 0; i < 10; i++ {
_ = p.Invoke(i)
}
pool.Tune:动态调节池容量
运行期间根据需求动态修改池的最大容量
//Case 动态调整协程池大小
p, _ := ants.NewPool(100)
p.Tune(200) // 调整池子大小为200
ants.WithPanicHandler:panic处理handler
当协程执行任务发生panic时,会调用panicHandler方法
panicHandler := func(p interface{}) {
fmt.Printf("Worker exits from panic: %v\n", p)
// Log the panic or send alert
}
pool, _ := ants.NewPool(10, ants.WithPanicHandler(panicHandler))
defer pool.Release()
4. 常见用法
基础并发任务处理
利用ants实现高效的并发任务调度,例如批量数据处理、网络请求等
var wg sync.WaitGroup
// 定义任务逻辑
demoFunc := func() {
defer wg.Done()
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
fmt.Println("Task executed by worker in pool")
}
// 初始化池
p, _ := ants.NewPool(1000)
defer p.Release()
// 提交多个并发任务
for i := 0; i < 100; i++ {
wg.Add(1)
_ = p.Submit(demoFunc)
}
wg.Wait()
批量任务处理优化
结合NewPoolWithFunc优化同类型任务处理流程
var sum int64
var mu sync.Mutex
// 固定任务处理函数
incr := func(i interface{}) {
n := i.(int64)
mu.Lock()
sum += n
mu.Unlock()
}
// 创建带处理函数的池
p, _ := ants.NewPoolWithFunc(1000, incr)
defer p.Release()
// 快速处理大量相似任务
for i := 0; i < 1000000; i++ {
_ = p.Invoke(int64(i))
}
p.Wait() // 等待所有任务完成
fmt.Printf("Final result: %d\n", sum)
5. 案例实战
📢注意:此处主要为了展示实际的应用场景。整体流程不规范,比如当处理发生错误时,需要对应处理(补偿、上报等)
package main
import (
"context"
"fmt"
"github.***/panjf2000/ants/v2"
"log"
"math/rand"
"runtime"
"sync"
"sync/atomic"
"time"
)
// OrderProcessor 订单处理器
type OrderProcessor struct {
// 不同业务类型的专用协程池
orderValidationPool *ants.Pool // 订单验证池
paymentProcessPool *ants.Pool // 支付处理池
inventoryUpdatePool *ants.Pool // 库存更新池
emailNotifyPool *ants.Pool // 邮件通知池
// 统计数据
totalProcessed int64
su***essCount int64
errorCount int64
}
// Order 订单结构
type Order struct {
ID string
UserID int64
Products []ProductItem
TotalAmount float64
Status string
CreatedAt time.Time
}
// ProductItem 商品项
type ProductItem struct {
ProductID int64
Quantity int
Price float64
}
// NewOrderProcessor 创建订单处理器
func NewOrderProcessor() *OrderProcessor {
return &OrderProcessor{
// 根据业务特点分配不同容量的协程池
orderValidationPool: createPoolWithConfig(500), // 订单验证较轻量
paymentProcessPool: createPoolWithConfig(200), // 支付涉及外部系统,限制并发
inventoryUpdatePool: createPoolWithConfig(300), // 库存更新需要控制
emailNotifyPool: createPoolWithConfig(100), // 邮件通知可以较低优先级
}
}
// createPoolWithConfig 创建带配置的协程池.具体配置结合自身业务场景
func createPoolWithConfig(size int) *ants.Pool {
pool, err := ants.NewPool(size,
ants.WithExpiryDuration(30*time.Second), // 协程空闲30秒后回收
ants.WithNonblocking(false)) // 阻塞模式,排队等待而非拒绝
if err != nil {
log.Fatal("创建协程池失败:", err)
}
return pool
}
// ProcessOrder 处理订单
func (op *OrderProcessor) ProcessOrder(order *Order) error {
atomic.AddInt64(&op.totalProcessed, 1)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
var wg sync.WaitGroup
var errors []error
var mu sync.Mutex
// 1. 订单验证 - 使用专用验证池
wg.Add(1)
err := op.orderValidationPool.Submit(func() {
defer wg.Done()
if err := op.validateOrder(ctx, order); err != nil {
mu.Lock()
errors = append(errors, fmt.Errorf("验证失败: %v", err))
mu.Unlock()
}
})
if err != nil {
return fmt.Errorf("验证任务提交失败: %v", err)
}
// 2. 支付处理 - 使用专用支付池
wg.Add(1)
err = op.paymentProcessPool.Submit(func() {
defer wg.Done()
if err := op.processPayment(ctx, order); err != nil {
mu.Lock()
errors = append(errors, fmt.Errorf("支付失败: %v", err))
mu.Unlock()
}
})
if err != nil {
return fmt.Errorf("支付任务提交失败: %v", err)
}
// 等待关键步骤完成
wg.Wait()
// 检查是否有错误
if len(errors) > 0 {
atomic.AddInt64(&op.errorCount, 1)
order.Status = "failed"
return errors[0] // 返回第一个错误
}
// 3. 异步更新库存 - 使用库存池
go func() {
_ = op.inventoryUpdatePool.Submit(func() {
op.updateInventory(ctx, order)
})
}()
// 4. 异步发送邮件 - 使用邮件池
go func() {
_ = op.emailNotifyPool.Submit(func() {
op.sendEmailNotification(ctx, order)
})
}()
order.Status = "***pleted"
atomic.AddInt64(&op.su***essCount, 1)
return nil
}
// validateOrder 订单验证
func (op *OrderProcessor) validateOrder(ctx context.Context, order *Order) error {
// 模拟数据库查询和业务验证
time.Sleep(time.Duration(rand.Intn(50)+10) * time.Millisecond)
// 模拟验证逻辑
if len(order.Products) == 0 {
return fmt.Errorf("订单商品不能为空")
}
for _, item := range order.Products {
if item.Quantity <= 0 {
return fmt.Errorf("商品数量必须大于0")
}
}
log.Printf("订单%s验证通过", order.ID)
return nil
}
// processPayment 处理支付
func (op *OrderProcessor) processPayment(ctx context.Context, order *Order) error {
// 模拟调用第三方支付接口
time.Sleep(time.Duration(rand.Intn(200)+50) * time.Millisecond)
// 模拟支付成功率
if rand.Float32() < 0.02 { // 2%失败率
return fmt.Errorf("支付网关超时")
}
log.Printf("订单%s支付成功,金额: %.2f", order.ID, order.TotalAmount)
return nil
}
// updateInventory 更新库存
func (op *OrderProcessor) updateInventory(ctx context.Context, order *Order) {
// 模拟库存系统更新
time.Sleep(time.Duration(rand.Intn(100)+20) * time.Millisecond)
log.Printf("订单%s库存更新完成", order.ID)
}
// sendEmailNotification 发送邮件通知
func (op *OrderProcessor) sendEmailNotification(ctx context.Context, order *Order) {
// 模拟邮件发送
time.Sleep(time.Duration(rand.Intn(300)+50) * time.Millisecond)
log.Printf("订单%s邮件通知发送完成", order.ID)
}
// GetStats 获取处理统计
func (op *OrderProcessor) GetStats() map[string]int64 {
return map[string]int64{
"total_processed": atomic.LoadInt64(&op.totalProcessed),
"su***ess_count": atomic.LoadInt64(&op.su***essCount),
"error_count": atomic.LoadInt64(&op.errorCount),
}
}
// GetPoolStats 获取协程池状态
func (op *OrderProcessor) GetPoolStats() map[string]map[string]int {
return map[string]map[string]int{
"validation_pool": {
"capacity": op.orderValidationPool.Cap(),
"running": op.orderValidationPool.Running(),
},
"payment_pool": {
"capacity": op.paymentProcessPool.Cap(),
"running": op.paymentProcessPool.Running(),
},
"inventory_pool": {
"capacity": op.inventoryUpdatePool.Cap(),
"running": op.inventoryUpdatePool.Running(),
},
"email_pool": {
"capacity": op.emailNotifyPool.Cap(),
"running": op.emailNotifyPool.Running(),
},
}
}
// Close 关闭所有协程池
func (op *OrderProcessor) Close() {
op.orderValidationPool.Release()
op.paymentProcessPool.Release()
op.inventoryUpdatePool.Release()
op.emailNotifyPool.Release()
}
// generateTestOrders 生成测试订单
func generateTestOrders(count int) []*Order {
orders := make([]*Order, count)
for i := 0; i < count; i++ {
orders[i] = &Order{
ID: fmt.Sprintf("ORDER_%06d", i),
UserID: int64(rand.Intn(10000) + 1),
Products: []ProductItem{
{ProductID: int64(rand.Intn(1000) + 1), Quantity: rand.Intn(5) + 1, Price: float64(rand.Intn(1000))/10 + 10},
},
TotalAmount: float64(rand.Intn(5000))/10 + 50,
CreatedAt: time.Now(),
}
}
return orders
}
func main() {
fmt.Println("=== 电商订单处理系统 - ants企业级应用 ===")
fmt.Printf("CPU核心数: %d\n", runtime.NumCPU())
fmt.Printf("初始goroutine数: %d\n\n", runtime.NumGoroutine())
// 创建订单处理器
processor := NewOrderProcessor()
defer processor.Close()
// 生成测试订单
testOrders := generateTestOrders(5000)
fmt.Printf("生成测试订单: %d个\n", len(testOrders))
startTime := time.Now()
// 模拟高并发订单处理
var wg sync.WaitGroup
// 使用多个goroutine并发提交订单处理任务
workerCount := 100
ordersPerWorker := len(testOrders) / workerCount
for worker := 0; worker < workerCount; worker++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
startIdx := workerID * ordersPerWorker
endIdx := startIdx + ordersPerWorker
if workerID == workerCount-1 {
endIdx = len(testOrders) // 处理余数
}
for i := startIdx; i < endIdx; i++ {
err := processor.ProcessOrder(testOrders[i])
if err != nil {
log.Printf("Worker%d: 订单%s处理失败: %v", workerID, testOrders[i].ID, err)
//todo 记录错误,进行补偿/上报/其他处理,此处为了代码简单,不做其他逻辑
}
// 模拟订单到达间隔
if i%100 == 0 {
time.Sleep(time.Millisecond)
}
}
}(worker)
}
wg.Wait()
processingTime := time.Since(startTime)
// 输出结果
fmt.Println("\n=== 处理结果统计 ===")
stats := processor.GetStats()
fmt.Printf("总处理时间: %v\n", processingTime)
fmt.Printf("处理订单总数: %d\n", stats["total_processed"])
fmt.Printf("成功处理: %d\n", stats["su***ess_count"])
fmt.Printf("处理失败: %d\n", stats["error_count"])
fmt.Printf("成功率: %.2f%%\n",
float64(stats["su***ess_count"])/float64(stats["total_processed"])*100)
fmt.Printf("处理吞吐量: %.2f 订单/秒\n",
float64(stats["total_processed"])/processingTime.Seconds())
// 协程池状态
fmt.Println("\n=== 协程池状态 ===")
poolStats := processor.GetPoolStats()
for poolName, stat := range poolStats {
fmt.Printf("%s: 容量=%d, 运行中=%d\n",
poolName, stat["capacity"], stat["running"])
}
// 内存使用情况
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("\n内存使用: %d MB\n", m.Alloc/1024/1024)
fmt.Printf("最终goroutine数: %d\n", runtime.NumGoroutine())
}
6. 原生协程 vs 协程池
1. 原生协程。优势:
- 代码简洁直观
- Go调度器已优化到纳秒级切换
- GC处理小对象效率极高
// 典型场景:短平快任务
for i := 0; i < 10000; i++ {
go process(i) // Go自己调度完全没问题!
}
2. 协程池。优势:
- 内存控制:防止百万级goroutine吃光内存(每个至少2KB → 200MB起)
最大协程数 = (可用内存 × 0.8) / 预估单协程峰值内存,例:可用4G → 4×0.8/0.008=400 (保险起见设300) - 资源隔离:关键业务不受突发流量冲击
- 优雅退出:统一关闭所有worker确保任务完成
// 典型场景:长生命周期任务
pool := ants.NewPool(1000) // 限制最大并发
for req := range requests {
pool.Submit(handleRequest) // 超出容量自动阻塞/拒绝
}
性能实测对比(基于 ants 库)
| 指标 | 裸跑 goroutine | 协程池 1000 workers |
|---|---|---|
| 10w短任务耗时 | 约0.8s | 约1.2s |
| 内存峰值 | 1.2GB | 200MB |
| GC停顿 | 26ms+ | <5ms |
| 响应延迟 | 波动较大 | 平稳如狗 |
Q:是否使用协程池,什么时候使用协程池?
A:
- 默认可不用:Go1.22+的调度器已经能处理百万级goroutine
- 这些情况下可以使用协程池:
- IoT设备等内存敏感环境
- 需要实现优先级队列等高级调度
- Web服务要防雪崩(如电商大促)
…
参考文章:
https://mp.weixin.qq.***/s/nHX8Z6Okg74UR_z2dSbWNQ
https://github.***/panjf2000/ants