APScheduler定时器使用【重写SQLAlchemyJobStore版】:django中使用apscheduler,使用mysql做存储后端

一、环境配置

python==3.8.10

包:

APScheduler==3.10.4
Django==3.2.7
djangorestframework==3.15.1
SQLAlchemy==2.0.29
PyMySQL==1.1.0

项目目录情况

gs_scheduler 应用

        commands : 主要用来自定义命令,python manage.py crontab  

        schedulers:所有apscheduler定时器的东西都在里面

        logs:存放定时器任务的日志信息

        views.py和urls.py:对外开放的接口,获取定时任务的基本信息和运行情况

二、django基本配置

根settings.py

#pymysql使用数据库
import pymysql
# pymysql.version_info = (1, 4, 0, "final", 0)  # 确保版本信息被正确设置
pymysql.install_as_MySQLdb()INSTALLED_APPS = ['rest_framework',#restful'gs_scheduler', #注册创建的应用
]#设置数据库
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'ldc-root'
MYSQL_NAME = 'study_scheduler'DATABASES = {'default': {'ENGINE': 'django.db.backends.mysql','HOST': MYSQL_HOST,'PORT': MYSQL_PORT,'USER': MYSQL_USER,'PASSWORD': MYSQL_PASSWORD,'NAME': MYSQL_NAME,}
}

根urls.py

注册路由

from django.contrib import admin
from django.urls import path,includeurlpatterns = [path('admin/', admin.site.urls),path('api/scheduler/',include('gs_scheduler.urls')),
]

三、配置apscheduler

3.1、schedulers/base.py

