𝑻𝒆𝒏𝑪𝒍𝒂𝒘正在头脑风暴···
𝑻𝒆𝒏𝑲𝒊𝑺𝒆𝒀𝒂の𝑨𝒈𝒆𝒏𝒕助手
𝑻𝒆𝒏-𝒇𝒍𝒂𝒔𝒉

Go 语言并发编程实战

Go 语言的并发模型是其最强大的特性之一。通过 Goroutine 和 Channel,你可以轻松构建高性能、高并发的应用程序。本文将全面介绍 Go 语言的并发编程,从基础概念到实战案例。

一、并发编程基础

1.1 什么是并发?

并发是指同一时间段内执行多个任务,而并行是指同时执行多个任务。Go 语言的并发模型既支持并发也支持并行。

1.2 为什么选择 Go 的并发模型?

  1. 轻量级:Goroutine 只需几 KB 内存
  2. 简单易用:Goroutine + Channel 的并发模型
  3. 高性能:调度器高效利用 CPU
  4. 原生支持:语言级别的并发支持

1.3 Goroutine 简介

Goroutine 是 Go 语言的轻量级线程,由 Go 调度器管理。

package main

import (
"fmt"
"time"
)

func sayHello() {
for i := 0; i < 5; i++ {
fmt.Println("Hello")
time.Sleep(1 * time.Second)
}
}

func main() {
// 启动一个 Goroutine
go sayHello()

// 主程序继续执行
for i := 0; i < 5; i++ {
fmt.Println("Main")
time.Sleep(1 * time.Second)
}
}

二、Goroutine 核心概念

2.1 启动 Goroutine

基础启动

func main() {
// 启动 Goroutine
go func() {
fmt.Println("Goroutine 正在运行")
}()

// 主程序
fmt.Println("主程序继续执行")
}

使用命名函数

func worker(id int) {
fmt.Printf("Worker %d 开始\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("Worker %d 结束\n", id)
}

func main() {
// 启动多个 Goroutine
for i := 1; i <= 3; i++ {
go worker(i)
}

time.Sleep(5 * time.Second)
}

传递参数

func worker(id int, tasks []int) {
fmt.Printf("Worker %d 开始处理任务\n", id)
for _, task := range tasks {
fmt.Printf("Worker %d: 处理任务 %d\n", id, task)
time.Sleep(1 * time.Second)
}
}

func main() {
tasks := []int{1, 2, 3, 4, 5}

for i := 1; i <= 3; i++ {
go worker(i, tasks)
}

time.Sleep(6 * time.Second)
}

2.2 主 Goroutine

主 Goroutine 会等待所有子 Goroutine 结束后才退出。

func main() {
go func() {
fmt.Println("子 Goroutine")
}()

fmt.Println("主程序")
time.Sleep(1 * time.Second)
fmt.Println("主程序结束")
}

// 输出顺序不确定,但主程序会等待子 Goroutine 结束

三、Channel

3.1 Channel 基础

Channel 是 Goroutine 之间的通信机制。

创建 Channel

package main

import "fmt"

func main() {
// 创建 channel
ch := make(chan int)

// 向 channel 发送数据
ch <- 10
fmt.Println("发送数据:", <-ch)

// 关闭 channel
close(ch)
}

Channel 类型

// 整数 channel
ch := make(chan int)

// 字符串 channel
ch := make(chan string)

// 带缓冲的 channel
ch := make(chan int, 3)

// 只读 channel
ch := make(<-chan int)

// 只写 channel
ch := make(chan<- int)

3.2 发送和接收

基础发送和接收

ch := make(chan string)

// 发送
ch <- "Hello"
fmt.Println("发送数据:", "Hello")

// 接收
msg := <-ch
fmt.Println("接收数据:", msg)

// 等待和接收
msg := <-ch
fmt.Println("接收数据:", msg)

// 带标签的接收
select {
case msg := <-ch:
fmt.Println("接收数据:", msg)
}

使用 select 进行选择

package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan string)
ch2 := make(chan string)

go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自 ch1 的消息"
}()

go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自 ch2 的消息"
}()

// 等待任意一个 channel
select {
case msg := <-ch1:
fmt.Println(msg)
case msg := <-ch2:
fmt.Println(msg)
}
}

