并发

我们知道,程序是从上到下逐行执行的。只有执行完当前行,才会执行下一行。

如果某一行的操作是一个长耗时的操作,尤其是涉及到网络操作时,就会导致程序长时间的等待,阻塞后续程序的执行。

这个时候如果有一个别的“人”来帮我们执行这个耗时的操作,并在合适的时机将结果告知给我们,从而使我们从这个麻烦中“腾出来”,有精力继续执行后续的操作该多好呀。

这种形式,我们称之为“并发”或“异步”,即“非阻塞”模型。

为了最大化优化程序的执行性能,Go 在语言层面提供了并发编程的支持。

并发,是 Go 语言的核心特性。

goroutine(Go 程) 是 Go 并行设计的核心。说到底其实就是一种协程。比线程更加轻量级。Go 程的设计隐藏了线程创建和管理的诸多复杂性,使我们可以以一种很简洁的方式来进行并发编程。

goroutine 的使用方式也很简单,通过一个 go 关键字和普通的函数即可。

在函数或方法前添加 go 关键字就可以在新的 goroutine 中调用它。当调用完成后, 该 goroutine 也会安静地退出。

// 通过在普通函数前加 go 关键字开启一个新的 goroutine
go loadDataFromRemote()

// 执行匿名函数
go func () {
  // do something...
}()

多个 goroutine 其实在运行在同一个进程之中,多个 goroutine 之间会共享内存数据。因此,在 goroutine 中直接更改数据会影响到其他的 goroutine。这会导致我们的程序数据混乱,出现难以预测的问题。

package main

import (
    "fmt"
    "time"
)

var a string = "Hello"

func update() {
    a = "World"
}

func main() {
    fmt.Println(a) // a 的值为 Hello

    go update()  // 开启一个新 goroutine,并尝试更改 a 的值。

  // 表示当前 goroutine 沉睡 1000 毫秒。Sleep 函数默认接受的单位为纳秒。
    time.Sleep(1000 * time.Millisecond)

    fmt.Println(a) // World
}

因此,Go 语言提供了一种在多个 goroutine 之间通信的机制 —— Channel(通道)。在程序设计上我们要遵循:不要通过共享来通信,而要通过通信来共享。

通道

Channel 是一种用来在多个 goroutine 之间进行通信和数据传递的机制。我们可以通过它来发送值或者接收值。

Channel 是一种特定的类型,需要通过 make 函数创建。并且在定义一个 Channel 时,也必须定义发送到该 Channel 的值的类型。

var c chan int = make(chan int)

ci := make(chan int)
cs := make(chan string)
cf := make(chan struct{})

Channel 在定义的时候支持通过 <- 指定数据发送的方向。

c1 := make(chan int)    // 可以接收和发送类型为 int 的数据
c2 := make(chan<- int)    // 可以发送类型为 int 的数据
c3 := make(<-chan int)    // 可以接收类型为 int 的数据

Channel 创建完成之后,我们就可以通过 <- 操作符来使用这个通道进行接收和发送数据。

c <- 1                // 将值 1 发送到通道 c 中
value := <-c    // 从通道 c 中取出一个值并赋值给 value

例如,我们来看一个简化版的例子:

c := make(chan string)  // 分配一个信道

// 在 goroutine 中启动排序。当它完成后,在信道上发送信号。
go func() {
    result := loadDataFromRemote()

    c <- result  // 发送信号传递数据
}()

doSomethingForAWhile()

data := <-c   // 从 channel 中接收数据

Channel 中发送数据的 goroutine 称为发送方,通过 Channel 接收数据的 goroutine 称为接收方。

package main

import "fmt"

func sum(a []int, c chan int) {
    total := 0

    for _, v := range a {
        total += v
    }

    // 发送方
    c <- total
}

func main() {
    a := []int{7, 2, 8, -9, 4, 0}
    b := []int{2, 4, -5, 6, 9}

    c := make(chan int)

    go sum(a, c)
    go sum(b, c)

    // 当前 goroutine 为接收方
    x := <-c
    y := <-c

    fmt.Println(x)
    fmt.Println(y)
}

默认情况下,在另一端准备好之前,发送和接收都会阻塞。这使得 goroutine 可以在没有明确的锁或竞态变量的情况下进行同步。

我们来看一个例子来更好的理解这种行为:

package main

import (
    "fmt"
    "time"
)

func do(c chan string) {
    time.Sleep(1000 * time.Millisecond)

    c <- "done"
}

func main() {
    c := make(chan string)

    fmt.Println("1")

    go do(c)

    fmt.Println("2")

    a := <-c

    fmt.Println("3")
    fmt.Println(a)
}