主要重写了SQLAlchemyJobStore类,添加上一些其他的字段和数据库表。

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore as _SQLAlchemyJobStore
from apscheduler.jobstores.base import JobLookupError, ConflictingIdError
from apscheduler.util import maybe_ref, datetime_to_utc_timestamp
from datetime import datetime
try:import cPickle as pickle
except ImportError:  # pragma: nocoverimport pickletry:from sqlalchemy import (create_engine, Table, Column, MetaData,delete, Unicode, Float, LargeBinary,String, BigInteger,DATETIME,select,Boolean,text, and_)from sqlalchemy.exc import IntegrityErrorfrom sqlalchemy.sql.expression import null
except ImportError:  # pragma: nocoverraise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')Datetime_Format = '%Y-%m-%d %H:%M:%S'#重写SQLAlchemyJobStore,用于自定义数据库表
class SQLAlchemyJobStore(_SQLAlchemyJobStore):Jobs_Tablename = 'apscheduler_jobs'#记录定时任务基本信息Jobs_History_Tablename = 'apscheduler_history'#记录定时任务运行历史def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', metadata=None,pickle_protocol=pickle.HIGHEST_PROTOCOL, tableschema=None, engine_options=None):#执行当前父类的父类的初始化方法super(_SQLAlchemyJobStore, self).__init__()self.pickle_protocol = pickle_protocolmetadata = maybe_ref(metadata) or MetaData()if engine:self.engine = maybe_ref(engine)elif url:self.engine = create_engine(url, **(engine_options or {}))else:raise ValueError('Need either "engine" or "url" defined')# 191 = max key length in MySQL for InnoDB/utf8mb4 tables,# 25 = precision that translates to an 8-byte floatself.jobs_t = Table(self.Jobs_Tablename, metadata,Column('id', Unicode(191), primary_key=True),Column('next_run_time', Float(25), index=True),Column('job_state', LargeBinary, nullable=False),Column('trigger',String(256),nullable=True),#新增,记录定时器的定时规则Column('desc',String(256),nullable=True),#新增,记录定时任务的描述信息schema=tableschema)#新增的一张表,记录定时任务运行历史self.jobs_t_history = Table(self.Jobs_History_Tablename,metadata,Column('id',BigInteger(),primary_key=True),Column('job_id',Unicode(191)),Column('run_time',DATETIME(),index=True,nullable=False),Column('is_error',Boolean(),default=0),Column('error_msg',String(256),nullable=True),schema=tableschema)#重写:对新表的创建def start(self, scheduler, alias):super(SQLAlchemyJobStore, self).start(scheduler, alias)#创建表self.jobs_t.create(self.engine, True)self.jobs_t_history.create(self.engine,True)#重写:对新字段的操作def add_job(self, job):#获取当前任务的定时器规则,描述信息trigger,desc = self.get_job_rule_and_desc(job)insert = self.jobs_t.insert().values(**{'id': job.id,'next_run_time': datetime_to_utc_timestamp(job.next_run_time),'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol),'trigger':trigger,'desc':desc})with self.engine.begin() as connection:try:connection.execute(insert)except IntegrityError:raise ConflictingIdError(job.id)#重写:对新字段的操作def update_job(self, job):trigger,desc = self.get_job_rule_and_desc(job)update = self.jobs_t.update().values(**{'next_run_time': datetime_to_utc_timestamp(job.next_run_time),'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol),'trigger':trigger,'desc':desc}).where(self.jobs_t.c.id == job.id)with self.engine.begin() as connection:result = connection.execute(update)if result.rowcount == 0:raise JobLookupError(job.id)# 新增方法:提取job的定时器信息和job的描述信息def get_job_rule_and_desc(self, job):the_type, rules = str(job.trigger).split('[')rule = rules.split(']')[0]trigger = '每隔'  # 定时任务的定时规则if the_type == 'date':# rule = '2024-10-10 20:20:12 csl'trigger = '在{} 时间点执行一次'.format(rule.rsplit(' ', 1)[0])elif the_type == 'interval':dic = {0: '小时', 1: '分钟', 2: '秒'}if 'day' in rule:# rule = '1 ady,00:00:00'day, hms = rule.split(',')day = int(day.split('day')[0])trigger += '{}天'.format(day)hms = hms.split(':')else:# rule = '01:01:01'hms = rule.split(':')for i, value in enumerate(hms):value = int(value)if value > 0:trigger += str(value)trigger += dic.get(i)else:trigger += '执行一次'else:# cron,比较复杂,不好判断# rule ="hour='0', minute='0', second='1'"trigger = '{},通过linux系统cron表达式'.format(job.trigger)desc = job.name or ''  # 定时任务描述return (trigger, desc)#新增方法:记录定时任务的运行历史,给scheduler监听器使用def insert_job_history(self,data:dict):''':param data: {'job_id':'x','run_time','is_error':1,'error_msg':'xxxx'}:return:'''insert = self.jobs_t_history.insert().values(**{'job_id': data.get('job_id'),'run_time': data.get('run_time'),'is_error': data.get('is_error'),'error_msg': data.get('error_msg')})with self.engine.begin() as connection:connection.execute(insert)#新增方法:api获取任务下次运行时间def api_get_run_next(self):'''获取每个任务的下次运行时间row:job_id = row[0],next_run=row[1],trigger=row[3],desc=row[4]:return:'''search = self.jobs_t.select().filter_by()with self.engine.begin() as connection:results = connection.execute(search)ret_data = []for row in results:dic = {'job_id':row[0],'next_run':datetime.fromtimestamp(row[1]).strftime(Datetime_Format),'trigger':row[3],'desc':row[4]}ret_data.append(dic)return ret_data#新增方法:api获取任务历史运行记录def api_get_run_history(self):'''获取每个任务运行成功的最近10个记录row :id=row[0],job_id=row[1],run_time=row[2],is_error=row[3],error_msg=row[4]:return:'''job_data_list = self.api_get_run_next()ret_data = []for dic in job_data_list:job_id = dic.get('job_id')history_data = {'job_id': job_id,'run_time': [],}search = self.jobs_t_history.select().filter_by(is_error=0,job_id=job_id).order_by(text('-id')).limit(10)with self.engine.begin() as connection:results = connection.execute(search)for row in results:run_time = datetime.strftime(row[2],Datetime_Format)history_data['run_time'].append(run_time)ret_data.append(history_data)return ret_data#新增方法:api获取任务错误记录def api_get_run_error(self):'''获取每个任务运行失败的最近10个记录row :id=row[0],job_id=row[1],run_time=row[2],is_error=row[3],error_msg=row[4]:return:'''job_data_list = self.api_get_run_next()ret_data = []for dic in job_data_list:job_id = dic.get('job_id')history_data = {'job_id': job_id,'error_run': [],}search = self.jobs_t_history.select().filter_by(is_error=1, job_id=job_id).order_by(text('-id')).limit(5)with self.engine.begin() as connection:results = connection.execute(search)for row in results:id=row[0]job_id=row[1]run_time=datetime.strftime(row[2], Datetime_Format)is_error=row[3]error_msg=row[4]history_data['error_run'].append({'run_time':run_time,'error_msg':error_msg})ret_data.append(history_data)return ret_data#新增方法:清除历史运行记录def delete_before_run_history(self):'''将历史运行记录中,每个任务只保留最近20个运行成功的记录每个任务只保留最近20个运行失败的记录:return:'''# 获取底层数据库连接session = self.engine.connect()# 获取任务表的名称table_name = self.Jobs_History_Tablename# 获取所有任务的job_idjob_data_list = self.api_get_run_next()for dic in job_data_list:job_id = dic.get('job_id')search_success = self.jobs_t_history.select().filter_by(is_error=0, job_id=job_id).order_by(text('-id'))search_error = self.jobs_t_history.select().filter_by(is_error=1, job_id=job_id).order_by(text('-id'))#正常运行的待删除的iddelete_ids_su = []#运行失败的待删除的iddelete_ids_err = []#提交,拿到查询结果with self.engine.begin() as connection:results_su = connection.execute(search_success)results_err = connection.execute(search_error)#把成功的所有的id取出for row in results_su:delete_ids_su.append(row[0])#把运行失败的所有的id取出for row in results_err:delete_ids_err.append(row[0])#删除成功运行的记录if delete_ids_su:delete_ids_su = delete_ids_su[20:]# 构建 SQL 语句delete_query = text(f"DELETE FROM {table_name} WHERE id IN :job_ids")# 执行删除操作session.execute(delete_query, {"job_ids": delete_ids_su})# 提交事务session.commit()#删除失败运行的记录if delete_ids_err:delete_ids_err = delete_ids_err[20:]# 构建 SQL 语句delete_query = text(f"DELETE FROM {table_name} WHERE id IN :job_ids")# 执行删除操作session.execute(delete_query, {"job_ids": delete_ids_err})# 提交事务session.commit()# 关闭数据库连接session.close()

