解析Go语言的高级特性

嵌入式技术

1368人已加入

描述

作者:Goland猫 

对于大型的互联网应用程序,如电商平台、社交网络、金融交易平台等,每秒钟都会收到大量的请求。在这些应用程序中,需要使用高效的技术来应对高并发的请求,尤其是在短时间内处理大量的请求,如1分钟百万请求。

同时,为了降低用户的使用门槛和提升用户体验,前端需要实现参数的无感知传递。这样用户在使用时,无需担心参数传递的问题,能够轻松地享受应用程序的服务。

在处理1分钟百万请求时,需要使用高效的技术和算法,以提高请求的响应速度和处理能力。Go语言以其高效性和并发性而闻名,因此成为处理高并发请求的优秀选择。Go中有多种模式可供选择,如基于goroutine和channel的并发模型、使用池技术的协程模型等,以便根据具体应用的需要来选择适合的技术模式。

本文代码参考搬至

W1

W1 结构体类型,它有五个成员:

WgSend 用于等待任务发送的 goroutine 完成。

Wg 用于等待任务处理的 goroutine 完成。

MaxNum 表示 goroutine 池的大小。

Ch 是一个字符串类型的通道,用于传递任务。

DispatchStop 是一个空结构体类型的通道,用于停止任务分发。

 

type W1 struct {
 WgSend       *sync.WaitGroup
 Wg           *sync.WaitGroup
 MaxNum       int
 Ch           chan string
 DispatchStop chan struct{}
}

 

接下来是 Dispatch 方法,它将任务发送到通道 Ch 中。它通过 for 循环来发送 10 倍于 MaxNum 的任务,每个任务都是一个 goroutine。defer 语句用于在任务完成时减少 WgSend 的计数。select 语句用于在任务分发被中止时退出任务发送。

Dispatch

 

func (w *W1) Dispatch(job string) {
 w.WgSend.Add(10 * w.MaxNum)
 for i := 0; i < 10*w.MaxNum; i++ {
  go func(i int) {
   defer w.WgSend.Done()

   select {
   case w.Ch <- fmt.Sprintf("%d", i):
    return
   case <-w.DispatchStop:
    fmt.Println("退出发送 job: ", fmt.Sprintf("%d", i))
    return
   }

  }(i)
 }
}

 

StartPool

然后是 StartPool 方法,它创建了一个 goroutine 池来处理从通道 Ch 中读取到的任务。

如果通道 Ch 还没有被创建,那么它将被创建。如果计数器 WgSend 还没有被创建,那么它也将被创建。如果计数器 Wg 还没有被创建,那么它也将被创建。

如果通道 DispatchStop 还没有被创建,那么它也将被创建。

for 循环用于创建 MaxNum 个 goroutine 来处理从通道中读取到的任务。defer 语句用于在任务完成时减少 Wg 的计数。

 

