[Go类库分享]ants——协程池

[Go类库分享]ants——协程池

[Go类库分享]ants——协程池

在现代Go应用开发中,合理管理和复用goroutine对于提升程序性能至关重要。频繁创建销毁goroutine不仅消耗资源,还可能导致系统负载过高。今天分享的ants 是一个高性能的goroutine池库,能够有效地管理和复用goroutines,显著减少内存分配开销并提高并发处理效率。

ants 提供了简单易用的API来管理工作池,并支持多种配置选项,适用于各种并发场景。

官方Github地址:https://github.***/panjf2000/ants

  • 本文全部代码地址:https://github.***/ziyifast/ziyifast-code_instruction/tree/main/go-demo/go-ants
    欢迎大家star⭐️~

1. 介绍

ants 是一款用于 goroutine 复用的 Go 语言库,旨在降低 goroutine 创建和销毁带来的性能损耗。通过预先创建一定数量的 goroutine 并放入池中,在需要执行任务时从池中取出空闲 goroutine 来运行任务,从而避免重复创建销毁带来的开销。

核心特征

  • 自动调度海量的 goroutines,复用 goroutines
  • 定期清理过期的 goroutines,进一步节省资源
  • 提供了大量实用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool 等
  • 优雅处理 panic,防止程序崩溃
  • 资源复用,极大节省内存使用量;在大规模批量并发任务场景下甚至可能比 Go 语言的无限制 goroutine 并发具有更高的性能
  • 非阻塞机制
  • 预分配内存 (环形队列,可选)

运行流程图

应用场景

  • 高并发任务处理
  • 需要控制并发数量的场景
  • 需要复用goroutine以减少资源消耗的场景
  • 需要防止goroutine泄漏的场景
  • 需要控制内存使用、保证服务稳定性的场景

2. 安装

//v1版本:
go get -u github.***/panjf2000/ants

//v2版本(需要开启GO111MODULE):
go get -u github.***/panjf2000/ants/v2

3. API介绍

ants.NewPool:创建工作池

创建一个指定容量的工作池,后续提交的任务将在池中的goroutine上异步执行

//Case 创建协程池:创建一个最大容量为10的协程池
pool, _ := ants.NewPool(10)
defer pool.Release() // 释放协程池
fmt.Println("pool=", pool)

pool.Submit:提交任务到工作池

将函数作为任务提交至工作池中执行

//Case 提交任务
task := func() {
    fmt.Println("hello world...")
}
_ = pool.Submit(task)

ants.NewPoolWithFunc:带预定义函数的工作池

创建一个带有固定处理函数的工作池,每次只需要传入参数即可

//Case 创建带预定义函数的工作池。然后通过invoke去调用触发
handler := func(data interface{}) {
    n := data.(int)
    fmt.Printf("Processing number: %d\n", n)
}
p, _ := ants.NewPoolWithFunc(500, handler)
defer p.Release()
for i := 0; i < 10; i++ {
    _ = p.Invoke(i)
}

pool.Tune:动态调节池容量

运行期间根据需求动态修改池的最大容量

//Case 动态调整协程池大小
p, _ := ants.NewPool(100)
p.Tune(200) // 调整池子大小为200

ants.WithPanicHandler:panic处理handler

当协程执行任务发生panic时,会调用panicHandler方法

panicHandler := func(p interface{}) {
    fmt.Printf("Worker exits from panic: %v\n", p)
    // Log the panic or send alert
}
pool, _ := ants.NewPool(10, ants.WithPanicHandler(panicHandler))
defer pool.Release()

4. 常见用法

基础并发任务处理

利用ants实现高效的并发任务调度,例如批量数据处理、网络请求等

var wg sync.WaitGroup

// 定义任务逻辑
demoFunc := func() {
    defer wg.Done()
    time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
    fmt.Println("Task executed by worker in pool")
}

// 初始化池
p, _ := ants.NewPool(1000)
defer p.Release()

// 提交多个并发任务
for i := 0; i < 100; i++ {
    wg.Add(1)
    _ = p.Submit(demoFunc)
}
wg.Wait()

批量任务处理优化

结合NewPoolWithFunc优化同类型任务处理流程

var sum int64
var mu sync.Mutex