3.2、schedulers/logger.py

用于记录apscheduler定时器的一些日志信息。

import os
import logging
from datetime import datetime, date, timedelta
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler# management目录路径
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
# 日志文件存放的目录
LOGS_DIR = os.path.join(BASE_DIR, 'logs')# 创建logs目录
if not os.path.exists(LOGS_DIR):os.makedirs(LOGS_DIR)def getLogHandlerConsole():'1、日志格式'formatter = logging.Formatter('[%(asctime)s][%(levelname)s][ %(funcName)s function: %(lineno)s line]:%(message)s')'2、输出到控制台处理器'console_handler = logging.StreamHandler()console_handler.setLevel(logging.DEBUG)console_handler.setFormatter(formatter)return console_handlerdef getLogHandlerFile():# 文件名,以日期作为文件名log_file_name = date.today().strftime('%Y-%m-%d.log')# 构建日志文件的路径log_file_str = os.path.join(LOGS_DIR, log_file_name)'1、日志记录格式'# 默认日志等级的设置# logging.basicConfig(level=logging.INFO)# 设置日志的格式:发生时间,日志等级,日志信息文件名,      函数名,行数,日志信息formatter = logging.Formatter('[%(asctime)s][%(levelname)s][%(pathname)s: %(funcName)s function: %(lineno)s line]: %(message)s')'2、基于文件的日志处理器配置'# 创建日志记录器,指明日志保存路径,每个日志的大小,保存日志的上限file_log_handler = RotatingFileHandler(filename=log_file_str,  # 日志文件名maxBytes=1024 * 1024 * 10,  # 文件大小超过10MB后,就会生成一个新的日志文件,日志就写到新的文件中backupCount=10,  # 最大支持总的日志文件数encoding='UTF-8')file_log_handler.setFormatter(formatter)  # 设置日志的格式file_log_handler.setLevel(logging.INFO)  # 设置日志等级return file_log_handler  # 基于文件大小分割日志的方案# 日志记录器1
scheduler_logger = logging.getLogger('apscheduler.scheduler')
scheduler_logger.setLevel(logging.INFO)
scheduler_logger.addHandler(getLogHandlerFile())  # 添加文件日志处理器
scheduler_logger.addHandler(getLogHandlerConsole())  # 添加控制台日志处理器if __name__ == '__main__':scheduler_logger.info('hhhhh')print(os.path.dirname(os.path.dirname(__file__)))

