goroutine并发扫描MySQL表_goroutine 并发之搜索文件内容

golang并发编程 - 例子解析

February 26, 2013

最近在看《Programming in Go》, 其中关于并发编程写得很不错, 受益非浅, 其中有一些例子是需要多思考才能想明白的, 所以我打算记录下来, 强化一下思路

《Programming in Go》在 Chapter 7. Concurrent Programming 里面一共用3个例子来讲述并发编程的3个模式, 第一个是 filter , 筛选出后缀名和文件大小文件列表, 还算简单就不说, 然后第二个是升级版, 正则版 filter , 不同的是他是根据正则搜索出文件的文本并且列出来. 这个例子我起初看是有点蒙的, 这样写是没错, 但是为什么要这样写, 他的设计思路是什么, 和其他方法相比他有什么优势, 这些都不清楚, 于是决定好好分析一下. 实际上这个例子实现的功能并不复杂, 所以我的文章实际上是在讨论怎么产生出和作者相似的思路.

如果不考虑用 goroutine 的话, 思路其实很简单:

1. 列出文件列表, 编译正则.

2. 遍历文件, 打开并遍历每行, 如果正则能匹配, 记录下来.

3. 列出来.

如果用 goroutine , 就会有以下思路:

1. 在得到文件路径数组之后, 分发任务给N个核.

2. 每个核负责打开文件, 将符合条件的那行文本写入到 `channel`

3. 主线程等待并接收`channel`的结果. 显示出来, 完毕

** 然后下文才是重点 **

1. channel关闭的时机

在go中, channel 是不会自动关闭的, 所以需要在我们使用完之后手动去关闭, 而且如果使用for语法来遍历channel每次得到的数据, 如果channel没有关闭的话会陷入死循环. 在 goroutine 中会造成 deadlock

for job := range jobs {

fmt.Println(job)

}

如果没close, 会触发dead lock. 因为for...range...会自动阻塞直到读取到数据或者channel关闭, 没close的话就会导致整个channel处于睡眠状态. channel关闭后, 就不允许写入(缓冲的数据还在, 还可以读取), 所以, channel 关闭的时机很重要.

2. 分发任务

我所知道任务分发方法有两种:

第一种是固定分配, 如果说我想计算1+2+3+...+100, 然后分成4份, 也就是 1+2+..+25, ..., ..., 86+87+...+100, 然后再将结果累加起来.

还有一种是抢占式的, 这里需要使用一个队列, 将所有任务写入队列, 然后开N个goroutine, 每个goroutine从队列读取任务(要确保线程安全), 处理, 完成后再继续读取任务. 不再是固定分配, 自己那份做完了就休息了, 所以看来第二种要好一点.

采用第二种方式的话, 对应go的做法, 那就是使用一个channel, 命名为 jobs, 将所有的任务写入进去, 写入完毕之后关闭这个 channel, 当然, 因为是N核, 系统能同时处理的任务我们设置为N个(也就是我们使用了N个goroutine), 那么声明 jobs 是缓冲区长度为N的 channel.

Buffered channel 和普通的 channel 的差别是他可以同时容纳多个单位数据, 当缓存的数据单位数量等于 channel 容量的时候, 再执行写入将会阻塞, 否则都是及时处理的.

3. 结果集

当我们将数据处理后, 就需要将结果收集起来. 需要注意的是, 这些操作不是在主 goroutine 执行, 所以我们需要通过 channel 传递给主 goroutine . 所以只需要在外部声明一个名为 results 的 channel . 然后在主 goroutine 通过 for 来显示, 这时候就会发现一个问题, 这个 results 关闭的时机问题. 正确的关闭时机是写入所有的 Result 之后. 但是别忘了我们同时开了多个 goroutine , 所以 results 应该在 执行任务的 goroutine 完成信号累计到N个 这个时机关闭. 所以我们再引入一个名叫 done 的 channel 来解决. 每个 goroutine 发送完 result 后会写入一次done, 然后我们就可以遍历 done , 遍历之后说明全部完成了, 再执行显示.

Result 的数据结构

type Result struct {

filenamestringlino int

linestring}

书中的 cgrep1 就是这样的

func awaitCompletion(done

close(results)

}

但是这样有可能造成死锁, 因为书中 results 缓冲区长度限定为最大1000个, 也就是超过1000个 result 的时候再打算写入 result 会等待取出 result 后才执行, done 也不会写入, 而 awaitCompletion 是等到所有 goroutine 都完成了才会取出 results, 而且当 result 非常大的时候因为内存的缘故也是不可能一次性取出的. 所以就需要在读取 results 的同时读取 done, 当读取 done 次数大于 N 后关闭 results, 所以, 因为要在多个 channel 中同时读取, 所以需要使用 select.

下面是书中的 cgrep3 , 改进版:

func waitAndProcessResults(timeout int64, done

finish:= time.After(time.Duration(timeout))for working := workers; working > 0; {

select {//Blocking

case result :=

case

}for{

select {//Nonblocking

case result :=

default:

return}

}

}

看到这里, 我就有个疑问, 为什么在全部完成之后(done都接收到N个了), 还要再遍历出 results, 直到读取不到才算读取完成呢(我反应一向比较慢^_^)? 于是我做了个实验, 去掉了后面再次循环的部分, 发现有时会遗漏掉数据(我用4个测试文件...), 证明这段代码是有用的!!!

我的想法是, 他是在处理完 result, 然后写入 results, 写完了才发送 done, 也就是在收到所有的 done 之后, 所有的数据应该是已经处理完成的. 为了验证这个想法, 我写了一下代码:

for working := workers; working > 0; {

select {//Blocking

case result :=

//received result

case

if working <= 0{

println(len(results))

}

}

}

然后看到输出的数是大于0的, 也就是说在接收到全部 done 之后, results 还有数据在缓冲区中, 然后在看看发送result 的代码, 突然就明白了

func doJobs(done chan

job.Do(lineRx)

}done

}

我把写入和读取想当然认为一起发生了, 因为有缓冲区的缘故, doJobs在发送进 results 的缓冲区之后就立刻发送 done 了, 但是写入的数据有没有被处理, 是不知道的, 所以在接收到所有 done 之后, results 缓冲区还有数据, 需要再循环一遍.

附我的代码一份:

package main

import ("bufio"

"fmt"

"log"

"os"

"regexp"

"runtime")

type Job struct {

filenamestringresults chan

}

type Result struct {

filenamestringlinestringlino int

}var worker = runtime.NumCPU()

func main() {//config cpu number

runtime.GOMAXPROCS(worker)

files:= os.Args[2:]

regex, err := regexp.Compile(os.Args[1])if err !=nil {log.Fatal(err)return}//任务列表, 并发数目为CPU个数

jobs := make(chan Job,worker)//结果

results := make(chan Result, minimum(1000,len(files)))

defer close(results)//标记完成

dones := make(chan int,worker)

defer close(dones)

go addJob(files, jobs,results)for i := 0; i < worker; i++{

go doJob(jobs, regex,dones)

}

awaitForCloseResult(dones,results)

}

func addJob(files []string, jobs chan

jobs

}

close(jobs)

}

func doJob(jobs

job.Do(regex)

}

dones