3.3 Channel 缓冲

package main

import "fmt"

func main() {
// 无缓冲 channel
unbuffered := make(chan int)
go func() {
fmt.Println("发送到无缓冲 channel")
unbuffered <- 10
}()
fmt.Println("从无缓冲 channel 接收:", <-unbuffered)

// 有缓冲 channel
buffered := make(chan int, 3)
go func() {
fmt.Println("发送到有缓冲 channel")
buffered <- 1
buffered <- 2
buffered <- 3
fmt.Println("发送完成")
}()

fmt.Println("从有缓冲 channel 接收:", <-buffered)
fmt.Println("从有缓冲 channel 接收:", <-buffered)
fmt.Println("从有缓冲 channel 接收:", <-buffered)
}

3.4 Range 循环

package main

import "fmt"

func main() {
ch := make(chan int, 3)

go func() {
for i := 1; i <= 3; i++ {
ch <- i
}
close(ch)
}()

// 遍历 channel
for v := range ch {
fmt.Println("接收数据:", v)
}
fmt.Println("Channel 已关闭")
}

四、并发模式

4.1 Worker Pool 模式

package main

import (
"fmt"
"time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("Worker %d 处理任务 %d\n", id, j)
time.Sleep(2 * time.Second)
results <- j * 2
}
}

func main() {
const numJobs = 5
const numWorkers = 2

jobs := make(chan int, numJobs)
results := make(chan int, numJobs)

// 启动 workers
for w := 1; w <= numWorkers; w++ {
go worker(w, jobs, results)
}

// 发送任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)

// 接收结果
for a := 1; a <= numJobs; a++ {
<-results
}

fmt.Println("所有任务完成")
}

4.2 Fan-Out Fan-In 模式

package main

import (
"fmt"
"time"
)

func source(done <-chan struct{}, values ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, v := range values {
select {
case <-done:
return
case out <- v:
}
}
}()
return out
}

func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range in {
out <- v * v
}
}()
return out
}

func main() {
done := make(chan struct{})

// 数据源
numbers := source(done, 1, 2, 3, 4, 5)

// 并行处理
squares := square(numbers)

// 收集结果
for s := range squares {
fmt.Println(s)
}

time.Sleep(1 * time.Second)
close(done)
}

4.3 Pipeline 模式

package main

import (
"fmt"
"time"
)

// Stage 1: 生成数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}

// Stage 2: 过滤数据
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}

// Stage 3: 合并数据
func merge(done <-chan struct{}, channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

send := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case <-done:
return
case out <- n:
}
}
}

for _, c := range channels {
wg.Add(1)
go send(c)
}

go func() {
wg.Wait()
close(out)
}()

return out
}

func main() {
done := make(chan struct{})

// Pipeline
for result := range merge(done, square(generate(1, 2, 3, 4, 5))) {
fmt.Println(result)
}

time.Sleep(1 * time.Second)
close(done)
}

五、Context

5.1 Context 基础

Context 用于控制 Goroutine 的生命周期。

package main

import (
"context"
"fmt"
"time"
)

func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d 收到取消信号\n", id)
return
default:
fmt.Printf("Worker %d 工作中...\n", id)
time.Sleep(1 * time.Second)
}
}
}

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// 启动多个 workers
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}

time.Sleep(10 * time.Second)
}

5.2 Context 取消

package main

import (
"context"
"fmt"
"time"
)

func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d 收到取消信号\n", id)
return
default:
fmt.Printf("Worker %d 工作中...\n", id)
time.Sleep(1 * time.Second)
}
}
}

func main() {
ctx, cancel := context.WithCancel(context.Background())

for i := 1; i <= 3; i++ {
go worker(ctx, i)
}

// 等待一段时间后取消
time.Sleep(3 * time.Second)
fmt.Println("取消所有 workers")
cancel()
}

5.3 Context 传递值

package main

import (
"context"
"fmt"
"time"
)

func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d 收到取消信号\n", id)
return
default:
// 获取 context 中的值
username := ctx.Value("username").(string)
fmt.Printf("Worker %d (用户: %s) 工作中...\n", id, username)
time.Sleep(1 * time.Second)
}
}
}