3.3、schedulers/config.py

存放的是apscheduler调度器需要的配置信息。

import os
#from apscheduler.jobstores.memory import MemoryJobStore #内存做后端存储
#from apscheduler.jobstores.redis import RedisJobStore #redis做后端存储
from .base import SQLAlchemyJobStore #mysql等做后端存储
from study_apscheduler import settings
from .logger import scheduler_logger
#mysql://root:ldc-root@127.0.0.1:3306/jobs?charset=utf8
MYSQL_CONFIG = settings.DATABASES.get('default')
MYSQL_USER = MYSQL_CONFIG.get('USER')
MYSQL_PASSWORD = MYSQL_CONFIG.get('PASSWORD')
MYSQL_HOST = MYSQL_CONFIG.get('HOST')
MYSQL_PORT = MYSQL_CONFIG.get('PORT')
MYSQL_NAME = MYSQL_CONFIG.get('NAME')
MYSQL_CHARSET = 'utf8mb4'
URL = 'mysql://{}:{}@{}:{}/{}?charset={}'.format(MYSQL_USER,MYSQL_PASSWORD,MYSQL_HOST,MYSQL_PORT,MYSQL_NAME,MYSQL_CHARSET)
#时区
TIME_ZONE = 'Asia/Shanghai'
#job的默认配置
JOB_DEFAULTS =  {'coalesce': True, #系统挂掉,任务积攒多次为执行,True是合并成一次执行,False是执行所有的次数。 持久化存储才有效'max_instances': 3 # 同一个任务同一时间最多只能有3个实例在运行。}
#job的存储后端
JOB_STORE = {'default': SQLAlchemyJobStore(url=URL)
}#日志处理器
Scheduler_Logger = {'logger':scheduler_logger
}
#监听事件对应的情况
LISTENER={1:'调度程序启动',2:'调度程序关闭',4:'调度程序中任务处理暂停',8:'调度程序中任务处理恢复',16:'将执行器添加到调度程序中',32:'执行器从调度程序中删除',64:'将任务存储添加到调度程序中',128:'任务存储从调度程序中删除',256:'所有任务从所有任务存储中删除或从一个特定的任务存储中删除	',512:'添加新的定时任务',1024:'从任务存储中删除了任务',2048:'从调度程序外部修改了任务',4096:'任务执行成功',8192:'任务在执行期间引发异常',16384:'错误了任务执行',32768:'任务已经提交到执行器中执行',65536:'任务因为达到最大并发执行时,触发的事件'
}

3.4、schedulers/main.py

实例化好调度器,配置日志,监听器、添加定时任务、存储后端等。

# 导入所需的调度器类和触发器类
from apscheduler.schedulers.background import BackgroundScheduler #后台运行
from apscheduler.schedulers.blocking import BlockingScheduler  #主进程运行,需要单独运行
from apscheduler.triggers.interval import IntervalTrigger #时间间隔
from apscheduler.triggers.cron import CronTrigger #复杂的定时任务
from apscheduler.triggers.date import DateTrigger #一次性定时任务
from apscheduler import events
from datetime import datetime
#定时任务
from .task import delete_apscheduler_history
from .task import send_to_big_data
from .task import crontab_task,date_task
#日志
from .config import LISTENER
from .config import TIME_ZONE,JOB_DEFAULTS,JOB_STORE,Scheduler_Logger #调度器配置class TheBlockScheduler(object):TIME_FORMAT = '%Y-%m-%d %H:%M:%S'def __init__(self):self.scheduler = self._scheduler_obj()self.logger = Scheduler_Logger.get('logger')# 1、初始化调度器def _scheduler_obj(self):obj = BlockingScheduler()obj.configure(timezone=TIME_ZONE,  # 时区job_defaults=JOB_DEFAULTS,  # job的默认配置jobstores=JOB_STORE,  # job的存储后端gconfig=Scheduler_Logger, #日志记录相关的的配置)return obj# 2、添加任务def _add_job(self):# 每5分钟执行一次推送告警到大数据仓self.scheduler.add_job(send_to_big_data,trigger=IntervalTrigger(minutes=1),id='send_to_big_data',replace_existing=True,coalesce=True,name='该定时,用于将数据推送到远端大数据系统')# 每天凌晨,清除历史的记录self.scheduler.add_job(delete_apscheduler_history,trigger=CronTrigger(hour=0,minute=0,second=1),id='delete_apscheduler_history',replace_existing=True,coalesce=True,name='该定时器,是将历史定时任务的执行记录进行清除')# 每隔20分钟清除垃圾记录self.scheduler.add_job(crontab_task,trigger=IntervalTrigger(minutes=20),  # 每天晚上零点1秒执行id='crontab_task',replace_existing=True,coalesce=True,name='该定时任务,用于清除xxx表中的垃圾记录')#在指定某个时刻执行一次self.scheduler.add_job(date_task,trigger=DateTrigger(run_date='2024-12-03 10:11:30'),id='date_task',replace_existing=True,coalesce=True,name='该定时任务,在指定时间,进行系统初始化任务',)# 3、添加监听器def _listener(self, event: events):code = event.coderun_time = datetime.now().strftime(self.TIME_FORMAT)msg = LISTENER.get(code)#存储器jobstore = self.scheduler._jobstores['default']job_history_data = {'job_id':None,'run_time':None,'is_error':0,'error_msg':None}if code == 4096:# 成功运行job_id = event.job_idjob_history_data['job_id'] = job_idjob_history_data['run_time'] = run_time# 记录到数据库中jobstore.insert_job_history(job_history_data)elif code == 8192 or code == 16384:# 运行异常了job_id = event.job_idjob_history_data['job_id'] = job_idjob_history_data['run_time'] = run_timejob_history_data['is_error'] = 1job_history_data['error_msg'] = msg# 记录到数据库中jobstore.insert_job_history(job_history_data)elif code in (1,2,4,8,32,128,1024,2048):###调度器启动时self.logger.info(msg)try:job_id = event.job_idif msg:msg = '任务id={},{}'.format(job_id, msg)self.logger.info(msg)except Exception:if msg:self.logger.info(msg)# 4、启动定时器def start(self):# 1、设置定时任务(监听器会先监听到任务添加,再监听到调度器启动)self._add_job()# 2、设置监听器self.scheduler.add_listener(self._listener)# 3、启动调度器try:# print('{},定时器启动成功,等待定时任务执行...'.format(datetime.now().strftime(self.TIME_FORMAT)))self.scheduler.start()except KeyboardInterrupt:self.scheduler.shutdown()#后台线程运行:随django项目一起运行
class TheBackRunScheduler(TheBlockScheduler):def _scheduler_obj(self):obj = BackgroundScheduler()obj.configure(timezone=TIME_ZONE,  # 时区job_defaults=JOB_DEFAULTS,  # job的默认配置jobstores=JOB_STORE,  # job的存储后端gconfig=Scheduler_Logger,  # 日志记录相关的的配置)return objif __name__ == '__main__':#后台启动backScheduler = TheBackRunScheduler()backScheduler.start()

3.5、schedulers/models.py

数据库查询方法,获取定时任务的状态和运行情况。

from .config import JOB_STORE
jobstore = JOB_STORE['default']
#获取定时任务下次运行时间
get_job_run_next = jobstore.api_get_run_next
#获取定时任务最近运行情况
get_job_run_history = jobstore.api_get_run_history
#获取定时任务最近运行错误
get_job_run_error = jobstore.api_get_run_error
#清除历史运行记录
delete_run_history = jobstore.delete_before_run_historyif __name__ == '__main__':pass

3.6、schedulers/task.py

存放定时任务。

import os
from .logger import scheduler_logger as log
from .models import delete_run_history#推送到数据仓的告警信息:5分钟执行一次
def send_to_big_data():log.info('推送到大数据仓')#清除定时任务历史运行记录
def delete_apscheduler_history():#清除历史运行记录delete_run_history()def crontab_task():log.info('crontab定时器')def date_task():log.info('date定时器')if __name__ == '__main__':print(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

四、自定义命令,启动定时器

4.1、commands/crontab.py

from django.core.management.base import BaseCommand
#导入block运行的代码
from gs_scheduler.management.schedulers.main import TheBlockScheduler#自定义脚本命令:使用,python manage.py crontab 启动
class Command(BaseCommand):# python manage.py crontab运行 就是调用该方法def handle(self, *args, **options):scheduler = TheBlockScheduler()scheduler.start()

五、启动定时器

5.1、以主进程方式启动

python manage.py crontab

5.2、后台进程方式启动

1、修改settings.py

from gs_scheduler.management.schedulers.main import TheBackRunScheduler
scheduler = TheBackRunScheduler()
scheduler.start()

2、启动django项目时,后台运行定时器

六、启动django项目

python manage.py runserver 8800

七、测试

1、获取任务下次运行情况

获取每个任务,下次执行的时间和基本信息。

http://127.0.0.1:8800/api/scheduler/run_next/

2、获取任务运行历史记录

获取到每个任务的最近10次运行成功的记录。

http://127.0.0.1:8800/api/scheduler/run_history/

3、获取任务运行失败的记录

获取每个任务最近5次运行失败的记录。

http://127.0.0.1:8800/api/scheduler/run_error/

八、码云下载源码

码云地址:

django应用定时器: django下使用定时器的方法icon-default.png?t=N7T8https://gitee.com/liuhaizhang/django-application-timer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/11341.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenHarmony开发案例【大合集】

OpenHarmony Codelabs 概要简介 为帮助开发者快速熟悉OpenHarmony的能力以及相关的应用开发流程,我们提供了一系列的基于趣味场景的应用示例,即Codelabs,开发者可以根据我们的文档一步步的学习和完成简单项目的开发。 目录 优秀案例 一次开…

小米/红米手机刷机错误:Missmatching image and device

报错: Missmatching image and device。 场景: 该解决方法只适用于手机是通过EMT解锁的。 解决方法: 打开刷机脚本,并注释检测脚本: 刷机脚本根据不同的刷机方式,选择编辑不同的脚本,例如&am…

nginx关于并发的总结

一、环境 1.nginx版本 [rootnode4 ~]# nginx -V nginx version: nginx/1.18.0 built by gcc 4.8.5 20150623 (Red Hat 4.8.5-44) (GCC) configure arguments: --prefix/usr/local/nginx2.操作系统 centos73.硬件配置 [rootnode2 ~]# cat /proc/cpuinfo |grep process proc…

2024年最顶尖的AI驱动SEO工具|TodayAI

在当今数字营销的竞争环境中,获得搜索引擎的高排名至关重要,因为它直接关联到网站的有机流量和品牌的在线影响力。搜索引擎优化(SEO)是提高网站排名的关键方式,通过优化网站内容和结构来符合搜索引擎的算法要求。然而&…

win server服务器 关闭危险端口 135,137,138,139,445的方法

通过防火墙来控制 打开控制面板 选择检查防火墙状态 选择高级设置 选择入站规则,再新建规则 选择端口,下一步 选择端口应用于啥协议,再指定端口,再下一步 选择阻止连接,下一步 下一步 给规则别名一下,方便…

5.12学习总结

一.JAVA聊天室项目 文件发送 使用 Java Socket 实现聊天内容或文件的传输的原理如下: 服务器端启动:聊天室的服务器端在指定的端口上监听客户端的连接。它创建一个 ServerSocket 对象,并通过调用 accept() 方法等待客户端的连接请求。客户…

快速入门:利用Go语言下载Amazon商品信息的步骤详解

概述 在这篇文章中,我们将深入探讨如何利用Go语言这一强大的工具,结合代理IP技术和多线程技术,实现高效下载Amazon的商品信息。首先,让我们来看看为什么选择Go语言作为开发网络爬虫的首选语言。 Go语言在网络开发中的特点 简洁…

grep 整理

grep 整理 1. 正则表达式和通配符2. grep2.1 基本用法2.2 进阶使用2.3 配合正则表达式使用2.4 grep增强版 1. 正则表达式和通配符 首先,我们回顾下正则表达式和通配符相关内容,这有助于接下来的grep学习: 基础正则表达式 RE 字符意义与范例…

项目经理根本不需要考PMP证书,浪费?

我首先会考虑你是否有这方面的需求,这是肯定的。如果你只是为了跟风而考证,因为别人都在考,所以你也跟着考,这样做是毫无意义的。 那么如何判断自己是否有这方面的需求呢? 1、工作 在工作中要考虑三个条件&#xff…

内裤什么牌子的质量好男士?2024男士内裤排行榜出炉

换新内裤这件事情对很难男生来说都挺难的,因为现在的男士内裤品牌比较多,而且还有各种不同材质的区分。尤其是大多数男士对自己穿的内裤都不是特别在意,实际上如果长期不更换内裤,会使内裤的舒适性、透气性降低,而且还…

附录2 创建flask镜像

目录 1 python镜像 2 安装flask 3 把项目文件扔进去 3.1 创建git仓库 3.2 上传文件 3.3 获取git链接 3.4 在容器中git clone 4 启动flask服务 5 将容器保存为镜像 6 映射端口运行镜像 7 遇到的问题 8 Dockerfile创建镜像 1 python镜像 首先找一下fla…

【全开源】keep健身小程序基于FastAdmin+ThinkPHP+UniApp

基于FastAdminUniApp(目前仅支持微信小程序)开发的健身相关行业小程序,程序适用于健身房、瑜伽馆、游泳馆、篮球馆等健身培训场所。平台拥有课程售卖、课程预约、多门店管理、私教预约、教练端、会员卡办理、在线商城、分销模块、页面自定义装…

Charles客户端下载

1.Charles客户端下载: 官网地址:Download a Free Trial of Charles • Charles Web Debugging Proxy 2.下载安装完成后激活 激活网站地址:https://www.zzzmode.com/mytools/charles/ 3.help,选择第一个,激活

Docker基础复习

文章目录 基础Docker基础命令镜像操作命令容器操作命令 案例:安装MySql案例:查看DockerHub,拉取Nginx镜像,并运行容器Docker命令起别名 基础 Docker基础命令 启动Docker systemctl start docker镜像操作命令 从远程仓具下载镜像到本地 docker pull 镜像…

Linux开发--Linux字符设备驱动设计

Linux字符设备驱动设计 概述 驱动的定义与功能 计算机系统中存在着大量的设备, 操作系统要求能够控制和管理这些硬件, 而驱动就是帮助操作系统完成这个任务。 驱动相当于硬件的接口, 它直接操作、 控制着我们的硬件, 操作系统通…

五款商用加密软件推荐 | 商用加密软件排行榜

没有网络安全就没有国家安全。信息安全是国家经济社会稳定运行,广大人民群众利益的保障。 对于公司来讲,数据安全同样是企业可持续发展的重要保障,防止内部核心数据、知识产权的泄露是企业数据安全的重要工作。下面是五款企业常用的加密软件…

网络安全学习路线+自学笔记(超详细) 自学网络安全看这一篇就够了

一、什么是网络安全 网络安全是一种综合性的概念,涵盖了保护计算机系统、网络基础设施和数据免受未经授权的访问、攻击、损害或盗窃的一系列措施和技术。经常听到的 “红队”、“渗透测试” 等就是研究攻击技术,而“蓝队”、“安全运营”、“安全运维”…

正点原子[第二期]Linux之ARM(MX6U)裸机篇学习笔记-15.1,2,3-GPIO中断控制实验

前言: 本文是根据哔哩哔哩网站上“正点原子[第二期]Linux之ARM(MX6U)裸机篇”视频的学习笔记,在这里会记录下正点原子 I.MX6ULL 开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了正点原子教学视频和链接中的内容。…

C--贪吃蛇

前言 贪吃蛇游戏是一个耳熟能详的小游戏,本次我们讲解他的简单的实现,需要掌握基本的API知识(http://t.csdnimg.cn/uHH6y),简单的C语言知识和基本的数据结构链表 简单的准备工作 蛇的节点 在游戏运⾏的过程中,蛇每次吃⼀个⻝物,蛇的⾝体就会变⻓⼀节&a…

力扣HOT100 - 45. 跳跃游戏 II

解题思路&#xff1a; 贪心 class Solution {public int jump(int[] nums) {int end 0;int maxPosition 0;int steps 0;for (int i 0; i < nums.length - 1; i) {maxPosition Math.max(maxPosition, i nums[i]);if (i end) {end maxPosition;steps;}}return steps;…