python3异步协程爬虫_Python实现基于协程的异步爬虫

Python实现基于协程的异步爬虫

一、课程介绍

1. 课程来源

本课程核心部分来自《500 lines or less》项目,作者是来自 MongoDB 的工程师 A. Jesse Jiryu Davis 与 Python 之父 Guido van Rossum。项目代码使用 MIT 协议,项目文档使用 http://creativecommons.org/licenses/by/3.0/legalcode 协议。

课程内容在原文档基础上做了稍许修改,增加了部分原理介绍,步骤的拆解分析及源代码注释。

2. 内容简介

传统计算机科学往往将大量精力放在如何追求更有效率的算法上。但如今大部分涉及网络的程序,它们的时间开销主要并不是在计算上,而是在维持多个Socket连接上。亦或是它们的事件循环处理的不够高效导致了更多的时间开销。对于这些程序来说,它们面临的挑战是如何更高效地等待大量的网络事件并进行调度。目前流行的解决方式就是使用异步I/O。

本课程将探讨几种实现爬虫的方法,从传统的线程池到使用协程,每节课实现一个小爬虫。另外学习协程的时候,我们会从原理入手,以ayncio协程库为原型,实现一个简单的异步编程模型。

本课程实现的爬虫为爬一个整站的爬虫,不会爬到站点外面去,且功能较简单,主要目的在于学习原理,提供实现并发与异步的思路,并不适合直接改写作为日常工具使用。

3. 课程知识点

本课程项目完成过程中,我们将学习:

线程池实现并发爬虫

回调方法实现异步爬虫

协程技术的介绍

一个基于协程的异步编程模型

协程实现异步爬虫

二、实验环境

本课程使用Python 3.4,所以本课程内运行py脚本都是使用python3命令。

打开终端,进入 Code 目录,创建 crawler 文件夹, 并将其作为我们的工作目录。

$ cd Code

$ mkdir crawler && cd crawler

环保起见,测试爬虫的网站在本地搭建。

我们使用 Python 2.7 版本官方文档作为测试爬虫用的网站

wget http://labfile.oss.aliyuncs.com/courses/574/python-doc.zip

unzip python-doc.zip

安装serve,一个用起来很方便的静态文件服务器:

sudo npm install -g serve

启动服务器:

serve python-doc

如果访问不了npm的资源,也可以用以下方式开启服务器:

ruby -run -ehttpd python-doc -p 3000

访问localhost:3000查看网站:

wm

三、实验原理

什么是爬虫?

网络爬虫(又被称为网页蜘蛛,网络机器人,在FOAF社区中间,更经常的称为网页追逐者),是一种按照一定的规则,自动地抓取万维网信息的程序或者脚本。

爬虫的工作流程

网络爬虫基本的工作流程是从一个根URL开始,抓取页面,解析页面中所有的URL,将还没有抓取过的URL放入工作队列中,之后继续抓取工作队列中的URL,重复抓取、解析,将解析到的url放入工作队列的步骤,直到工作队列为空为止。

线程池、回调、协程

我们希望通过并发执行来加快爬虫抓取页面的速度。一般的实现方式有三种:

线程池方式:开一个线程池,每当爬虫发现一个新链接,就将链接放入任务队列中,线程池中的线程从任务队列获取一个链接,之后建立socket,完成抓取页面、解析、将新连接放入工作队列的步骤。

回调方式:程序会有一个主循环叫做事件循环,在事件循环中会不断获得事件,通过在事件上注册解除回调函数来达到多任务并发执行的效果。缺点是一旦需要的回调操作变多,代码就会非常散,变得难以维护。

协程方式:同样通过事件循环执行程序,利用了Python 的生成器特性,生成器函数能够中途停止并在之后恢复,那么原本不得不分开写的回调函数就能够写在一个生成器函数中了,这也就实现了协程。

四、实验一:线程池实现爬虫

使用socket抓取页面需要先建立连接,之后发送GET类型的HTTP报文,等待读入,将读到的所有内容存入响应缓存。

def fetch(url):

sock = socket.socket()

sock.connect(('localhost.com', 3000))

request = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url)

sock.send(request.encode('ascii'))

response = b''

chunk = sock.recv(4096)

while chunk:

response += chunk

chunk = sock.recv(4096)

links = parse_links(response)

q.add(links)