// 固定任务处理函数
incr := func(i interface{}) {
    n := i.(int64)
    mu.Lock()
    sum += n
    mu.Unlock()
}

// 创建带处理函数的池
p, _ := ants.NewPoolWithFunc(1000, incr)
defer p.Release()

// 快速处理大量相似任务
for i := 0; i < 1000000; i++ {
    _ = p.Invoke(int64(i))
}
p.Wait() // 等待所有任务完成
fmt.Printf("Final result: %d\n", sum)

5. 案例实战

📢注意:此处主要为了展示实际的应用场景。整体流程不规范,比如当处理发生错误时,需要对应处理(补偿、上报等)

package main

import (
    "context"
    "fmt"
    "github.***/panjf2000/ants/v2"
    "log"
    "math/rand"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

// OrderProcessor 订单处理器
type OrderProcessor struct {
    // 不同业务类型的专用协程池
    orderValidationPool *ants.Pool // 订单验证池
    paymentProcessPool  *ants.Pool // 支付处理池
    inventoryUpdatePool *ants.Pool // 库存更新池
    emailNotifyPool     *ants.Pool // 邮件通知池

    // 统计数据
    totalProcessed int64
    su***essCount   int64
    errorCount     int64
}

// Order 订单结构
type Order struct {
    ID          string
    UserID      int64
    Products    []ProductItem
    TotalAmount float64
    Status      string
    CreatedAt   time.Time
}

// ProductItem 商品项
type ProductItem struct {
    ProductID int64
    Quantity  int
    Price     float64
}

// NewOrderProcessor 创建订单处理器
func NewOrderProcessor() *OrderProcessor {
    return &OrderProcessor{
       // 根据业务特点分配不同容量的协程池
       orderValidationPool: createPoolWithConfig(500), // 订单验证较轻量
       paymentProcessPool:  createPoolWithConfig(200), // 支付涉及外部系统,限制并发
       inventoryUpdatePool: createPoolWithConfig(300), // 库存更新需要控制
       emailNotifyPool:     createPoolWithConfig(100), // 邮件通知可以较低优先级
    }
}

// createPoolWithConfig 创建带配置的协程池.具体配置结合自身业务场景
func createPoolWithConfig(size int) *ants.Pool {
    pool, err := ants.NewPool(size,
       ants.WithExpiryDuration(30*time.Second), // 协程空闲30秒后回收
       ants.WithNonblocking(false))             // 阻塞模式,排队等待而非拒绝
    if err != nil {
       log.Fatal("创建协程池失败:", err)
    }
    return pool
}

// ProcessOrder 处理订单
func (op *OrderProcessor) ProcessOrder(order *Order) error {
    atomic.AddInt64(&op.totalProcessed, 1)

    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    var wg sync.WaitGroup
    var errors []error
    var mu sync.Mutex

    // 1. 订单验证 - 使用专用验证池
    wg.Add(1)
    err := op.orderValidationPool.Submit(func() {
       defer wg.Done()
       if err := op.validateOrder(ctx, order); err != nil {
          mu.Lock()
          errors = append(errors, fmt.Errorf("验证失败: %v", err))
          mu.Unlock()
       }
    })
    if err != nil {
       return fmt.Errorf("验证任务提交失败: %v", err)
    }

    // 2. 支付处理 - 使用专用支付池
    wg.Add(1)
    err = op.paymentProcessPool.Submit(func() {
       defer wg.Done()
       if err := op.processPayment(ctx, order); err != nil {
          mu.Lock()
          errors = append(errors, fmt.Errorf("支付失败: %v", err))
          mu.Unlock()
       }
    })
    if err != nil {
       return fmt.Errorf("支付任务提交失败: %v", err)
    }

    // 等待关键步骤完成
    wg.Wait()

    // 检查是否有错误
    if len(errors) > 0 {
       atomic.AddInt64(&op.errorCount, 1)
       order.Status = "failed"
       return errors[0] // 返回第一个错误
    }

    // 3. 异步更新库存 - 使用库存池
    go func() {
       _ = op.inventoryUpdatePool.Submit(func() {
          op.updateInventory(ctx, order)
       })
    }()

    // 4. 异步发送邮件 - 使用邮件池
    go func() {
       _ = op.emailNotifyPool.Submit(func() {
          op.sendEmailNotification(ctx, order)
       })
    }()

    order.Status = "***pleted"
    atomic.AddInt64(&op.su***essCount, 1)
    return nil
}

// validateOrder 订单验证
func (op *OrderProcessor) validateOrder(ctx context.Context, order *Order) error {
    // 模拟数据库查询和业务验证
    time.Sleep(time.Duration(rand.Intn(50)+10) * time.Millisecond)

    // 模拟验证逻辑
    if len(order.Products) == 0 {
       return fmt.Errorf("订单商品不能为空")
    }

    for _, item := range order.Products {
       if item.Quantity <= 0 {
          return fmt.Errorf("商品数量必须大于0")
       }
    }

    log.Printf("订单%s验证通过", order.ID)
    return nil
}

// processPayment 处理支付
func (op *OrderProcessor) processPayment(ctx context.Context, order *Order) error {
    // 模拟调用第三方支付接口
    time.Sleep(time.Duration(rand.Intn(200)+50) * time.Millisecond)

    // 模拟支付成功率
    if rand.Float32() < 0.02 { // 2%失败率
       return fmt.Errorf("支付网关超时")
    }

    log.Printf("订单%s支付成功,金额: %.2f", order.ID, order.TotalAmount)
    return nil
}

// updateInventory 更新库存
func (op *OrderProcessor) updateInventory(ctx context.Context, order *Order) {
    // 模拟库存系统更新
    time.Sleep(time.Duration(rand.Intn(100)+20) * time.Millisecond)
    log.Printf("订单%s库存更新完成", order.ID)
}

// sendEmailNotification 发送邮件通知
func (op *OrderProcessor) sendEmailNotification(ctx context.Context, order *Order) {
    // 模拟邮件发送
    time.Sleep(time.Duration(rand.Intn(300)+50) * time.Millisecond)
    log.Printf("订单%s邮件通知发送完成", order.ID)
}

// GetStats 获取处理统计
func (op *OrderProcessor) GetStats() map[string]int64 {
    return map[string]int64{
       "total_processed": atomic.LoadInt64(&op.totalProcessed),
       "su***ess_count":   atomic.LoadInt64(&op.su***essCount),
       "error_count":     atomic.LoadInt64(&op.errorCount),
    }
}

// GetPoolStats 获取协程池状态
func (op *OrderProcessor) GetPoolStats() map[string]map[string]int {
    return map[string]map[string]int{
       "validation_pool": {
          "capacity": op.orderValidationPool.Cap(),
          "running":  op.orderValidationPool.Running(),
       },
       "payment_pool": {
          "capacity": op.paymentProcessPool.Cap(),
          "running":  op.paymentProcessPool.Running(),
       },
       "inventory_pool": {
          "capacity": op.inventoryUpdatePool.Cap(),
          "running":  op.inventoryUpdatePool.Running(),
       },
       "email_pool": {
          "capacity": op.emailNotifyPool.Cap(),
          "running":  op.emailNotifyPool.Running(),
       },
    }
}

// Close 关闭所有协程池
func (op *OrderProcessor) Close() {
    op.orderValidationPool.Release()
    op.paymentProcessPool.Release()
    op.inventoryUpdatePool.Release()
    op.emailNotifyPool.Release()
}

// generateTestOrders 生成测试订单
func generateTestOrders(count int) []*Order {
    orders := make([]*Order, count)
    for i := 0; i < count; i++ {
       orders[i] = &Order{
          ID:     fmt.Sprintf("ORDER_%06d", i),
          UserID: int64(rand.Intn(10000) + 1),
          Products: []ProductItem{
             {ProductID: int64(rand.Intn(1000) + 1), Quantity: rand.Intn(5) + 1, Price: float64(rand.Intn(1000))/10 + 10},
          },
          TotalAmount: float64(rand.Intn(5000))/10 + 50,
          CreatedAt:   time.Now(),
       }
    }
    return orders
}

func main() {
    fmt.Println("=== 电商订单处理系统 - ants企业级应用 ===")
    fmt.Printf("CPU核心数: %d\n", runtime.NumCPU())
    fmt.Printf("初始goroutine数: %d\n\n", runtime.NumGoroutine())

    // 创建订单处理器
    processor := NewOrderProcessor()
    defer processor.Close()

    // 生成测试订单
    testOrders := generateTestOrders(5000)
    fmt.Printf("生成测试订单: %d个\n", len(testOrders))

    startTime := time.Now()

    // 模拟高并发订单处理
    var wg sync.WaitGroup

    // 使用多个goroutine并发提交订单处理任务
    workerCount := 100
    ordersPerWorker := len(testOrders) / workerCount

    for worker := 0; worker < workerCount; worker++ {
       wg.Add(1)
       go func(workerID int) {
          defer wg.Done()

          startIdx := workerID * ordersPerWorker
          endIdx := startIdx + ordersPerWorker
          if workerID == workerCount-1 {
             endIdx = len(testOrders) // 处理余数
          }

          for i := startIdx; i < endIdx; i++ {
             err := processor.ProcessOrder(testOrders[i])
             if err != nil {
                log.Printf("Worker%d: 订单%s处理失败: %v", workerID, testOrders[i].ID, err)
                //todo 记录错误,进行补偿/上报/其他处理,此处为了代码简单,不做其他逻辑
             }

             // 模拟订单到达间隔
             if i%100 == 0 {
                time.Sleep(time.Millisecond)
             }
          }
       }(worker)
    }

    wg.Wait()

    processingTime := time.Since(startTime)

    // 输出结果
    fmt.Println("\n=== 处理结果统计 ===")
    stats := processor.GetStats()
    fmt.Printf("总处理时间: %v\n", processingTime)
    fmt.Printf("处理订单总数: %d\n", stats["total_processed"])
    fmt.Printf("成功处理: %d\n", stats["su***ess_count"])
    fmt.Printf("处理失败: %d\n", stats["error_count"])
    fmt.Printf("成功率: %.2f%%\n",
       float64(stats["su***ess_count"])/float64(stats["total_processed"])*100)
    fmt.Printf("处理吞吐量: %.2f 订单/秒\n",
       float64(stats["total_processed"])/processingTime.Seconds())

    // 协程池状态
    fmt.Println("\n=== 协程池状态 ===")
    poolStats := processor.GetPoolStats()
    for poolName, stat := range poolStats {
       fmt.Printf("%s: 容量=%d, 运行中=%d\n",
          poolName, stat["capacity"], stat["running"])
    }

    // 内存使用情况
    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    fmt.Printf("\n内存使用: %d MB\n", m.Alloc/1024/1024)
    fmt.Printf("最终goroutine数: %d\n", runtime.NumGoroutine())
}

6. 原生协程 vs 协程池

1. 原生协程。优势:

  • 代码简洁直观
  • Go调度器已优化到纳秒级切换
  • GC处理小对象效率极高
// 典型场景:短平快任务
for i := 0; i < 10000; i++ {
    go process(i) // Go自己调度完全没问题!
}

2. 协程池。优势:

  • 内存控制:防止百万级goroutine吃光内存(每个至少2KB → 200MB起)
    最大协程数 = (可用内存 × 0.8) / 预估单协程峰值内存,例:可用4G → 4×0.8/0.008=400 (保险起见设300)
  • 资源隔离:关键业务不受突发流量冲击
  • 优雅退出:统一关闭所有worker确保任务完成
// 典型场景:长生命周期任务
pool := ants.NewPool(1000) // 限制最大并发
for req := range requests {
    pool.Submit(handleRequest) // 超出容量自动阻塞/拒绝
}

性能实测对比(基于 ants 库)

指标 裸跑 goroutine 协程池 1000 workers
10w短任务耗时 约0.8s 约1.2s
内存峰值 1.2GB 200MB
GC停顿 26ms+ <5ms
响应延迟 波动较大 平稳如狗

Q:是否使用协程池,什么时候使用协程池?
A:

  • 默认可不用:Go1.22+的调度器已经能处理百万级goroutine
  • 这些情况下可以使用协程池:
    • IoT设备等内存敏感环境
    • 需要实现优先级队列等高级调度
    • Web服务要防雪崩(如电商大促)

参考文章:
https://mp.weixin.qq.***/s/nHX8Z6Okg74UR_z2dSbWNQ
https://github.***/panjf2000/ants

转载请说明出处内容投诉
CSS教程网 » [Go类库分享]ants——协程池

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买