Python进阶(1)
文章目录
- Python进阶(1)
- 一:发送请求(GET/POST)
- 1.1、方式一:http.client,以https方式为例
- 1.1.1、GET
- 1.1.2、POST
- 1.2、方式二:requests,以https方式为例
- 1.2.1、GET
- 1.2.2、POST
- 二、整合LOG日志
- 三、整合定时任务
- 四、写CSV文件
- 五、整合Redis
- 六、整合Mysql
- 七、AES加密
- 八、国密GMSM2,以JS方式
- 九、滑块验证码,滑块距离破解
Python版本:Python 3.10.0
一:发送请求(GET/POST)
1.1、方式一:http.client,以https方式为例
所需库文件:依赖内置库文件
1.1.1、GET
import http.client
import jsondef test(sessionId,pdate,p_):conn = http.client.HTTPSConnection("服务地址,例:abc.com.cn", "端口号 例 8080")payload = ''headers = {'Cookie' : sessionId,'Accept' : 'application/json, text/javascript, */*; q=0.01','User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'}conn.request("GET", "/test.do?pdate=" + pdate + '&_=' + p_, payload, headers)res = conn.getresponse()data = res.read()dataStr = data.decode("utf-8")return json.loads(dataStr)
1.1.2、POST
import http.client
import jsondef get():conn = http.client.HTTPSConnection("服务地址,例:abc.com.cn")payload = {'type': 'type0'}headers = {'Content-Type': 'application/json;charset=UTF-8','Accept' : 'application/json, text/plain, */*','User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'}conn.request("POST", "/get", json.dumps(payload), headers)res = conn.getresponse()data = res.read()return json.loads(data.decode("utf-8"))
1.2、方式二:requests,以https方式为例
所需库文件:requests
pip install requests
1.2.1、GET
import requests
import jsondef changeDate(sessionId,pdate,p_):url = "https://abc.com.cn:8080/test.do?pdate=" + pdate + '&_=' + p_payload={}headers = {'Cookie' : sessionId,'Accept' : 'application/json, text/javascript, */*; q=0.01','User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'}response = requests.request("GET", url, headers=headers, data=payload)dataStr = response.textreturn json.loads(dataStr)
1.2.2、POST
import requests
import jsondef get():url = "https://abc.com.cn/get"payload = json.dumps({"type": "type0"})headers = {'Content-Type': 'application/json;charset=UTF-8','Accept' : 'application/json, text/plain, */*','User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'}response = requests.request("POST", url, headers=headers, data=payload)jsonData = json.loads(response.text)return jsonData
二、整合LOG日志
需求:一天一个日志文件,保存30天
所需库文件:不需要下载库文件,依赖内置库文件
LOG配置,单例
import os
import logging
import re
from logging.handlers import TimedRotatingFileHandler# ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓当前配置需要更改↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
folderPath = './log'
log_name = 'appLog'
Log = Nonedef setup_log():global Logif Log is not None:return Logelse:isExist = os.path.exists(folderPath)if not isExist:os.makedirs(folderPath)# 创建logger对象。传入logger名字logger = logging.getLogger(log_name)log_path = os.path.join(folderPath + '/',log_name)# 设置日志记录等级logger.setLevel(logging.INFO)# interval 滚动周期,# when="MIDNIGHT", interval=1 表示每天0点为更新点,每天生成一个文件# backupCount 表示日志保存个数file_handler = TimedRotatingFileHandler(filename=log_path, when="MIDNIGHT", interval=1, backupCount=30)# filename="mylog" suffix设置,会生成文件名为mylog.2020-02-25.logfile_handler.suffix = "%Y-%m-%d.log"# extMatch是编译好正则表达式,用于匹配日志文件名后缀# 需要注意的是suffix和extMatch一定要匹配的上,如果不匹配,过期日志不会被删除。file_handler.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}.log$")# 定义日志输出格式file_handler.setFormatter(logging.Formatter("[%(asctime)s] [%(process)d] [%(levelname)s] - %(module)s.%(funcName)s (%(filename)s:%(lineno)d) - %(message)s"))logger.addHandler(file_handler)Log = loggerreturn logger
其他文件引用即可
abc.py
import NativeLogging# 使用日志记录器
logger = NativeLogging.setup_log()def start():logger.info('--------------------------------------')
三、整合定时任务
需求:每分钟执行一次
所需库文件:apscheduler
pip install apscheduler
from apscheduler.schedulers.blocking import BlockingSchedulerdef start():# 在这里定义你想要执行的任务逻辑print("Task executed!")scheduler = BlockingScheduler()
scheduler.add_job(start, 'cron', minute='*/1')try:scheduler.start()
except KeyboardInterrupt:pass
四、写CSV文件
需求:将两列多行数据写入到CSV文件
所需库文件:依赖内置库文件
import csvcsvFolder = './csv'# 写入到CSV文件
def writeCsv(currentYmd,data):outArr = []innArr = ['title1','title2']outArr.append(innArr)for element in data:innArr = [element['key1'],element['key2']]outArr.append(innArr)isExistFolderAndCreate(csvFolder)filename = csvFolder + '/' + currentYmd.replace('-','') + '.csv'with open(filename, 'w', newline='') as csvfile:writer = csv.writer(csvfile)writer.writerows(outArr)print('CSV文件写入成功')# 判断文件目录是否存在并创建
def isExistFolderAndCreate(folderPath):isExist = os.path.exists(folderPath)if not isExist:os.makedirs(folderPath)
五、整合Redis
需求:与Java Redis工具类一样,Redis只连接一次,通过共通方法调用
所需库文件:redis
pip install redis
import redis# ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓当前配置需要更改↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
host = 'localhost'
port = 6379
username = None
password = Noneclass RedisConfig:_instance = Nonedef __new__(cls, host, port, db, username=None, password=None):if not cls._instance:cls._instance = super().__new__(cls)return cls._instancedef __init__(self, host, port, db, username=None, password=None):if not hasattr(self, 'redis'):self.redis = redis.Redis(host=host, port=port, db=db, username=username, password=password)def publish(self, channel, message):self.redis.publish(channel, message)def set(self, key, value):self.redis.set(key, value)def get(self, key):return self.redis.get(key).decode('utf-8') if self.redis.get(key) else ''def publish(channel, message):# 创建 RedisConfig 实例并连接到 RedisredisConfig = RedisConfig(host=host, port=port, db=0, username=username, password=password)# 调用 publish() 方法进行发布redisConfig.publish(channel, message)def setString(key, value):# 创建 RedisConfig 实例并连接到 RedisredisConfig = RedisConfig(host=host, port=port, db=0, username=username, password=password)# 调用 publish() 方法进行发布redisConfig.set(key, value)def getString(key):# 创建 RedisConfig 实例并连接到 RedisredisConfig = RedisConfig(host=host, port=port, db=0, username=username, password=password)# 调用 publish() 方法进行发布return redisConfig.get(key)
六、整合Mysql
需求:与Java Mybatis一样,通过连接池,共通方法调用
所需库文件:mysql-connector-python
pip install mysql-connector-python
import mysql.connector
from mysql.connector import pooling# ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓当前配置需要更改↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
host = 'localhost'
username = 'root'
password = '123456'
database = 'db'class MySQLUtils:def __init__(self, host, user, password, database, pool_name='mypool', pool_size=5):self.host = hostself.user = userself.password = passwordself.database = databaseself.pool_name = pool_nameself.pool_size = pool_sizeself.pool = Nonedef connect(self):self.pool = mysql.connector.pooling.MySQLConnectionPool(pool_name=self.pool_name,pool_size=self.pool_size,host=self.host,user=self.user,password=self.password,database=self.database)def insert_data(self, sql, data):if self.pool is None:self.connect()connection = self.pool.get_connection()cursor = connection.cursor()insert_query = sqlcursor.execute(insert_query, data)connection.commit()cursor.close()connection.close()def insert_batch_data(self, sql, data):if self.pool is None:self.connect()connection = self.pool.get_connection()cursor = connection.cursor()insert_query = sqlcursor.executemany(insert_query, data)connection.commit()cursor.close()connection.close()def close(self):if self.pool is not None:self.pool.closeall()# 创建全局的 MySQLUtils 实例
utils = MySQLUtils(host, username, password, database)def insertBatch(sql, data):global utilsutils.insert_batch_data(sql, data)# sql = "INSERT INTO table (company_id, value1, value2) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE company_id = VALUES(company_id), value2 = values(value2)"
# insDataArr = [(1,2,3),(1,2,3)]
七、AES加密
需求:一段文字,需要通过AES进行加密处理
所需库文件:pycryptodome
pip install pycryptodome
from Crypto.Cipher import AES
import base64def Encrypt(secretKey,jsonXy):key = secretKey.encode('utf-8')iv = b'0000000000000000'cipher = AES.new(key, AES.MODE_ECB)plaintext = jsonXy.encode('utf-8')# 明文需要按照AES块大小(16)进行补全text_len = len(plaintext)padding_len = AES.block_size - text_len % AES.block_sizepadding = bytes([padding_len]) * padding_lenplaintext += paddingciphertext = cipher.encrypt(plaintext)# 返回base64格式的密文return base64.b64encode(ciphertext).decode('utf-8')
八、国密GMSM2,以JS方式
需求:一段文字,需要通过国密GMSM2进行加密处理,但是Python中的gmsm2加密不符合需求,JS中的库文件加密符合要求,通过Python调用JS代码的方式
所需库文件:PyExecJS
pip install PyExecJS
安装nodejs,版本v16.15.0
npm install -g sm-crypto
import execjs# ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓当前配置需要更改↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
cwd=r"C:/Users/Administrator/AppData/Roaming/npm/node_modules"js = """const smCrypto = require("sm-crypto");function doEncrypt(msgString, publicKey) {let msg = msgString;if (typeof (msgString) !== "string") {msg = JSON.stringify(msgString);}let sm2 = smCrypto.sm2;// 1 - C1C3C2; 0 - C1C2C3; 默认为1let cipherMode = 1; // 特别注意,此处前后端需保持一致// 加密结果let encryptData = sm2.doEncrypt(msg, publicKey, cipherMode);// 加密后的密文前需要添加04,后端才能正常解密console.log("04" + encryptData)return "04" + encryptData;}
"""
def execute_sm2(pwd,key):# ctx = execjs.compile(js)ctx = execjs.compile(js, cwd=cwd)res = ctx.call('doEncrypt', pwd, key)# print(res)return res;
九、滑块验证码,滑块距离破解
需求:计算网站登录时,滑块验证中小滑块在大背景中所需要移动的距离,图片为base64
计算出的该距离在有些场景的滑块验证码中并不是绝对正确,可能需要在计算出的距离之后 加减 偏差值,比如计算的距离为156,实际距离是166,需要 + 10,这个加减的偏差值在此场景的验证码中通用,只需要对该计算的距离加减即可,因为此场景所有的验证码可能都会出现此问题
所需库文件:opencv-python ,Pillow
pip install opencv-python Pillow
import base64
import cv2
import numpy as np
from PIL import Image
from io import BytesIOdef blockPuzzleCal(large_image_base64,small_image_base64):# Base64编码的大图和小图large_image_base64 = large_image_base64small_image_base64 = small_image_base64# 解码大图large_image_data = base64.b64decode(large_image_base64)large_image = Image.open(BytesIO(large_image_data))large_image = cv2.cvtColor(np.array(large_image), cv2.COLOR_RGB2BGR)# 解码小图small_image_data = base64.b64decode(small_image_base64)small_image = Image.open(BytesIO(small_image_data))small_image = cv2.cvtColor(np.array(small_image), cv2.COLOR_RGB2BGR)# 使用OpenCV的模板匹配result = cv2.matchTemplate(large_image, small_image, cv2.TM_CCOEFF_NORMED)min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)# 输出最匹配的坐标return max_loc