默认的socket连接与读写是阻塞式的,在等待读入的这段时间的CPU占用是被完全浪费的。

多线程

默认这部分同学们都是学过的,所以就粗略记几个重点,没学过的同学可以直接参考廖雪峰的教程:廖雪峰的官方网站-Python多线程

导入线程库:

import threading

开启一个线程的方法:

t = 你新建的线程

t.start() #开始运行线程

t.join() #你的当前函数就阻塞在这一步直到线程运行完

建立线程的两种方式:

#第一种:通过函数创建线程

def 函数a():

pass

t = threading.Thread(target=函数a,name=自己随便取的线程名字)

#第二种:继承线程类

class Fetcher(threading.Thread):

def __init__(self):

Thread.__init__(self):

#加这一步后主程序中断退出后子线程也会跟着中断退出

self.daemon = True

def run(self):

#线程运行的函数

pass

t = Fetcher()

线程同时操作一个全局变量时会产生线程竞争所以需要锁:

lock = threading.Lock()

lock.acquire() #获得锁

#..操作全局变量..

lock.release() #释放锁

多线程同步-队列

多线程同步就是多个线程竞争一个全局变量时按顺序读写,一般情况下要用锁,但是使用标准库里的Queue的时候它内部已经实现了锁,不用程序员自己写了。

导入队列类:

from queue import Queue

创建一个队列:

q = Queue(maxsize=0)

maxsize为队列大小,为0默认队列大小可无穷大。

队列是先进先出的数据结构:

q.put(item) #往队列添加一个item,队列满了则阻塞

q.get(item) #从队列得到一个item,队列为空则阻塞

还有相应的不等待的版本,这里略过。

队列不为空,或者为空但是取得item的线程没有告知任务完成时都是处于阻塞状态

q.join() #阻塞直到所有任务完成

线程告知任务完成使用task_done

q.task_done() #在线程内调用

实现线程池

创建thread.py文件作为爬虫程序的文件。

我们使用seen_urls来记录已经解析到的url地址:

seen_urls = set(['/'])

创建Fetcher类:

class Fetcher(Thread):

def __init__(self, tasks):

Thread.__init__(self)

#tasks为任务队列

self.tasks = tasks

self.daemon = True

self.start()

def run(self):

while True:

url = self.tasks.get()

print(url)

sock = socket.socket()

sock.connect(('localhost', 3000))

get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url)

sock.send(get.encode('ascii'))

response = b''

chunk = sock.recv(4096)

while chunk:

response += chunk

chunk = sock.recv(4096)

#解析页面上的所有链接

links = self.parse_links(url, response)

lock.acquire()

#得到新链接加入任务队列与seen_urls中

for link in links.difference(seen_urls):

self.tasks.put(link)

seen_urls.update(links)

lock.release()

#通知任务队列这个线程的任务完成了

self.tasks.task_done()

使用正则库与url解析库来解析抓取的页面,这里图方便用了正则,同学也可以用Beautifulsoup等专门用来解析页面的Python库:

import urllib.parse

import re

在Fetcher中实现parse_links解析页面:

def parse_links(self, fetched_url, response):

if not response:

print('error: {}'.format(fetched_url))

return set()

if not self._is_html(response):

return set()

#通过href属性找到所有链接

urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''',

self.body(response)))

links = set()

for url in urls:

#可能找到的url是相对路径,这时候就需要join一下,绝对路径的话就还是会返回url

normalized = urllib.parse.urljoin(fetched_url, url)

#url的信息会被分段存在parts里

parts = urllib.parse.urlparse(normalized)

if parts.scheme not in ('', 'http', 'https'):

continue

host, port = urllib.parse.splitport(parts.netloc)

if host and host.lower() not in ('localhost'):

continue

#有的页面会通过地址里的#frag后缀在页面内跳转,这里去掉frag的部分

defragmented, frag = urllib.parse.urldefrag(parts.path)

links.add(defragmented)

return links

#得到报文的html正文

def body(self, response):

body = response.split(b'\r\n\r\n', 1)[1]

return body.decode('utf-8')

def _is_html(self, response):

head, body = response.split(b'\r\n\r\n', 1)

headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:])

return headers.get('Content-Type', '').startswith('text/html')

实现线程池类与main的部分:

class ThreadPool:

def __init__(self, num_threads):

self.tasks = Queue()

for _ in range(num_threads):

Fetcher(self.tasks)

def add_task(self, url):

self.tasks.put(url)

def wait_completion(self):

self.tasks.join()

if __name__ == '__main__':

start = time.time()

#开4个线程

pool = ThreadPool(4)

#从根地址开始抓取页面

pool.add_task("/")

pool.wait_completion()

print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))

运行效果

这里先贴出完整代码:

from queue import Queue

from threading import Thread, Lock

import urllib.parse

import socket

import re

import time

seen_urls = set(['/'])

lock = Lock()

class Fetcher(Thread):

def __init__(self, tasks):

Thread.__init__(self)

self.tasks = tasks

self.daemon = True

self.start()

def run(self):

while True:

url = self.tasks.get()

print(url)

sock = socket.socket()

sock.connect(('localhost', 3000))

get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url)

sock.send(get.encode('ascii'))

response = b''

chunk = sock.recv(4096)

while chunk:

response += chunk

chunk = sock.recv(4096)

links = self.parse_links(url, response)

lock.acquire()

for link in links.difference(seen_urls):

self.tasks.put(link)

seen_urls.update(links)

lock.release()

self.tasks.task_done()

def parse_links(self, fetched_url, response):

if not response:

print('error: {}'.format(fetched_url))

return set()

if not self._is_html(response):

return set()

urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''',

self.body(response)))

links = set()

for url in urls:

normalized = urllib.parse.urljoin(fetched_url, url)

parts = urllib.parse.urlparse(normalized)

if parts.scheme not in ('', 'http', 'https'):

continue

host, port = urllib.parse.splitport(parts.netloc)

if host and host.lower() not in ('localhost'):

continue

defragmented, frag = urllib.parse.urldefrag(parts.path)

links.add(defragmented)

return links

def body(self, response):

body = response.split(b'\r\n\r\n', 1)[1]

return body.decode('utf-8')

def _is_html(self, response):

head, body = response.split(b'\r\n\r\n', 1)

headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:])

return headers.get('Content-Type', '').startswith('text/html')

class ThreadPool:

def __init__(self, num_threads):

self.tasks = Queue()

for _ in range(num_threads):

Fetcher(self.tasks)

def add_task(self, url):

self.tasks.put(url)

def wait_completion(self):

self.tasks.join()

if __name__ == '__main__':

start = time.time()

pool = ThreadPool(4)

pool.add_task("/")

pool.wait_completion()

print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))

运行python3 thread.py命令查看效果(记得先开网站服务器):

wm

使用标准库中的线程池

线程池直接使用multiprocessing.pool中的ThreadPool:

代码更改如下:

from multiprocessing.pool import ThreadPool

#...省略中间部分...

#...去掉Fetcher初始化中的self.start()

#...删除自己实现的ThreadPool...

if __name__ == '__main__':

start = time.time()

pool = ThreadPool()

tasks = Queue()

tasks.put("/")

Workers = [Fetcher(tasks) for i in range(4)]

pool.map_async(lambda w:w.run(), Workers)

tasks.join()

pool.close()

print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))

使用ThreadPool时,它处理的对象可以不是线程对象,实际上Fetcher的线程部分ThreadPool根本用不到。因为它自己内部已开了几个线程在等待任务输入。这里偷个懒就只把self.start()去掉了。可以把Fetcher的线程部分全去掉,效果是一样的。

ThreadPool活用了map函数,这里它将每一个Fetcher对象分配给线程池中的一个线程,线程调用了Fetcher的run函数。这里使用map_async是因为不希望它在那一步阻塞,我们希望在任务队列join的地方阻塞,那么到队列为空且任务全部处理完时程序就会继续执行了。

运行python3 thread.py命令查看效果:

wm

线程池实现的缺陷

我们希望爬虫的性能能够进一步提升,但是我们没办法开太多的线程,因为线程的内存开销很大,每创建一个线程可能需要占用50k的内存。以及还有一点,网络程序的时间开销往往花在I/O上,socket I/O 阻塞时的那段时间是完全被浪费了的。那么要如何解决这个问题呢?

下节课你就知道啦,下节课见~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/340605.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

去掉前后空格_mysql批量去掉某个字段字符中的空格

