接口调用并发执行十个任务总结

描述

需求

一个接口调用时,接收到一个列表,十个元素,需要并发执行十个任务,每个任务都要返回执行的结果和异常,然后对返回的结果装填到一个切片列表里,统一返回结果。

需要协程处理的结构体

type Order struct {  
  Name string `json:"name"`  
  Id int `json:"id"`  
}

确定通道数量

一般按入参的需要处理的元素数量为准

taskNum := 10 

初始化通道

orderCh := make(chan Order, taskNum) //接收返回的结果
errCh := make(chan error, taskNum) //接收返回的异常

发起执行,我们使用sync.WaitGroup来监听执行情况

wg := sync.WaitGroup{}
for i:=0; i < taskNum; i++ {
   wg.Add(1)
   go func() {
     defer wg.Done()
     if i == 3 {//模拟当i=3的时候,返回一个异常
         err := errors.New("there is an error")
         errCh <- err 
         return
     }
     //组装返回结果
     res := Order{  
         Name: "num: " + strconv.Itoa(i),  
         Id: i,  
         }
     orderCh <- res    
  }()
}
wg.Wait() //等待所有任务执行完毕

使用for-select接收执行结果

orderList := make([]Order, taskNum)
for i:=0; i

1,超时问题

任务执行过程中,需要控制每个任务的执行时间,不能超过一定范围,我们用定时器来解决这个问题

timeoutTime := time.Second * 3  //超时时间
taskTimer := time.NewTimer(timeoutTime) //初始化定时器
orderList := make([]Order, taskNum)
for i:=0; i

2, 协程panic问题

主程序是无法捕捉协程内的panic,因此如果不手动处理,就会发生协程内panic导致整个程序中止的情况,我们在defer里处理

for i:=0; i < taskNum; i++ {
   wg.Add(1)
   go func() {
     defer func () {
      wg.Done()
      //协程内单独捕捉异常  
      if r := recover(); r != nil {  
        err := errors.New(fmt.Sprintf("System panic:%v", r))  
        errCh <- err //此处将panic信息转为err返回,也可以按需求和异常等级进行处理
        return
      }
     }()
   ........
  }()
}

3, 顺序问题

返回的列表元素的顺序,需要跟传参的列表顺序保持一致,这时我们需要定义个带序号的结构体

// 需要记录原始顺序的时候,定义个带编号的结构体  
type OrderWithSeq struct {  
    Seq int  
    OrderItem Order  
}  
//重写相关排序类型
type BySeq []OrderWithSeq  
  
func (a BySeq) Len() int {  
    return len(a)  
}  
func (a BySeq) Swap(i, j int) {  
    a[i], a[j] = a[j], a[i]  
}  
func (a BySeq) Less(i, j int) bool {  
    return a[i].Seq < a[j].Seq  
}
// 调整返回结果
orderCh := make(chan OrderWithSeq, taskNum) //接收带序号的结构体
//在执行任务时,加入序号
for i:=0; i < taskNum; i++ {
   i:= i
   wg.Add(1)
   go func() {
     ····
     //组装返回结果
     res := Order{  
         Name: "num: " + strconv.Itoa(i),  
         Id: i,  
         }
     orderCh <-OrderWithSeq {
         Seq: i, //带上i这个序号
         OrderItem: res,
     }
  }()
 //接收信息,也按带序号的结构体进行组装
 orderSeqList := make([]OrderWithSeq, taskNum)
 for i:=0; i

总结

标准模板如下:

type Order struct {  
  Name string `json:"name"`  
  Id int `json:"id"`  
}


// 需要记录原始顺序的时候,定义个带编号的结构体  
type OrderWithSeq struct {  
    Seq int  
    OrderItem Order  
}  
//重写相关排序类型
type BySeq []OrderWithSeq  
  
func (a BySeq) Len() int {  
    return len(a)  
}  
func (a BySeq) Swap(i, j int) {  
    a[i], a[j] = a[j], a[i]  
}  
func (a BySeq) Less(i, j int) bool {  
    return a[i].Seq < a[j].Seq  
}


taskNum := 10 
orderCh := make(chan OrderWithSeq, taskNum) //接收带序号的结构体
errCh := make(chan error, taskNum) //接收返回的异常
wg := sync.WaitGroup{}
//在执行任务时,加入序号
for i:=0; i < taskNum; i++ {
   i:= i
   wg.Add(1)
   go func() {
     defer func () {
      wg.Done()
      //协程内单独捕捉异常  
      if r := recover(); r != nil {  
        err := errors.New(fmt.Sprintf("System panic:%v", r))  
        errCh <- err //此处将panic信息转为err返回,也可以按需求和异常等级进行处理
        return
      }
     }()
     //组装返回结果
     res := Order{  
         Name: "num: " + strconv.Itoa(i),  
         Id: i,  
         }
     orderCh <-OrderWithSeq {
         Seq: i, //带上i这个序号
         OrderItem: res,
     }
  }()
 wg.Wait()
  //接收信息,也按带序号的结构体进行组装
 orderSeqList := make([]OrderWithSeq, taskNum)
 timeoutTime := time.Second * 3 
 taskTimer := time.NewTimer(timeoutTime)
 for i:=0; i

                    
                    
                    
                
打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分