程序会先打印出 1,然后开启一个新的 goroutine 去并行执行 do(),而当前 goroutine 继续向下执行打印 2,之后遇到了 <-c 从通道中接收值的操作,但是由于执行 do() 的新 goroutine 进行了一个比较耗时的操作(程序主动沉睡)导致并未向通道中发送值(即发送方未准备好),此时接收方进入阻塞状态,暂停执行,直至发送方准备好开始发送数据,程序的表现就是在此等待 1 秒钟之后打印 3done

同样,如果接收方没有准备好,那么任何发送操作也会被阻塞,直到数据被读出。

缓冲通道

使用 make 创建通道的时候也可以指定该通道的大小,即通道的缓冲。

缓冲的大小表示该通道能容纳的最多的元素的数量。

设置缓冲大小的方式是:

c := make(chan type, size)

如果没有设置容量,或者容量设置为 0, 说明 Channel 没有缓存,只有发送方和接收方都准备好了后它们的通讯才会发生。

如果设置了缓存,就有可能不发生阻塞。向缓冲 Channel 发送数据的时候,只有在缓冲区满的时候才会阻塞。当缓冲区为空的时候接受方会阻塞。

带缓冲的信道可被用作信号量,例如限制吞吐量。

var MaxOutstanding int = 5 // 设置最大吞吐量
var sem = make(chan bool, MaxOutstanding)

func Serve(queue chan *Request) {
    for req := range queue {
        req := req // 为新 goroutine 创建 req 的新实例,避免多个 goroutine 使用操作一个值。

        // sem 通道的缓冲区大小为 5,
        // 因此当缓冲区未满时,goroutine 会继续向下执行而不会被阻塞,
        // 当超过 5 个,即缓冲区满的时候,该发送操作就会被阻塞,导致 goroutine 暂停。
        sem <- true

        go func() {
            process(req)    // 一个耗时操作,可能需要很长时间。
            <-sem    // 从通道中接收一个值,让出一个位置,使下一个请求可以运行。
        }()
    }
}

Range

我们在读取通道里的多个值的时候,需要连续的进行读取操作,例如:

package main

import "fmt"

func sum(a []int, c chan int) {
    total := 0

    for _, v := range a {
        total += v
    }

    c <- total
}

func main() {
    a := []int{7, 2, 8, -9, 4, 0}
    b := []int{2, 4, -5, 6, 9}

    c := make(chan int)

    go sum(a, c)
    go sum(b, c)

    x := <-c
    y := <-c

    fmt.Println(x)
    fmt.Println(y)
}

如果通道的值有很多,那么这样会导致程序非常臃肿。而 range 能够让我们以一种更简单的方式来进行连续性读取。

package main

import (
    "fmt"
)

func fibonacci(n int, c chan int) {
    x, y := 1, 2

    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x+y
    }

    close(c)
}

func main() {
    c := make(chan int, 10)

    go fibonacci(cap(c), c)

    for i := range c {
        fmt.Println(i)
    }
}

for i := range c 能够不断的读取通道里面的数据,直到该通道被显式的关闭。

从上面的函数中我们也看到通道可以通过内置的 close(chan) 进行关闭。通道关闭之后,就无法再发送任何数据。

只有发送方才能关闭通道。通常情况下无需关闭它们。只有在需要告诉接收者没有更多的数据的时候才有必要进行关闭,例如中断一个 range

接收方可以通过 v, ok := <-c 语法来判断通道是否被关闭。当没有值可以接收并且通道已经被关闭,那么 ok 会被设置为 false

Select

如果在某个 goroutine 中存在多个 channel,那么我们可以通过 select 来监听 channel 上的数据流动。

select 默认是阻塞的,直到条件分支中的某个可以继续执行,这时就会执行那个条件分支。当多个 channel 都准备好的时候,select 会随机的选择一个执行。

package main

import "fmt"

// 该函数同时监听两个通道
func fibonacci(c, quit chan int) {
    x, y := 0, 1

    for {
        select {
        case c <- x:    // 新 goroutine 中通道 c 先准备好接收数据,因此该分支先满足继续执行条件,所以先执行。
            x, y = y, x + y
        case <-quit:    // 新 goroutine 中会在循环结束之后才向该通道发送数据,所以后执行。
            fmt.Println("quit")
            return
        }
    }
}

func main() {
    c := make(chan int)
    quit := make(chan int)

    // 开启一个新 goroutine
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }

        quit <- 0
    }()

    fibonacci(c, quit)
}

为了非阻塞的发送或者接收,可使用 default 分支,当 select 中的其他条件分支都没有准备好的时候,default 分支会被执行。

select {
case i := <-c:
  // 使用 i
default:
  // 发生阻塞时,该分支会被执行
}

results matching ""

    No results matching ""