Go 语言并发编程实战 Go 语言的并发模型是其最强大的特性之一。通过 Goroutine 和 Channel,你可以轻松构建高性能、高并发的应用程序。本文将全面介绍 Go 语言的并发编程,从基础概念到实战案例。
一、并发编程基础 1.1 什么是并发? 并发是指同一时间段内执行多个任务,而并行是指同时执行多个任务。Go 语言的并发模型既支持并发也支持并行。
1.2 为什么选择 Go 的并发模型? 轻量级 :Goroutine 只需几 KB 内存简单易用 :Goroutine + Channel 的并发模型高性能 :调度器高效利用 CPU原生支持 :语言级别的并发支持1.3 Goroutine 简介 Goroutine 是 Go 语言的轻量级线程,由 Go 调度器管理。
package mainimport ( "fmt" "time" ) func sayHello () { for i := 0 ; i < 5 ; i++ { fmt.Println("Hello" ) time.Sleep(1 * time.Second) } } func main () { go sayHello() for i := 0 ; i < 5 ; i++ { fmt.Println("Main" ) time.Sleep(1 * time.Second) } }
二、Goroutine 核心概念 2.1 启动 Goroutine 基础启动 func main () { go func () { fmt.Println("Goroutine 正在运行" ) }() fmt.Println("主程序继续执行" ) }
使用命名函数 func worker (id int ) { fmt.Printf("Worker %d 开始\n" , id) time.Sleep(2 * time.Second) fmt.Printf("Worker %d 结束\n" , id) } func main () { for i := 1 ; i <= 3 ; i++ { go worker(i) } time.Sleep(5 * time.Second) }
传递参数 func worker (id int , tasks []int ) { fmt.Printf("Worker %d 开始处理任务\n" , id) for _, task := range tasks { fmt.Printf("Worker %d: 处理任务 %d\n" , id, task) time.Sleep(1 * time.Second) } } func main () { tasks := []int {1 , 2 , 3 , 4 , 5 } for i := 1 ; i <= 3 ; i++ { go worker(i, tasks) } time.Sleep(6 * time.Second) }
2.2 主 Goroutine 主 Goroutine 会等待所有子 Goroutine 结束后才退出。
func main () { go func () { fmt.Println("子 Goroutine" ) }() fmt.Println("主程序" ) time.Sleep(1 * time.Second) fmt.Println("主程序结束" ) }
三、Channel 3.1 Channel 基础 Channel 是 Goroutine 之间的通信机制。
创建 Channel package mainimport "fmt" func main () { ch := make (chan int ) ch <- 10 fmt.Println("发送数据:" , <-ch) close (ch) }
Channel 类型 ch := make (chan int ) ch := make (chan string ) ch := make (chan int , 3 ) ch := make (<-chan int ) ch := make (chan <- int )
3.2 发送和接收 基础发送和接收 ch := make (chan string ) ch <- "Hello" fmt.Println("发送数据:" , "Hello" ) msg := <-ch fmt.Println("接收数据:" , msg) msg := <-ch fmt.Println("接收数据:" , msg) select {case msg := <-ch: fmt.Println("接收数据:" , msg) }
使用 select 进行选择 package mainimport ( "fmt" "time" ) func main () { ch1 := make (chan string ) ch2 := make (chan string ) go func () { time.Sleep(1 * time.Second) ch1 <- "来自 ch1 的消息" }() go func () { time.Sleep(2 * time.Second) ch2 <- "来自 ch2 的消息" }() select { case msg := <-ch1: fmt.Println(msg) case msg := <-ch2: fmt.Println(msg) } }
3.3 Channel 缓冲 package mainimport "fmt" func main () { unbuffered := make (chan int ) go func () { fmt.Println("发送到无缓冲 channel" ) unbuffered <- 10 }() fmt.Println("从无缓冲 channel 接收:" , <-unbuffered) buffered := make (chan int , 3 ) go func () { fmt.Println("发送到有缓冲 channel" ) buffered <- 1 buffered <- 2 buffered <- 3 fmt.Println("发送完成" ) }() fmt.Println("从有缓冲 channel 接收:" , <-buffered) fmt.Println("从有缓冲 channel 接收:" , <-buffered) fmt.Println("从有缓冲 channel 接收:" , <-buffered) }
3.4 Range 循环 package mainimport "fmt" func main () { ch := make (chan int , 3 ) go func () { for i := 1 ; i <= 3 ; i++ { ch <- i } close (ch) }() for v := range ch { fmt.Println("接收数据:" , v) } fmt.Println("Channel 已关闭" ) }
四、并发模式 4.1 Worker Pool 模式 package mainimport ( "fmt" "time" ) func worker (id int , jobs <-chan int , results chan <- int ) { for j := range jobs { fmt.Printf("Worker %d 处理任务 %d\n" , id, j) time.Sleep(2 * time.Second) results <- j * 2 } } func main () { const numJobs = 5 const numWorkers = 2 jobs := make (chan int , numJobs) results := make (chan int , numJobs) for w := 1 ; w <= numWorkers; w++ { go worker(w, jobs, results) } for j := 1 ; j <= numJobs; j++ { jobs <- j } close (jobs) for a := 1 ; a <= numJobs; a++ { <-results } fmt.Println("所有任务完成" ) }
4.2 Fan-Out Fan-In 模式 package mainimport ( "fmt" "time" ) func source (done <-chan struct {}, values ...int ) <-chan int { out := make (chan int ) go func () { defer close (out) for _, v := range values { select { case <-done: return case out <- v: } } }() return out } func square (in <-chan int ) <-chan int { out := make (chan int ) go func () { defer close (out) for v := range in { out <- v * v } }() return out } func main () { done := make (chan struct {}) numbers := source(done, 1 , 2 , 3 , 4 , 5 ) squares := square(numbers) for s := range squares { fmt.Println(s) } time.Sleep(1 * time.Second) close (done) }
4.3 Pipeline 模式 package mainimport ( "fmt" "time" ) func generate (nums ...int ) <-chan int { out := make (chan int ) go func () { defer close (out) for _, n := range nums { out <- n } }() return out } func square (in <-chan int ) <-chan int { out := make (chan int ) go func () { defer close (out) for n := range in { out <- n * n } }() return out } func merge (done <-chan struct {}, channels ...<-chan int ) <-chan int { var wg sync.WaitGroup out := make (chan int ) send := func (c <-chan int ) { defer wg.Done() for n := range c { select { case <-done: return case out <- n: } } } for _, c := range channels { wg.Add(1 ) go send(c) } go func () { wg.Wait() close (out) }() return out } func main () { done := make (chan struct {}) for result := range merge(done, square(generate(1 , 2 , 3 , 4 , 5 ))) { fmt.Println(result) } time.Sleep(1 * time.Second) close (done) }
五、Context 5.1 Context 基础 Context 用于控制 Goroutine 的生命周期。
package mainimport ( "context" "fmt" "time" ) func worker (ctx context.Context, id int ) { for { select { case <-ctx.Done(): fmt.Printf("Worker %d 收到取消信号\n" , id) return default : fmt.Printf("Worker %d 工作中...\n" , id) time.Sleep(1 * time.Second) } } } func main () { ctx, cancel := context.WithTimeout(context.Background(), 5 *time.Second) defer cancel() for i := 1 ; i <= 3 ; i++ { go worker(ctx, i) } time.Sleep(10 * time.Second) }
5.2 Context 取消 package mainimport ( "context" "fmt" "time" ) func worker (ctx context.Context, id int ) { for { select { case <-ctx.Done(): fmt.Printf("Worker %d 收到取消信号\n" , id) return default : fmt.Printf("Worker %d 工作中...\n" , id) time.Sleep(1 * time.Second) } } } func main () { ctx, cancel := context.WithCancel(context.Background()) for i := 1 ; i <= 3 ; i++ { go worker(ctx, i) } time.Sleep(3 * time.Second) fmt.Println("取消所有 workers" ) cancel() }
5.3 Context 传递值 package mainimport ( "context" "fmt" "time" ) func worker (ctx context.Context, id int ) { for { select { case <-ctx.Done(): fmt.Printf("Worker %d 收到取消信号\n" , id) return default : username := ctx.Value("username" ).(string ) fmt.Printf("Worker %d (用户: %s) 工作中...\n" , id, username) time.Sleep(1 * time.Second) } } } func main () { ctx := context.WithValue(context.Background(), "username" , "张三" ) for i := 1 ; i <= 3 ; i++ { go worker(ctx, i) } time.Sleep(5 * time.Second) }
六、错误处理 6.1 错误传递 package mainimport ( "errors" "fmt" ) func divide (a, b float64 ) (float64 , error ) { if b == 0 { return 0 , errors.New("除数不能为零" ) } return a / b, nil } func main () { result, err := divide(10 , 2 ) if err != nil { fmt.Println("错误:" , err) return } fmt.Println("结果:" , result) }
6.2 错误在 Channel 中传递 package mainimport ( "errors" "fmt" ) func worker (done chan <- error ) { defer func () { close (done) }() done <- errors.New("工作中遇到错误" ) } func main () { done := make (chan error ) go worker(done) if err := <-done; err != nil { fmt.Println("收到错误:" , err) } }
七、性能优化 7.1 Sync 包 package mainimport ( "fmt" "sync" ) func main () { var wg sync.WaitGroup var count int var mutex sync.Mutex for i := 0 ; i < 100 ; i++ { wg.Add(1 ) go func () { defer wg.Done() mutex.Lock() count++ mutex.Unlock() }() } wg.Wait() fmt.Println("最终计数:" , count) }
7.2 Sync.Pool package mainimport ( "fmt" "sync" ) var pool = sync.Pool{ New: func () interface {} { fmt.Println("创建新对象" ) return make ([]int , 10 ) }, } func main () { v1 := pool.Get().([]int ) fmt.Printf("对象: %v\n" , v1) pool.Put(v1) v2 := pool.Get().([]int ) fmt.Printf("对象: %v\n" , v2) }
八、实战案例 8.1 HTTP 并发请求 package mainimport ( "fmt" "io/ioutil" "net/http" "sync" "time" ) func fetch (url string ) string { resp, err := http.Get(url) if err != nil { return fmt.Sprintf("错误: %v" , err) } defer resp.Body.Close() body, _ := ioutil.ReadAll(resp.Body) return fmt.Sprintf("%s: %d bytes" , url, len (body)) } func main () { urls := []string { "https://example.com" , "https://google.com" , "https://github.com" , } var wg sync.WaitGroup for _, url := range urls { wg.Add(1 ) go func (u string ) { defer wg.Done() fmt.Println(fetch(u)) }(url) } wg.Wait() fmt.Println("所有请求完成" ) }
8.2 简单的爬虫 package mainimport ( "fmt" "io/ioutil" "net/http" "regexp" "sync" ) func fetch (url string ) (string , error ) { resp, err := http.Get(url) if err != nil { return "" , err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { return "" , err } return string (body), nil } func main () { urls := []string { "https://example.com" , "https://github.com" , } var wg sync.WaitGroup var mu sync.Mutex seen := make (map [string ]bool ) for _, url := range urls { wg.Add(1 ) go func (u string ) { defer wg.Done() body, err := fetch(u) if err != nil { fmt.Println("错误:" , err) return } re := regexp.MustCompile(`href="([^"]+)"` ) links := re.FindAllStringSubmatch(body, -1 ) mu.Lock() for _, link := range links { if len (link) > 1 && !seen[link[1 ]] { seen[link[1 ]] = true wg.Add(1 ) go func (l string ) { defer wg.Done() fmt.Println("发现链接:" , l) }(link[1 ]) } } mu.Unlock() }(url) } wg.Wait() }
九、最佳实践 9.1 并发安全 var ( counter int mutex sync.Mutex ) func increment () { mutex.Lock() defer mutex.Unlock() counter++ }
9.2 Context 使用 func process (ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() default : } return nil }
9.3 错误处理 十、总结 Go 语言的并发编程模型:
Goroutine :轻量级线程Channel :通信机制Context :控制生命周期sync 包 :同步原语优势:
适用场景:
掌握 Go 的并发编程,可以构建高性能的应用程序!