Environment:
Python 3.8
Code:
# Scrape the post list from cnblogs (博客园)
# https://www.cnblogs.com/
import re
from lxml import etree
import requests
import json
import threading
from queue import Queue
import pymysql
import time


class HeiMa:
    def __init__(self):
        # Request headers
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/90.0.4430.212 Safari/537.36'
        }
        self.url_queue = Queue()      # queue of request URLs
        self.html_queue = Queue()     # queue of page source
        self.content_queue = Queue()  # queue of parsed data
        # Create the connection object
        self.conn = pymysql.connect(
            host='127.0.0.1',   # 'localhost' also works for the local machine
            port=3306,          # port of the database server; MySQL uses 3306
            user='root',        # database username
            password='root',    # database password
            database='boke',    # database to operate on
            charset='utf8'      # character set
        )
        # Create a cursor -- the object that executes SQL statements
        self.cursor = self.conn.cursor()
        # Start a transaction
        self.conn.begin()

    def get_url_queue(self):
        url_temp = "https://www.cnblogs.com/sitehome/p/{}"
        # Build the request URLs
        url_list = [url_temp.format(i) for i in range(1, 100)]
        for url in url_list:
            # print(url)
            self.url_queue.put(url)  # add each built URL to the URL queue

    def get_html_queue(self):
        while True:
            # Take a request URL off the URL queue
            url = self.url_queue.get()
            html_source_page = requests.get(url, headers=self.headers).text
            self.html_queue.put(html_source_page)
            # Tell the URL queue this item has been processed
            self.url_queue.task_done()

    def parse_html(self):
        while True:
            content_list = []
            html = self.html_queue.get()
            html_str = etree.HTML(html)
            # Each post is an <article> inside div#post_list; iterate over the
            # articles themselves so every post on the page is parsed, not just the first
            node_list = html_str.xpath('//div[@id="post_list"]/article')
            title_num = 0
            for node in node_list:
                # Post title
                title = node.xpath('./section/div/a/text()')[0]
                # Post link
                url = node.xpath('./section/div/a/@href')[0]
                # Post author
                author = node.xpath('./section/footer/a[@class="post-item-author"]/span/text()')[0]
                # Publication time (the exact date)
                release_time = node.xpath('./section/footer/span[@class="post-meta-item"]/span/text()')[0]
                item = {
                    "文章标题": title,
                    "文章链接": url,
                    "文章作者": author,
                    "发布时间": release_time,
                }
                content_list.append(item)
                title_num += 1
            self.content_queue.put(content_list)
            self.html_queue.task_done()

    def save_data(self):
        while True:
            content_list = self.content_queue.get()
            with open("thread-heima.json", "a+", encoding='utf-8') as f:
                f.write(json.dumps(content_list, ensure_ascii=False, indent=2))
            self.content_queue.task_done()

    def save_data_mysql(self):
        while True:
            content_list = self.content_queue.get()
            for item in content_list:
                # Replace quotes and commas that would break the hand-built SQL string
                pattern = "[\',\"]"
                title = re.sub(pattern, '-', item['文章标题'])
                # title = item['文章标题']
                url = item['文章链接']
                author = re.sub(pattern, '-', item['文章作者'])
                # author = item['文章作者']
                release_time = item['发布时间']
                # Insert into the database
                sql = f'insert into news VALUES (NULL,"{title}","{url}","{author}","{release_time}")'
                print(sql)
                self.cursor.execute(sql)
            self.content_queue.task_done()

    def run(self):
        thread_list = []
        # Thread that builds the URL queue
        t_url = threading.Thread(target=self.get_url_queue)
        thread_list.append(t_url)
        # Threads that fetch page source
        for page in range(9):
            t_content = threading.Thread(target=self.get_html_queue)
            thread_list.append(t_content)
        # Threads that parse the fetched pages
        for j in range(9):
            t_content = threading.Thread(target=self.parse_html)
            thread_list.append(t_content)
        # t_save = threading.Thread(target=self.save_data)
        t_save = threading.Thread(target=self.save_data_mysql)
        thread_list.append(t_save)
        for t in thread_list:
            t.daemon = True  # daemon threads exit with the main thread (setDaemon() is deprecated)
            t.start()
        for q in [self.url_queue, self.html_queue, self.content_queue]:
            # Queue.join() blocks the main thread until every item put into the
            # queue has been matched by a task_done() call, i.e. until all
            # queued work is finished; only then does execution continue
            # print(q)
            q.join()


if __name__ == '__main__':
    heima = HeiMa()
    heima.run()
    try:
        # Commit the transaction so the inserted rows are written to the database
        heima.conn.commit()
    except Exception:
        # On error, roll back so none of the rows are inserted, keeping the data consistent
        heima.conn.rollback()
    # Release database resources
    heima.cursor.close()
    heima.conn.close()
    print("Done crawling!")
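
The INSERT statement assumes a `news` table already exists in the `boke` database, but the post never shows its schema. Below is a hypothetical sketch inferred from the five VALUES placeholders (an auto-increment id plus four text columns); the column names and lengths are assumptions, so adjust them to your own setup.

# Hypothetical schema for the `news` table, inferred from the INSERT above.
# Column names and types are assumptions, not taken from the original post.
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root',
                       password='root', database='boke', charset='utf8')
cursor = conn.cursor()
cursor.execute("""
    CREATE TABLE IF NOT EXISTS news (
        id INT AUTO_INCREMENT PRIMARY KEY,  -- matches the NULL placeholder
        title VARCHAR(255),
        url VARCHAR(255),
        author VARCHAR(64),
        release_time VARCHAR(32)
    ) DEFAULT CHARSET=utf8
""")
conn.commit()
cursor.close()
conn.close()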
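
One design note: save_data_mysql splices values into the SQL with an f-string and pre-strips quotes and commas with re.sub, which is fragile and open to SQL injection. Passing the values as parameters lets pymysql escape them itself, so the regex cleanup becomes unnecessary. A minimal standalone sketch, assuming the same `news` table and dict keys as above:

import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root',
                       password='root', database='boke', charset='utf8')
cursor = conn.cursor()

item = {"文章标题": 'A "quoted" title, with commas',
        "文章链接": "https://www.cnblogs.com/example",
        "文章作者": "someone", "发布时间": "2021-05-20 10:00"}

# %s placeholders are filled in by execute(), which escapes quotes safely,
# so no re.sub() cleanup is needed before inserting
sql = "INSERT INTO news VALUES (NULL, %s, %s, %s, %s)"
cursor.execute(sql, (item['文章标题'], item['文章链接'],
                     item['文章作者'], item['发布时间']))
conn.commit()
cursor.close()
conn.close()

Inside the class, this would replace the sql = f'...' / self.cursor.execute(sql) pair in save_data_mysql.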