mysql有什么办法批量去掉某个字段字符中的空格&#xff1f;不仅是字符串前后的空格&#xff0c;还包含字符串中间的空格&#xff0c;答案是 replace&#xff0c;使用mysql自带的 replace 函数&#xff0c;另外还有个 trim 函数。 &#xff08;1&#xff09;mysql replace 函数 …

成为Java流大师–第6部分:使用流创建新的数据库应用程序

您是否曾经想开发数据库应用程序的“快速”版本&#xff1f; 在此动手实验文章中&#xff0c;您将学习一种真正简单而直接的方法。 整个Java域模型将自动为您生成。 您只需连接到现有数据库&#xff0c;然后开始使用Java流进行开发。 例如&#xff0c;您将能够在几分钟内为您现…

快速排序 挖坑_由浅入深玩转快速排序算法

由浅入深玩转快速排序算法快速排序可以说是最快的通用排序算法&#xff0c;它甚至被誉为20世纪科学和工程领域的十大算法之一。在众多排序算法中其无论是时间复杂度还是空间复杂度都颇具优势。作为开发工程师&#xff0c;我们很有必要了解它的思想。接下来将由在下为大家一步步…

代码拾取图片某一点的颜色_RPG游戏开发日志7:道具拾取与存放

本项目同步上传于github和coding上&#xff0c;国内读者可以通过在coding下载项目。也欢迎你加入我的UE4学习交流QQ群&#xff1a;872537977。如果你喜欢我写的文章&#xff0c;也希望你点赞、收藏、转发。谢谢&#xff01;如果你喜欢我写的文章&#xff0c;也希望你点赞、收藏…

abp vue如何配置服务地址_DHCP服务如何配置才能尽量减少被攻击的可能

DHCP Snooping是啥&#xff1f;DHCP Snooping是DHCP的一种安全特性&#xff0c;用来保证DHCP客户端能够正确的从DHCP服务器获取IP地址&#xff0c;防止网络中针对DHCP的攻击。DHCP Snooping是如何防止DHCP攻击的呢&#xff1f;DHCP&#xff0c;动态主机配置协议&#xff0c;在I…

通达信缠论买卖点公式_通达信缠论多空主图指标公式

1.高位无量就拿&#xff0c;就算拿错了也要拿着。高位就是说股价处于或者接近历史的最高位。高位无量横盘&#xff0c;是非常典型的上涨中继形态之一&#xff0c;高位的窄幅箱型振荡。2.高位放量出现就要跑,哪怕是跑错也要跑。股价在已经过了一段时间比较大的涨幅后&#xff0c…

多节锂电串联保护板ic_如何有效保护锂电池板,一款优质的MOS管就能解决

锂电池几乎应用于我们日常接触到的各类电器之中&#xff0c;但如何保护锂电池&#xff0c;你又是否知道呢&#xff1f;其实在锂电池保护板&#xff0c;最主要的元器件是IC与MOS。MOS对锂电池板的保护作用非常大&#xff0c;它可以检测过充电&#xff0c;检测过放电&#xff0c;…

使用pp架构形成计算机集群请求的地址无效_干货!史上最详细脑图《大型网站技术架构》...

1. 介绍一下《大型网站技术架构》这本书可能很多人都看过&#xff0c;小编个人觉得真的是非常不错的一本书。看完这本书后&#xff0c;你会对如何设计大型网站架构&#xff0c;有非常清晰的思路。如果还没有读过的小伙伴&#xff0c;赶紧去读一读吧。PS&#xff1a;小编这里有这…

泰坦尼克号数据_数据分析-泰坦尼克号乘客生存率预测

项目背景目标预测一个乘客是否能够在泰坦尼克号事件中幸存。概述1912年4月15日&#xff0c;泰坦尼克号在首次航行期间撞上冰山后沉没&#xff0c;船上共有2224名人员&#xff08;包括乘客和机组人员&#xff09;&#xff0c;共有1502人不幸遇难。造成海难失事的原因之一是乘客和…

linux mysql服务器安装_Linux服务器MySQL安装

Linux服务器MySQL安装1. MySQL官网下载如图&#xff1a;2. 安装MySQL[rootiZ2zebb0428roermd00462Z /]# rpm -ivh https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch#过滤MySQL&#xff1a;[rootiZ2zebb0428roermd00462Z /]#yum repolist all |grep mysql#ena…

