对于channel来说可以使用for range语法来不断地读值,但是如果当全部值已经读取完毕之后,for range就会阻塞。这个时候我们可以使用close(),来手动的关闭channel,此时for range就会退出了。

例如:

func main() {
    var msg chan int
    var wg sync.WaitGroup
    wg.Add(1)
    msg = make(chan int, 3)
    go func(msg chan int) {
        defer wg.Done()
        for data := range msg {
            fmt.Println(data)
        }
        fmt.Println("所有消息都读完啦!")
    }(msg)
    for i := 0; i < 3; i ++ {
        msg <- i;
    }
    close(msg)
    wg.Wait()
}

运行结果如下:

0
1
2
所有消息都读完啦!
有一个需要注意的地方,对于一个close了的channel来说可以继续读值,但是不能向里写值

在go语言中,两个协程之间进行通信,使用的是一种消息队列的形式。消息的生产者将消息放入消息队列,然后消费者从消息队列中获取消息。

go提供了一个数据类型和语法糖来完成这种操作 channel<-

例如:

func main() {
    var msg chan string
    msg = make(chan string, 1)
    msg <- "哈喽,哈喽!"
    data := <- msg
    fmt.Println(data)
}
channel的使用需要初始化,并且需要显性的指明缓冲区大小
chan 后面跟着的是channel中放的数据类型,这个同样需要显性的指明

运行结果应该是:

哈喽,哈喽!

当缓冲区大小为0时,不能直接的在一个协程中写入和读取

例如:

func main() {
    var msg chan string
    msg = make(chan string, 0) // 缓冲区大小设为0
    msg <- "哈喽,哈喽!"
    data := <- msg
    fmt.Println(data)
}

运行之后会报死锁:

fatal error: all goroutines are asleep - deadlock!

此时,由于go语言有一种happen-before的机制,让我们可以使用一个另外的goroutine来读取,例如:

func main() {
    var msg chan string
    var wg sync.WaitGroup
    wg.Add(1)
    msg = make(chan string, 0) // 缓冲区大小设为0
    go func(msg chan string) {
        defer wg.Done()
        data := <- msg
        fmt.Println(data)
    }(msg)
    msg <- "哈喽,哈喽!"
    wg.Wait()
}

运行结果为:

哈喽,哈喽!

读写锁,RWMuex

常用的有四个方法 Lock(), RLock(), Unlock(),RUnlock(),分别是加写锁,加读锁,释放写锁以及释放读锁

对于写锁来说,会阻塞其他的写锁和读锁

对于读锁来说,不会阻塞读锁(可以同时有多个读锁存在),但是会阻塞写锁

同时因为读锁可以有多个,所以当存在一个读和写的goroutine时,写的协程只能等全部的读锁被释放时才能获取。

应用场景就是在一个系统中,大部分时间是读的请求大于写的请求。

所以读是可以并发的,但是显然写和读、写和写之间只能是串行,不能同时进行。

例如:

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    rwlock sync.RWMutex
    wg     sync.WaitGroup
)

func main() {
    wg.Add(6)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second) // 为了让读锁先被获取
        rwlock.Lock()
        defer rwlock.Unlock()
        fmt.Println("获取到写锁")
        time.Sleep(3 * time.Second)
    }()

    for range 5 {
        go func() {
            defer wg.Done()
            for {
                rwlock.RLock()
                fmt.Println("获取到读锁")
                time.Sleep(500 * time.Millisecond)
                rwlock.RUnlock()
            }
        }()
    }

    wg.Wait()
}

在这个例子中一共是启了6个goroutine,其中一个负责写,另外五个负责读,在读的goroutine中有一个for循环不断获取读锁,在写的goroutine中当获取到写锁之后休眠3秒,让效果更直观。

对于

假如说现在有一个公共的变量var count int

同时有两个方法Add() 和 Sub() Add方法可以将count加10000次,Sub方法可以将count减10000次。

这个问题来说,除了使用mutex之外也可以使用atomic包中的Addint32()方法来实现。

例如:

var count int32
var wg sync.WaitGroup

func add() {
    defer wg.Done()
    for range 100000 {
        atomic.AddInt32(&count, 1)
    }
}

func sub() {
    defer wg.Done()
    for range 100000 {
        atomic.AddInt32(&count, -1)
    }
}

func main() {
    wg.Add(2)
    go add()
    go sub()
    wg.Wait()
    fmt.Println(count)
}

运行之后的结果:

0

假如说现在有一个公共的变量var count int

同时有两个方法Add() 和 Sub() Add方法可以将count加10000次,Sub方法可以将count减10000次。

现在同时启动两个goroutine来运行这两个方法例如:

var count int
var wg sync.WaitGroup

func add() {
    defer wg.Done()
    for range 100000 {
        count += 1
    }
}

func sub() {
    defer wg.Done()
    for range 100000 {
        count -= 1
    }
}

func main() {
    wg.Add(2)
    go add()
    go sub()
    wg.Wait()
    fmt.Println(count)
}

运行之后会发现count的值不是预期中的零,而是每次运行之后都是一个随机的值。

这是因为事实上对count进行加1的操作并不是一个原子操作,而是可以分为3各部分:

加载count -> count + 1 -> 写入count

同理,对count减一也是三个部分:

加载count -> count - 1 -> 写入count

而在两个并发的过程当中,这六个独立的操作是可以不按照既定的顺序进行,每次执行的顺序都不唯一,会相互之间有交叉,所以这就造成了,我们每次运行之后count的结果为一个随机值的原因。

为了解决这种对公共资源的竞争情况,可以使用互斥锁,也就是 mutex

互斥锁可以让一个过程变为原子操作,即对于上面让count加1这个操作,如果这个 count += 1 被锁包裹,那么这个代码所表示的三个部分会变为一个整体。当然这是对于加了同一个锁的多个协程而言的。

比如如果有两个互斥锁lock1和lock2,在函数add()中 count += 1 被lock1包裹,而在函数sub()中 count -= 1 被lock2包裹,此时再运行仍然会打印出随机数。因此在使用互斥锁时,对于多个想要使用共同的公共资源的协程来说需要加上同一把锁。

例如:

var count int
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
    defer wg.Done()
    for range 100000 {
        lock.Lock()
        count += 1
        lock.Unlock()
    }
}

func sub() {
    defer wg.Done()
    for range 100000 {
        lock.Lock()
        count -= 1
        lock.Unlock()
    }
}

func main() {
    wg.Add(2)
    go add()
    go sub()
    wg.Wait()
    fmt.Println(count)
}

运行结果为:

0

而对于使用不同的锁的情况例如:

var (
    count int
    wg    sync.WaitGroup
    lock  sync.Mutex
    lock1 sync.Mutex
)

func add() {
    defer wg.Done()
    for range 100000 {
        lock.Lock()
        count += 1
        lock.Unlock()
    }
}

func sub() {
    defer wg.Done()
    for range 100000 {
        lock1.Lock()
        count -= 1
        lock1.Unlock()
    }
}

func main() {
    wg.Add(2)
    go add()
    go sub()
    wg.Wait()
    fmt.Println(count)
}

运行结果仍然为一个随机数