func awaitForCloseResult(dones

working:= 0MyForLable:

for{

select {case result :=

if working >=worker {if rlen := len(results); rlen > 0{

println("----------------------------------")

println("left:",rlen)

println("----------------------------------")for i := 1; i <= rlen; i++{

println(

}

}breakMyForLable

}

}

}

}

func (j*Job) Do(re *regexp.Regexp) {

f, err := os.Open(j.filename)if err !=nil {

println(err)return}

defer f.Close()

b:= bufio.NewReader(f)

lino:= 0

for{

line, _, err := b.ReadLine()if re.Match(line) {

j.results

}if err !=nil {break}

lino+= 1}

}

func minimum(a,b int) int {if a >b {returnb

}returna

}

func println(o...interface{}) {

fmt.Println(o...)

}

转自:http://chenye.org/goroutine-note.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/378800.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件开发模型和软件过程模型_什么是软件和软件过程?

软件开发模型和软件过程模型软件 (Software) Software is a set of instructions which instructs the computer for performing different operations. Software is nothing else but a general name for computer programs. 软件是一组指令&#xff0c;指导计算机执行不同的操…

甲骨文CEO埃里森称将在Sun裁员1000人

据国外网站报道&#xff0c;甲骨文CEO拉利埃里森周三表示&#xff0c;在完成对Sun的收购后&#xff0c;将对该公司裁员1000人。不过他同时也表示&#xff0c;未来几个月还要新雇2000人加强Sun的业务。 分析师们曾预测甲骨文完成收购后&#xff0c;将在Sun大裁员。不过埃里森和甲…

改变Jupyter的默认项目路径

开始接触Jupyter&#xff0c;看见它默认的工作路径是C盘&#xff0c;很难受想换下工作空间路径 管理员身份打开你的Anaconda Prompt 输入jupyter notebook --generate-config&#xff0c;找到你的配置文件位置 修改一下路径即可 一般情况到这一步就已经修改成功了&#xff…

arm-linux-gcc/ld/objcopy/objdump使用总结[zz]

地址&#xff1a;http://hi.baidu.com/xiaoyue1800/item/a11a2c4a26da4b04c11613d9arm-linux工具的功能如下&#xff1a;arm-linux-addr2line 把程序地址转换为文件名和行号。在命令行中给它一个地址和一个可执行文件名&#xff0c;它就会使用这个可执行文件的调试信息指出在给…

图像分割-LOG检测器和DOG检测器

边缘检测是以较小的算子为基础的&#xff0c;具有两个建议 1、灰度变化与图像尺寸无关&#xff0c;因此检测要求使用不同尺寸的算子。 2、灰度的突然变化会在一阶导数产生波峰波谷&#xff0c;在二阶导数产生零交叉 大的算子检测模糊边缘&#xff0c;小的算子检测锐度集中的细节…

java const string_深入研究Java String

开始写 Java 一年来&#xff0c;一直都是遇到什么问题再去解决&#xff0c;还没有主动的深入的去学习过 Java 语言的特性和深入阅读 JDK 的源码。既然决定今后靠 Java吃饭&#xff0c;还是得花些心思在上面&#xff0c;放弃一些打游戏的时间&#xff0c;系统深入的去学习。Java…

python 示例_带有示例的Python字典update()方法

python 示例字典update()方法 (Dictionary update() Method) update() method is used to update the dictionary by inserting new items to the dictionary. update()方法用于通过将新项目插入字典来更新字典。 Syntax: 句法&#xff1a; dictionary_name.setdefault(itera…

Rsync 使用指南

Rsync是个相当棒的同步工具&#xff0c;比如&#xff1a;1. 如何做本地两个目录之间的同步&#xff1f;rsync -av --delete --force ~/Desktop/Miscs/ /media/disk/DesktopMiscs 这样就可以做~/Desktop/Miscs目录的镜像了。/media/disk是我的移动硬盘的挂载点。这里关键有个问题…

C++——统计多行单个字符类型个数

键盘输入n个字符&#xff0c;请分别统计大写字母、小写字母、数字、其他字符的个数并输出&#xff1b;还需要输出所有数字字符之和 【输入形式】 第一行为一个整数n(100 > n > 0)&#xff0c;接下来n行每行一个字符 【输出形式】 输出第1行为4个整数&#xff0c;分别…

安卓项目4

经历两天的琢磨&#xff0c;终于把android连接服务器端php&#xff0c;读取mysql这一块弄好了。 先说说这几天遇到的问题。 http://wenku.baidu.com/view/87ca3bfa700abb68a982fbca.html 这是我参照的资料&#xff0c;原先我一度认为是不能实例化ServiceLink类&#xff0c;后来…

system getenv_Java System类getenv()方法及示例

system getenv系统类getenv()方法 (System class getenv() method) getenv() method is available in java.lang package. getenv()方法在java.lang包中可用。 getenv() method is used to return an unmodifiable Map of the current environment variable in key-value pairs…

用ASP获取客户端IP地址的方法

要想透过代理服务器取得客户端的真实IP地址&#xff0c;就要使用 Request.ServerVariables("HTTP_X_FORWARDED_FOR") 来读取。不过要注意的事&#xff0c;并不是每个代理服务器都能用 Request.ServerVariables("HTTP_X_FORWARDED_FOR") 来读取客户端的真实…

C++——已知a+b、 a+c、b+c、 a+b+c,求a、b、 c

有三个非负整数a、b、 C,现按随机顺序给出它们的两两和以及总和4个整数&#xff0c;即ab、 ac、bc、 abc, 注意,给出的4个数的顺序是随机的&#xff0c;请根据这四个数求出a、b、c是多少? [输入形式] 输入为一-行4个正整数, x1、 x2、x3、 x4 (0≤xi≤10^9) &#xff0c;表示…

DDD:DomainEvent、ApplicationEvent、Command

Command&#xff1a;纵向传递&#xff0c;跨分层&#xff0c;在控制器层和应用层之间传递。 DomainEvent&#xff1a;横向传递&#xff0c;跨聚合&#xff0c;在一个DLL中。 ApplicationEvent&#xff1a;横向传递&#xff0c;跨模块&#xff0c;在不同的DLL中。转载于:https:/…

表示和描述-边界追踪

边界追踪目标&#xff1a; 输入&#xff1a;某一区域的点 输出&#xff1a;这一区域的点的坐标序列&#xff08;顺时针或逆时针&#xff09; Moore边界追踪法&#xff1a; 两个前提条件&#xff1a; 1、图像为二值化后的图像&#xff08;目标为1&#xff0c;背景为0&#xff0…

视频的读取与处理

读取本地视频&#xff0c;以灰度视频输出 import cv2vc cv2.VideoCapture(E:\Jupyter_workspace\study\data/a.mp4)#视频路径根据实际情况而定#检查是否打开正确 if vc.isOpened():open,fream vc.read()#read()返回两个参数&#xff0c;第一个参数为打开成功与否True or Fal…

更灵活的定位内存地址的方法05 - 零基础入门学习汇编语言36

第七章&#xff1a;更灵活的定位内存地址的方法05 让编程改变世界 Change the world by program 问题7.8 [codesyntax lang"asm"] assume cs:codesg,ds:datasg datasg segment db ibm db dec db dos db vax …

nextgaussian_Java Random nextGaussian()方法与示例

nextgaussian随机类nextGaussian()方法 (Random Class nextGaussian() method) nextGaussian() method is available in java.util package. nextGaussian()方法在java.util包中可用。 nextGaussian() method is used to generate the next pseudo-random Gaussian double valu…

Java PriorityQueue clear()方法与示例

PriorityQueue类clear()方法 (PriorityQueue Class clear() method) clear() method is available in java.util package. clear()方法在java.util包中可用。 clear() method is used to remove all the objects from this PriorityQueue. clear()方法用于从此PriorityQueue中删…