func main() {
// 创建带值的 context
ctx := context.WithValue(context.Background(), "username", "张三")

for i := 1; i <= 3; i++ {
go worker(ctx, i)
}

time.Sleep(5 * time.Second)
}

六、错误处理

6.1 错误传递

package main

import (
"errors"
"fmt"
)

func divide(a, b float64) (float64, error) {
if b == 0 {
return 0, errors.New("除数不能为零")
}
return a / b, nil
}

func main() {
result, err := divide(10, 2)
if err != nil {
fmt.Println("错误:", err)
return
}
fmt.Println("结果:", result)
}

6.2 错误在 Channel 中传递

package main

import (
"errors"
"fmt"
)

func worker(done chan<- error) {
defer func() {
close(done)
}()

// 模拟错误
done <- errors.New("工作中遇到错误")
}

func main() {
done := make(chan error)

go worker(done)

if err := <-done; err != nil {
fmt.Println("收到错误:", err)
}
}

七、性能优化

7.1 Sync 包

package main

import (
"fmt"
"sync"
)

func main() {
var wg sync.WaitGroup
var count int
var mutex sync.Mutex

// 多个 Goroutine 增加 count
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()

mutex.Lock()
count++
mutex.Unlock()
}()
}

wg.Wait()
fmt.Println("最终计数:", count)
}

7.2 Sync.Pool

package main

import (
"fmt"
"sync"
)

var pool = sync.Pool{
New: func() interface{} {
fmt.Println("创建新对象")
return make([]int, 10)
},
}

func main() {
// 获取对象
v1 := pool.Get().([]int)
fmt.Printf("对象: %v\n", v1)

// 释放对象
pool.Put(v1)

// 再次获取
v2 := pool.Get().([]int)
fmt.Printf("对象: %v\n", v2)
}

八、实战案例

8.1 HTTP 并发请求

package main

import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
)

func fetch(url string) string {
resp, err := http.Get(url)
if err != nil {
return fmt.Sprintf("错误: %v", err)
}
defer resp.Body.Close()

body, _ := ioutil.ReadAll(resp.Body)
return fmt.Sprintf("%s: %d bytes", url, len(body))
}

func main() {
urls := []string{
"https://example.com",
"https://google.com",
"https://github.com",
}

var wg sync.WaitGroup

// 并发请求
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
fmt.Println(fetch(u))
}(url)
}

wg.Wait()
fmt.Println("所有请求完成")
}

8.2 简单的爬虫

package main

import (
"fmt"
"io/ioutil"
"net/http"
"regexp"
"sync"
)

func fetch(url string) (string, error) {
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}

return string(body), nil
}

func main() {
urls := []string{
"https://example.com",
"https://github.com",
}

var wg sync.WaitGroup
var mu sync.Mutex
seen := make(map[string]bool)

for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()

body, err := fetch(u)
if err != nil {
fmt.Println("错误:", err)
return
}

// 解析链接
re := regexp.MustCompile(`href="([^"]+)"`)
links := re.FindAllStringSubmatch(body, -1)

mu.Lock()
for _, link := range links {
if len(link) > 1 && !seen[link[1]] {
seen[link[1]] = true
wg.Add(1)
go func(l string) {
defer wg.Done()
fmt.Println("发现链接:", l)
}(link[1])
}
}
mu.Unlock()
}(url)
}

wg.Wait()
}

九、最佳实践

9.1 并发安全

// 使用 mutex 保护共享数据
var (
counter int
mutex sync.Mutex
)

func increment() {
mutex.Lock()
defer mutex.Unlock()
counter++
}

9.2 Context 使用

// 在函数中传递 context
func process(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// 处理逻辑
}
return nil
}

9.3 错误处理

// 使用逗号-ok 模式
if _, ok := <-ch; ok {
// channel 未关闭
}

十、总结

Go 语言的并发编程模型:

  1. Goroutine:轻量级线程
  2. Channel:通信机制
  3. Context:控制生命周期
  4. sync 包:同步原语

优势:

  • 简单易用
  • 高性能
  • 并发友好

适用场景:

  • Web 服务
  • 微服务
  • 实时系统
  • 并行处理

掌握 Go 的并发编程,可以构建高性能的应用程序!