func (w *W1) StartPool() {
 if w.Ch == nil {
  w.Ch = make(chan string, w.MaxNum)
 }

 if w.WgSend == nil {
  w.WgSend = &sync.WaitGroup{}
 }

 if w.Wg == nil {
  w.Wg = &sync.WaitGroup{}
 }

 if w.DispatchStop == nil {
  w.DispatchStop = make(chan struct{})
 }
 w.Wg.Add(w.MaxNum)
 for i := 0; i < w.MaxNum; i++ {
  go func() {
   defer w.Wg.Done()
   for v := range w.Ch {
    fmt.Printf("完成工作: %s 
", v)
   }
  }()
 }
}

 

Stop

最后是 Stop 方法,它停止任务分发并等待所有任务完成。

它关闭了通道 DispatchStop,等待 WgSend 中的任务发送 goroutine 完成,然后关闭通道 Ch,等待 Wg 中的任务处理 goroutine 完成。

 

func (w *W1) Stop() {
 close(w.DispatchStop)
 w.WgSend.Wait()
 close(w.Ch)
 w.Wg.Wait()
}
代码

 

W2

SubWorker

 

type SubWorker struct {
 JobChan chan string
}

 

子协程,它有一个 JobChan,用于接收任务。

Run:SubWorker 的方法,用于启动一个子协程,从 JobChan 中读取任务并执行。

 

func (sw *SubWorker) Run(wg *sync.WaitGroup, poolCh chan chan string, quitCh chan struct{}) {
 if sw.JobChan == nil {
  sw.JobChan = make(chan string)
 }
 wg.Add(1)

 go func() {
  defer wg.Done()

  for {
   poolCh <- sw.JobChan

   select {
   case res := <-sw.JobChan:
    fmt.Printf("完成工作: %s 
", res)

   case <-quitCh:
    fmt.Printf("消费者结束...... 
")
    return

   }
  }
 }()
}

 

W2

 

type W2 struct {
 SubWorkers []SubWorker
 Wg         *sync.WaitGroup
 MaxNum     int
 ChPool     chan chan string
 QuitChan   chan struct{}
}

 

Dispatch

Dispatch:W2 的方法,用于从 ChPool 中获取 TaskChan,将任务发送给一个 SubWorker 执行。

 

func (w *W2) Dispatch(job string) {
 jobChan := <-w.ChPool

 select {
 case jobChan <- job:
  fmt.Printf("发送任务 : %s 完成 
", job)
  return

 case <-w.QuitChan:
  fmt.Printf("发送者(%s)结束 
", job)
  return

 }
}

 

StartPool

StartPool:W2 的方法,用于初始化协程池,启动所有子协程并把 TaskChan 存储在 ChPool 中。

 

func (w *W2) StartPool() {
 if w.ChPool == nil {
  w.ChPool = make(chan chan string, w.MaxNum)
 }

 if w.SubWorkers == nil {
  w.SubWorkers = make([]SubWorker, w.MaxNum)
 }

 if w.Wg == nil {
  w.Wg = &sync.WaitGroup{}
 }

 for i := 0; i < len(w.SubWorkers); i++ {
  w.SubWorkers[i].Run(w.Wg, w.ChPool, w.QuitChan)
 }
}

 

Stop

Stop:W2 的方法,用于停止协程的工作,并等待所有协程结束。

 

func (w *W2) Stop() {
 close(w.QuitChan)
 w.Wg.Wait()

 close(w.ChPool)
}

 

DealW2 函数则是整个协程池的入口,它通过 NewWorker 方法创建一个 W2 实例,然后调用 StartPool 启动协程池,并通过 Dispatch 发送任务,最后调用 Stop 停止协程池。

 

func DealW2(max int) {
 w := NewWorker(w2, max)
 w.StartPool()
 for i := 0; i < 10*max; i++ {
  go w.Dispatch(fmt.Sprintf("%d", i))
 }

 w.Stop()
}
代码

 

个人见解

看到这里对于w2我已经有点迷糊了,还能传递w.Wg, w.ChPool, w.QuitChan?

 

原来是golang里如果方法传递的不是地址,那么就会做一个拷贝,所以这里调用的wg根本就不是一个对象。

传递的地方传递地址就可以了,如果不传递地址,将会出现死锁

go doSomething(i, &wg, ch)

func doSomething(index int, wg *sync.WaitGroup, ch chan int) {

 

w1也有一个比较大的问题。在处理请求时,每个 Goroutine 都会占用一定的系统资源,如果请求量过大,会造成 Goroutine 数量的剧增,消耗过多系统资源,程序可能会崩溃

探究原文

在这段代码中,poolCh代表工作者池,sw.JobChan代表工作者的工作通道。当一个工作者完成了工作后,它会将工作结果发送到sw.JobChan,此时可以通过case res := <-sw.JobChan:来接收该工作的结果。

在这个代码块中,还需要处理一个退出信号quitCh。因此,第二个case <-quitCh:用于检测是否接收到了退出信号。如果接收到了退出信号,程序将打印出消息并结束。

需要注意的是,这两个case语句是互斥的,只有当工作者完成工作或收到退出信号时,才会进入其中一个语句。因此,这个循环可以保证在工作者完成工作或收到退出信号时退出。

需要读取两次sw.JobChan的原因是:第一次读取用于将工作者的工作通道放回工作者池中,这样其他工作者就可以使用该通道。第二次读取用于接收工作者的工作结果或退出信号。因此,这两次读取是为了确保能够在正确的时刻将工作者的工作通道放回工作者池中并正确地处理工作结果或退出信号。

根据w2的特点 我自己写了一个w2

 

import (
   "fmt"
   "sync"
)

type SubWorkerNew struct {
   JobChan chan string
}

type W2New struct {
   SubWorkers []SubWorkerNew
   Wg         *sync.WaitGroup
   MaxNum     int
   ChPool     chan chan string
   QuitChan   chan struct{}
}

func NewW2(maxNum int) *W2New {
   subWorkers := make([]SubWorkerNew, maxNum)
   for i := 0; i < maxNum; i++ {
      subWorkers[i] = SubWorkerNew{JobChan: make(chan string)}
   }

   pool := make(chan chan string, maxNum)
   for i := 0; i < maxNum; i++ {
      pool <- subWorkers[i].JobChan
   }

   return &W2New{
      SubWorkers: subWorkers,
      Wg:         &sync.WaitGroup{},
      MaxNum:     maxNum,
      ChPool:     pool,
      QuitChan:   make(chan struct{}),
   }
}

func (w *W2New) Dispatch(job string) {
   select {
   case jobChannel := <-w.ChPool:
      jobChannel <- job
   default:
      fmt.Println("All workers busy")
   }
}

func (w *W2New) StartPool() {
   for i := 0; i < w.MaxNum; i++ {
      go func(subWorker *SubWorkerNew) {
         w.Wg.Add(1)
         defer w.Wg.Done()

         for {
            select {
            case job := <-subWorker.JobChan:
               fmt.Println("processing ", job)
            case <-w.QuitChan:
               return
            }
         }
      }(&w.SubWorkers[i])
   }
}

func (w *W2New) Stop() {
   close(w.QuitChan)
   w.Wg.Wait()
   close(w.ChPool)

   for _, subWorker := range w.SubWorkers {
      close(subWorker.JobChan)
   }
}

func main() {
   w := NewW2(5)
   w.StartPool()

   for i := 0; i < 20; i++ {
      w.Dispatch(fmt.Sprintf("job %d", i))
   }

   w.Stop()
}

 

但是有几个点需要注意

1.没有考虑JobChan通道的缓冲区大小,如果有大量任务被并发分配,容易导致内存占用过高;

2.每个线程都会执行无限循环,此时线程退出的条件是接收到QuitChan通道的信号,可能导致线程的阻塞等问题;

3.Dispatch函数的默认情况下只会输出"All workers busy",而不是阻塞,这意味着当所有线程都处于忙碌状态时,任务会丢失

4.线程池启动后无法动态扩展或缩小。

优化

这个优化版本改了很多次。有一些需要注意的点是,不然会一直死锁

 

1.使用sync.WaitGroup来确保线程池中所有线程都能够启动并运行;

2.在Stop函数中,先向SubWorker的JobChan中发送一个关闭信号,再等待所有SubWorker线程退出;

3.在Dispatch函数中,将默认情况下的输出改为阻塞等待可用通道;

 

w2new

 

package handle_million_requests

import (
 "fmt"
 "sync"
 "time"
)

type SubWorkerNew struct {
 Id      int
 JobChan chan string
}

type W2New struct {
 SubWorkers []SubWorkerNew
 MaxNum     int
 ChPool     chan chan string
 QuitChan   chan struct{}
 Wg         *sync.WaitGroup
}

func NewW2(maxNum int) *W2New {
 chPool := make(chan chan string, maxNum)
 subWorkers := make([]SubWorkerNew, maxNum)
 for i := 0; i < maxNum; i++ {
  subWorkers[i] = SubWorkerNew{Id: i, JobChan: make(chan string)}
  chPool <- subWorkers[i].JobChan
 }
 wg := new(sync.WaitGroup)
 wg.Add(maxNum)

 return &W2New{
  MaxNum:     maxNum,
  SubWorkers: subWorkers,
  ChPool:     chPool,
  QuitChan:   make(chan struct{}),
  Wg:         wg,
 }
}

func (w *W2New) StartPool() {
 for i := 0; i < w.MaxNum; i++ {
  go func(wg *sync.WaitGroup, subWorker *SubWorkerNew) {
   defer wg.Done()
   for {
    select {
    case job := <-subWorker.JobChan:
     fmt.Printf("SubWorker %d processing job %s
", subWorker.Id, job)
     time.Sleep(time.Second) // 模拟任务处理过程
    case <-w.QuitChan:
     return
    }
   }
  }(w.Wg, &w.SubWorkers[i])
 }
}

func (w *W2New) Stop() {
 close(w.QuitChan)
 for i := 0; i < w.MaxNum; i++ {
  close(w.SubWorkers[i].JobChan)
 }
 w.Wg.Wait()
}

func (w *W2New) Dispatch(job string) {
 select {
 case jobChan := <-w.ChPool:
  jobChan <- job
 default:
  fmt.Println("All workers busy")
 }
}
func (w *W2New) AddWorker() {
 newWorker := SubWorkerNew{Id: w.MaxNum, JobChan: make(chan string)}
 w.SubWorkers = append(w.SubWorkers, newWorker)
 w.ChPool <- newWorker.JobChan
 w.MaxNum++
 w.Wg.Add(1)
 go func(subWorker *SubWorkerNew) {
  defer w.Wg.Done()

  for {
   select {
   case job := <-subWorker.JobChan:
    fmt.Printf("SubWorker %d processing job %s
", subWorker.Id, job)
    time.Sleep(time.Second) // 模拟任务处理过程
   case <-w.QuitChan:
    return
   }
  }
 }(&newWorker)
}

func (w *W2New) RemoveWorker() {
 if w.MaxNum > 1 {
  worker := w.SubWorkers[w.MaxNum-1]
  close(worker.JobChan)
  w.MaxNum--
  w.SubWorkers = w.SubWorkers[:w.MaxNum]
 }
}

 

AddWorker和RemoveWorker,用于动态扩展/缩小线程池。

在AddWorker函数中,我们首先将MaxNum增加了1,然后创建一个新的SubWorkerNew结构体,将其添加到SubWorkers中,并将其JobChan通道添加到ChPool通道中。最后,我们创建一个新的协程来处理新添加的SubWorkerNew并让它进入无限循环,等待接收任务。

在RemoveWorker函数中,我们首先将MaxNum减少1,然后获取最后一个SubWorkerNew结构体,将它的JobChan通道发送到ChPool通道中,并从其通道中读取任何待处理的任务,最后创建一个新的协程来处理SubWorkerNew,继续处理任务。

测试用例

 

func TestW2New(t *testing.T) {  
    pool := NewW2(3)  
    pool.StartPool()  

    pool.Dispatch("task 1")  
    pool.Dispatch("task 2")  
    pool.Dispatch("task 3")  

    pool.AddWorker()  
    pool.AddWorker()  
    pool.RemoveWorker()  

    pool.Stop()  
}
代码

 

当Dispatch函数向ChPool通道获取可用通道时,会从通道中取出一个SubWorker的JobChan通道,并将任务发送到该通道中。而对于SubWorker来说,并没有进行任务的使用次数限制,所以它可以处理多个任务。

在这个例子中,当任务数量比SubWorker数量多时,一个SubWorker的JobChan通道会接收到多个任务,它们会在SubWorker的循环中按顺序依次处理,直到JobChan中没有未处理的任务为止。因此,如果任务数量特别大,可能会导致某些SubWorker的JobChan通道暂时处于未处理任务状态,而其他的SubWorker在执行任务。

在测试结果中,最后三行中出现了多个"SubWorker 0 processing job",说明SubWorker 0的JobChan通道接收了多个任务,并且在其循环中处理这些任务。下面的代码片段显示了这个过程:

// SubWorker 0 的循环部分

 

for {

    select {

    case job := <-subWorker.JobChan:

        fmt.Printf("SubWorker %d processing job %s
", subWorker.Id, job)

    case <-w.QuitChan:

        return

    }

}
代码


  审核编辑:汤梓红
 
打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分