python3 array为什么不能放不同类型的数据_来自俄罗斯的凶猛彪悍的分析数据库ClickHouse...

点击上方蓝色字体&#xff0c;选择“设为星标”回复”资源“获取更多资源大数据技术与架构点击右侧关注&#xff0c;大数据开发领域最强公众号&#xff01;暴走大数据点击右侧关注&#xff0c;暴走大数据&#xff01;ClickHouse相关文章推荐&#xff1a;战斗民族开源 | ClickHo…

mysql锁表查询_Mysql数据库锁情况下开启备份导致数据库无法访问处理分享

[背景简介]MySQL是一种开放源代码的关系型数据库管理系统(RDBMS)&#xff0c;因为其速度、可靠性和适应性而备受关注。大多数人都认为在不需要事务化处理的情况下&#xff0c;MySQL是管理内容最好的选择。mysql虽然功能未必很强大&#xff0c;但因为它的开源、广泛传播&#xf…

mysql直接执行文件格式_Windows 环境下执行 .sql 格式文件方式

windows 命令行中有2种执行 .sql 文件的方式&#xff1a;直接行文件 和 先进入mysql命令行然后执行文件。具体操作如下:1. 直接在windows命令行执行。打开windows命令行(winR打开运行窗口然后输入cmd&#xff0c;回车)&#xff0c;进入mysql的本机地址&#xff0c;如果配置了环…

Java大数据处理的流行框架

大数据挑战 在公司需要处理不断增长的数据量的各个领域中&#xff0c;对大数据的概念有不同的理解。 在大多数这些情况下&#xff0c;需要以某种方式设计所考虑的系统&#xff0c;以便能够处理该数据&#xff0c;而不会随着数据大小的增加而牺牲吞吐量。 从本质上讲&#xff0c…

带有Prometheus的Spring Boot和测微表第6部分:保护指标

以前&#xff0c;我们使用Prometheus成功启动了Spring Boot应用程序。 Spring应用程序中的一个端点正在公开我们的指标数据&#xff0c;以便Prometheus能够检索它们。 想到的主要问题是如何保护此信息。 Spring已经为我们提供了强大的安全框架 因此&#xff0c;将其轻松用于…

使用AWS Elastic Beanstalk轻松进行Spring Boot部署

朋友不允许朋友写用户身份验证。 厌倦了管理自己的用户&#xff1f; 立即尝试Okta的API和Java SDK。 在几分钟之内即可对任何应用程序中的用户进行身份验证&#xff0c;管理和保护。 几乎所有应用程序都依赖于身份验证。 开发人员以及雇用他们的公司都想确认谁在发出请求&…

mysql报错乱码_连接mysql服务器报错时,出现乱码

页头用了header(content-type:text/html;charsetutf-8);try{$this->dbonew PDO($dsn,$dbuser,$dbpassword);}catch(Exception $e){echo $e->getMessage();}连接失败时会报错&#xff0c;但是乱码&#xff0c;IE下编码查看是UTF-8&#xff0c;但是是乱码&#xff0c;如果选…

zookeeper 负载_ZooKeeper,策展人以及微服务负载平衡的工作方式

zookeeper 负载Zookeeper如何确保每个工人都能从工作委托经理那里愉快地完成工作。 Apache ZooKeeper是注册&#xff0c;管理和发现在不同计算机上运行的服务的工具。 当我们必须处理具有许多节点的分布式系统时&#xff0c;它是技术堆栈中必不可少的成员&#xff0c;这些节点…

高效的企业测试-集成测试(3/6)

本系列的这一部分将展示如何通过代码级以及系统级集成测试来验证我们的应用程序。 &#xff08;代码级&#xff09;集成测试 集成测试一词有时在不同的上下文中使用不同。 根据Wikipedia的定义&#xff0c;我指的是在代码级别上验证多个组件之间相互作用的测试。 通常&#x…

带Prometheus的Spring Boot和测微表第4部分:基础项目

在以前的文章中&#xff0c;我们介绍了Spring Micrometer和InfluxDB。 所以你要问我为什么普罗米修斯。 原因是Prometheus在InfluxDB的拉模型与推模型上进行操作。 这意味着&#xff0c;如果将千分尺与InfluxDB一起使用&#xff0c;则在将结果推送到数据库中时肯定会有一些开…