goroutinue和channel
- 需求
- 传统方式实现
- goroutinue
- 进程和线程说明
- 并发和并行
- go协程和go主线程
- MPG
- 设置Go运行的cpu数
- channel(管道)-看个需求
- 使用互斥锁、写锁
- channel 实现
- 使用select可以解决从管道取数据的阻塞问题(无需手动关闭channel了)
- goroutinue中使用了recover,解决协程中出现panic,导致程序崩溃问题
- 素数问题
需求
要求:统计 1-20000的数字中,哪些是素数?
分析思路:
(1)传统的方法中,就是使用一个循环,循环的判断各个数是不是素数
(2)使用并发或并行的方式,将统计素数的任务分配给多个goroutine去完成,这时就会使用到goroutinue
传统方式实现
func main() {now := time.Now()printPrime(1000000)seconds := time.Since(now).Seconds()fmt.Println("time : ", seconds)
}// 打印素数
// 素数定义:仅能被1和它本身整除的大于1的自然数func printPrime(n int) {for i := 2; i <= n; i++ {isPrime(i)}
}// 判断是否是素数func isPrime(n int) bool {if n <= 1 {return false}sqrt := int(math.Sqrt(float64(n))) // 更高效for i := 2; i <= sqrt; i++ {if n%i == 0 {return false}}fmt.Println(n)return true
}
单核 大约需要0.9s左右
goroutinue
进程和线程说明
- 进程就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单元
- 线程是进城的一个执行实例,是程序执行的最小单元,他是比进程更小的能独立运行的基本单位
- 一个进程可以创建核销多个线程,同一个进程中的多个线程可以并发执行
- 一个程序至少有一个进程,一个进程至少有一个线程
并发和并行
- 多个线程在单核上执行,就是并发
- 多个线程在多核上运行,就是并行
go协程和go主线程
- Go主线程:一个狗线程上可以起多个协程,协程是轻量级的线程
- Go协程的特点
- 有独立的栈空间
- 共享程序堆空间
- 调度由用户控制
- 协程是轻量级的线程
MPG
- 主线程是一个物理线程,直接作用在cpu上。是重量级的,非常耗费cpu资源
- 协程是从主线程开启的,是轻量级别的线程,是逻辑太,对资源消耗相对小
- Golang 的协程机制是重要的特点,可以轻松的开启上万个协程。其他编程语言的并发机制一般基于线程的,开启过多的线程,资源耗费大,这里就凸显Golang在并发的优势了
MPG模式基本介绍
M: 操作系统主线程 是物理线程
P: 协程执行需要的上下文
G: 协程
- 当前程序由三个M,如果三个M都在一个cpu运行,就是并发,如果在不同的cpu运行,就是并行
- M1、M2、M3 正在执行一个G,M1协程队列有三个,M2协程队列有三个,M3线程队列有两个
- go协程是 轻量级的线程,是逻辑态的,Go可以容易的启动上万个协程
- 其他程序c/Java的多线程,往往是内核态的,比较重量级,几千个线程可能耗光cpu
1. 分成两个部分来看
2. 原来的情况是M0主线程正在执行Go协程,另外有三个协程在队列等待
3. 如果Go协程阻塞,比如读文件或数据库等
4. 这是就会创建M1主线程(也可能从已有线程中取出M1),并且将等待的3个协程挂到M1下开始执行,M0的主线程下的GO仍然执行文件IO的读写
5.这样MPG调度模拟,可以既让Go执行,同时也不会让队列的其他协程一直阻塞,仍然可以并发或并行执行
6. 等到Go不阻塞了,M0会被放到空闲的主线程继续执行(从已有线程中取),同时GO又会被唤醒
设置Go运行的cpu数
go1.8后,默认让程序运行在多个核上,可以不用设置了
fmt.Println("CPU", runtime.NumCPU())
runtime.GOMAXPROCS(19)
channel(管道)-看个需求
现在要计算1-200的各个数的阶乘,并且把各个数的阶乘写入到map中,最后显示出来。要求使用goroutinue实现
分析:
- map不是并发安全,应该会有安全问题
- 加锁或者使用channel通信实现
var wg sync.WaitGroup
func main() {wg.Add(200)calcJiCheng(200)
}func calcJiCheng(n int) {m := make(map[int]int64)for i := 1; i <= n; i++ {go jiCheng(i, m)}wg.Wait() // 使用同步等待组,阻塞主线程fmt.Println(m)
}func jiCheng(n int, m map[int]int64) {var sum int64 = 1for i := 1; i <= n; i++ {sum *= int64(i)}m[n] = sumwg.Done()
}
fatal error: concurrent map writes 对map存在并发写操作
java 解决方案,使用 ConcurrentHashMap 或者 手动加锁
使用互斥锁、写锁
var wg sync.WaitGroup// var mutex sync.Mutex // 互斥锁 实现
var mutex sync.RWMutex // 写锁实现func main() {wg.Add(200)calcJiCheng(200)
}func calcJiCheng(n int) {m := make(map[int]int64)for i := 1; i <= n; i++ {go jiCheng(i, m)}wg.Wait() // 使用同步等待组,阻塞主线程fmt.Println(m)
}func jiCheng(n int, m map[int]int64) {defer mutex.Unlock()mutex.Lock()var sum int64 = 1for i := 1; i <= n; i++ {sum *= int64(i)}m[n] = sumwg.Done()
}
这是低水平程序猿首选,哈哈,高级程序猿使用channel解决
channel 实现
- channel 本质上是一个数据结构 队列
- 数据是先进先出
- 线程安全,多goroutinue访问时,不需要加锁,就是说channel本身就是线程安全的
- channel时有类型的,一个string的channel只能存放string类型数据
无缓冲管道 var ch = make(chan [2]int64)
有写必有读,缺少一个必死锁,
使用range读必须在写入最后一个关闭通道
缓冲管道 var ch = make(chan [2]int64,3)
写入超过缓冲值,必死锁
没有值读取,必死锁
使用range读必须在写入最后一个关闭通道
管道的读写是一个阻塞操作,我更愿意把其当作消息队列来用
var wg sync.WaitGroup
var ch = make(chan [2]int64)func main() {wg.Add(200)calcJiCheng(200)
}func calcJiCheng(n int) {m := make(map[int]int64)for i := 1; i <= n; i++ {go jiCheng(i, n)}for a := range ch {var num int64 = a[0]var value int64 = a[1]m[int(num)] = value}fmt.Println(m)
}func jiCheng(n int, end int) {var sum int64 = 1for i := 1; i <= n; i++ {sum *= int64(i)}ch <- [2]int64{int64(n), sum}wg.Done()// 关闭channelif n == end {close(ch)}
}
使用select可以解决从管道取数据的阻塞问题(无需手动关闭channel了)
func main() {intChan := make(chan int, 10)for i := 0; i < 10; i++ {intChan <- i}strChan := make(chan string, 5)for i := 0; i < 5; i++ {strChan <- strconv.Itoa(i)}// 传统的方法在遍历管道的时候,如果不关闭会阻塞而导致 deadlock// 问题 在实际开发中,可能我们不好确定什么时候关闭该管道// 可以使用select 方式 解决for {select {case v := <-intChan:fmt.Printf("从intChan读取的数据 %d\n", v)case v := <-strChan:fmt.Printf("从strChan读取的数据 %s\n", v)default:fmt.Printf("取不到了,不玩了 ")return}}
}
goroutinue中使用了recover,解决协程中出现panic,导致程序崩溃问题
如果我们起了一个协程,但是这个协程出现了panic,如果我们没有捕获这个panic,就会造成程序崩溃,这是我们在goroutinue中使用recover来捕获panic,进行处理,这样即时这个协程发生问题,但主线程不受影响,继续执行
func main() {go hi()go say()time.Sleep(time.Second * 2)
}
func say() {println("say 3333333")}
func hi() {defer func() {if err := recover(); err != nil {fmt.Println("程序出戳了", err)}}()fmt.Println("hi")var a map[string]inta["2"] = 3}
素数问题
var wg sync.WaitGroup
var ch = make(chan int)
var done = make(chan struct{})func main() {now := time.Now()wg.Add(1000)go printPrime(1000)go func() {for i := range ch {fmt.Println("主线程", i)}close(done)}()wg.Wait()close(ch)<-doneseconds := time.Since(now).Seconds()fmt.Println("time : ", seconds)
}// 打印素数
// 素数定义:仅能被1和它本身整除的大于1的自然数func printPrime(n int) {for i := 1; i <= n; i++ {go isPrime(i, n)}
}// 判断是否是素数func isPrime(n int, end int) bool {defer func() {wg.Done()}()if n <= 1 {return false}sqrt := int(math.Sqrt(float64(n))) // 更高效for i := 2; i <= sqrt; i++ {if n%i == 0 {return false}}ch <- nfmt.Println(n)return true
}