需求
一个接口调用时,接收到一个列表,十个元素,需要并发执行十个任务,每个任务都要返回执行的结果和异常,然后对返回的结果装填到一个切片列表里,统一返回结果。
需要协程处理的结构体
type Order struct { Name string `json:"name"` Id int `json:"id"` }
确定通道数量
一般按入参的需要处理的元素数量为准
taskNum := 10
初始化通道
orderCh := make(chan Order, taskNum) //接收返回的结果 errCh := make(chan error, taskNum) //接收返回的异常
发起执行,我们使用sync.WaitGroup来监听执行情况
wg := sync.WaitGroup{} for i:=0; i < taskNum; i++ { wg.Add(1) go func() { defer wg.Done() if i == 3 {//模拟当i=3的时候,返回一个异常 err := errors.New("there is an error") errCh <- err return } //组装返回结果 res := Order{ Name: "num: " + strconv.Itoa(i), Id: i, } orderCh <- res }() } wg.Wait() //等待所有任务执行完毕
使用for-select接收执行结果
orderList := make([]Order, taskNum) for i:=0; i1,超时问题
任务执行过程中,需要控制每个任务的执行时间,不能超过一定范围,我们用定时器来解决这个问题
timeoutTime := time.Second * 3 //超时时间 taskTimer := time.NewTimer(timeoutTime) //初始化定时器 orderList := make([]Order, taskNum) for i:=0; i2, 协程panic问题
主程序是无法捕捉协程内的panic,因此如果不手动处理,就会发生协程内panic导致整个程序中止的情况,我们在defer里处理
for i:=0; i < taskNum; i++ { wg.Add(1) go func() { defer func () { wg.Done() //协程内单独捕捉异常 if r := recover(); r != nil { err := errors.New(fmt.Sprintf("System panic:%v", r)) errCh <- err //此处将panic信息转为err返回,也可以按需求和异常等级进行处理 return } }() ........ }() }3, 顺序问题
返回的列表元素的顺序,需要跟传参的列表顺序保持一致,这时我们需要定义个带序号的结构体
// 需要记录原始顺序的时候,定义个带编号的结构体 type OrderWithSeq struct { Seq int OrderItem Order } //重写相关排序类型 type BySeq []OrderWithSeq func (a BySeq) Len() int { return len(a) } func (a BySeq) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a BySeq) Less(i, j int) bool { return a[i].Seq < a[j].Seq } // 调整返回结果 orderCh := make(chan OrderWithSeq, taskNum) //接收带序号的结构体 //在执行任务时,加入序号 for i:=0; i < taskNum; i++ { i:= i wg.Add(1) go func() { ···· //组装返回结果 res := Order{ Name: "num: " + strconv.Itoa(i), Id: i, } orderCh <-OrderWithSeq { Seq: i, //带上i这个序号 OrderItem: res, } }() //接收信息,也按带序号的结构体进行组装 orderSeqList := make([]OrderWithSeq, taskNum) for i:=0; i总结
标准模板如下:
type Order struct { Name string `json:"name"` Id int `json:"id"` } // 需要记录原始顺序的时候,定义个带编号的结构体 type OrderWithSeq struct { Seq int OrderItem Order } //重写相关排序类型 type BySeq []OrderWithSeq func (a BySeq) Len() int { return len(a) } func (a BySeq) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a BySeq) Less(i, j int) bool { return a[i].Seq < a[j].Seq } taskNum := 10 orderCh := make(chan OrderWithSeq, taskNum) //接收带序号的结构体 errCh := make(chan error, taskNum) //接收返回的异常 wg := sync.WaitGroup{} //在执行任务时,加入序号 for i:=0; i < taskNum; i++ { i:= i wg.Add(1) go func() { defer func () { wg.Done() //协程内单独捕捉异常 if r := recover(); r != nil { err := errors.New(fmt.Sprintf("System panic:%v", r)) errCh <- err //此处将panic信息转为err返回,也可以按需求和异常等级进行处理 return } }() //组装返回结果 res := Order{ Name: "num: " + strconv.Itoa(i), Id: i, } orderCh <-OrderWithSeq { Seq: i, //带上i这个序号 OrderItem: res, } }() wg.Wait() //接收信息,也按带序号的结构体进行组装 orderSeqList := make([]OrderWithSeq, taskNum) timeoutTime := time.Second * 3 taskTimer := time.NewTimer(timeoutTime) for i:=0; i
全部0条评论
快来发表一下你的评论吧 !