爬取cos猎人
数据库管理主要分为4个模块,代理获取模块,代理储存模块,代理测试模块,爬取模块
cos猎人已经倒闭,所以放出爬虫源码
api.py 为爬虫评分提供接口支持
import requests
import concurrent.futures
import redis
import random
import flask # 导入flask模块
from flask import request # 获取url地址中查询参数
from flask import jsonify # 可以把对象转换为字符串
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
REDIS_DATABASE = 0
REDISOBJECT = 'proxysss'"""时间间隔配置"""
GETTER_PROXY = 60*5
VERIFY_PROXY = 60*3class RedisClient:def __init__(self, host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DATABASE):self.db = redis.Redis(host=host, port=port, db=db, decode_responses=True)def exists(self, proxy):"""判断传入代理有没有存输到数据库有TRUE,没有Falseis比not优先级高"""return not self.db.zscore(REDISOBJECT, proxy) is Nonedef add(self, proxy, score=10):"""添加代理到数据库,设置初始分数为10分决定是否加入新代理"""if not self.exists(proxy):return self.db.zadd(REDISOBJECT, {proxy: score})def random(self):"""随机选择一个代理尝试获取评分为100分的代理获取指定范围的代理如果数据库没有代理就提示数据库为空"""proxies = self.db.zrangebyscore(REDISOBJECT, 100, 100)if len(proxies):return random.choice(proxies)proxies = self.db.zrangebyscore(REDISOBJECT, 1, 99)if len(proxies):return random.choice(proxies)print("-----数据库为空----")def decrease(self, proxy):"""传入代理如果检测不过关,降低代理分数"""self.db.zincrby(REDISOBJECT, -10, proxy)score = self.db.zscore(REDISOBJECT, proxy) # 查询分数if score <= 0:self.db.zrem(REDISOBJECT, proxy) # 删除代理def max(self, proxy):"""检测代理可用,就将代理设置最大分数"""return self.db.zadd(REDISOBJECT, {proxy: 100})def count(self):"""获取数据库中代理的数量"""return self.db.zcard(REDISOBJECT)def all(self):"""获取所有代理,返回列表"""proxies = self.db.zrangebyscore(REDISOBJECT,1,100)if proxies:return proxieselse:print('-----数据库无代理----')def count_for_num(self,number):"""指定数量获取代理,返回一个列表"""all_proxies = self.all()proxies = random.sample(all_proxies,k=number)#随机取数据,不重样return proxiesdef get_proxy():return requests.get("http://127.0.0.1:5010/all").json()def delete_proxy(proxy):requests.get("http://127.0.0.1:5010/delete/?proxy={}".format(proxy))# getHtml()
# def verify_thread_pool():
# """线程池检测代理
# 1.从数据库中取到所有代理
# 2.用线程池检测代理"""
# proxies_list = client.all() # 列表
# with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
# for proxy in proxies_list:
# executor.submit(verify_proxy, proxy)#
#
# TEST_URL = "https://www.baidu.com/"
# headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'}
#
#
#
#
# def verify_proxy(proxy):
# """检测代理是否可用"""
# proxies = {
# "http": "http://" + proxy,
# "https": "https://" + proxy
# }
# try:
# response = requests.get(url=TEST_URL, headers=headers, proxies=proxies, timeout=2)
# if response.status_code in [200, 206, 302]:
# """#判断请求返回的状态码是否成功
# 请求成功设为100分,调用max
# 请求不成功,将代理降分,调用decrease"""
# client.max(proxy)
# print("***代理可用***", proxy)
# else:
# client.decrease(proxy)
# print("--状态码不合法--", proxy)
# except:
# """请求超时,表示代理不可用"""
# client.decrease(proxy)
# print("===请求超时===")
#
# # 检测速度太慢,引入多任务,多线程
# def verify_thread_pool():
# """线程池检测代理
# 1.从数据库中取到所有代理
# 2.用线程池检测代理"""
# proxies_list = client.all() # 列表
# with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
# for proxy in proxies_list:
# executor.submit(verify_proxy, proxy)
#
# if __name__ == '__main__':
# # proxy = [
# # '45.234.63.220:999',
# # '60.168.255.69:8060',
# # '65.108.27.185:3128',
# # '62.162.91.205:3129',
# # '37.34.236.15:80'
# # ]
# # for pro in proxy:
# # verify_proxy(pro)
# verify_thread_pool()
getter.py从数据库抽取一个代理
import requests
def get_one_proxy():return requests.get("http://127.0.0.1:5000/all")
print(get_one_proxy().text)
sever.py搭建本地服务器供调用
import flask # 导入flask模块
from api import RedisClient
from flask import request # 获取url地址中查询参数
from flask import jsonify # 可以把对象转换为字符串app = flask.Flask(__name__)client = RedisClient()
@app.route('/')
# 将下面的函数挂载到路由
def index():"""视图函数:http://demo.spiderpy.cn/get/视图函数返回的数据,只能返回字符串类型的数据"""return '<h2>欢迎来到代理池</h2>'@app.route('/get')
def get_proxy():"""随机获取一个代理,调用数据库random模块"""one_proxy = client.random()return one_proxy@app.route('/getcount')
def get_any_proxy():"""获取指定数量一个代理,调用数据库的 count_for_num()拿到查询参数的值又可能用户没有传递查询参数,num返回为空"""num = request.args.get('num', '')if not num:"""没有获取到查询参数"""num = 1else:num = int(num)any_proxy = client.count_for_num(num)return jsonify(any_proxy)@app.route('/getnum')
def get_count_proxy():"""获取所有代理数量,调用数据库count方法"""count_proxy = client.count()return f"代理可用的数量为:{count_proxy}个"@app.route('/getall')
def get_all_proxy():"""获取所有代理,调用数据库的all()"""all_proxy = client.all()return jsonify(all_proxy)if __name__ == '__main__':"""运行实例化的app对象"""app.run()
test_self.py和tests.py对已经储存的代理质量进行检测
记不清哪个效果更好
import timeimport requests
from api import RedisClient
clients = RedisClient()
def get_proxy():return requests.get("http://127.0.0.1:5000/getall")a = get_proxy()
a = a.json()
# print(a)
# for b in a:
# print(b)
# print(type(a))def getHtml():# retry_count = 1for proxy in a:# print(proxy)try:html = requests.get('http://www.example.com', proxies={"http": "http://{}".format(proxy)},timeout=4)# print(html.text)if html.status_code in [200, 206, 302]:print(proxy,":可以使用")clients.add(proxy)# 使用代理访问except Exception:print("代理不可用", proxy)clients.decrease(proxy)# 删除代理池中代理# delete_proxy(proxy)
while True:getHtml()time.sleep(60*2)
进程池爬取cos猎人.py 主爬虫代码
from typing import List, Any
from concurrent.futures import ThreadPoolExecutor
import requests
import os
from lxml import etree
import re
if not os.path.exists('./img'):os.makedirs("img")
def get_one_proxy():return requests.get("http://127.0.0.1:5000/get")
proxies = get_one_proxy().text
# proxies = ''def down_img(img_url):for urls in img_url:response = requests.get(url=urls, headers=headers)name = urls.split("/")[-1]with open("./img/"+f'{name}', 'wb') as f:f.write(response.content)headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.58","referer": "https://www.coshunter.pro/simo"
}a = 252#354
while a < 355:url = f"https://www.coshunter.pro/shop/buy/page/{a}"res = requests.get(url, headers=headers, proxies={"http": "http://{}".format(proxies)})res.encoding = "utf-8"html = re.findall(r'<a class="link-block" href="(.*?)"></a>',res.text)urls = html[:-1]# print(urls)for i in urls:res = requests.get(i, headers=headers, proxies={"http": "http://{}".format(proxies)})img_url = re.findall(r'<figure class="wp-block-image.*src="(.*?)"',res.text)print(img_url)with ThreadPoolExecutor(10) as t:t.submit(down_img,img_url)print(a)a += 1