基于Fastapi搭建API服务实践案例

文章目录

  • 前言
  • 服务框架结构
    • app.py
    • run_app_server.sh
    • routers/upload.py
    • routers/query_result.py
    • utils/utils.py
    • utils/tasks.py
  • 总结


前言

本文讲述了如何使用fastapi搭建一个属于自己的服务,整个服务使用fastapi框架搭建,celery管理任务队列,slowapi限制请求的频率,主要技术点就是这么几个,其实实践起来也是很简单的,那么我们就开始吧!

服务框架结构

首先给出整个项目的目录结构如下,并对各个目录、文件进行讲解:
app/
├── app.py
├── run_app_server.sh
├── routers/
│ ├── main_router.py
│ └── result_router.py
└── utils/
├── tasks.py
└── utils.py

根目录: app/ 是根目录,包含核心文件和子目录

app.py:主程序入口文件。
run_app_server.sh:运行服务的脚本。

子目录:

routers/:放置路由相关的模块。main_router.py:主路由逻辑。result_router.py:查询结果相关路由。
utils/:放置工具类模块。tasks.py:主要任务处理模块。utils.py:其他通用函数模块。

这种组织结构便于扩展,文件划分清晰,适合中小型 Python 项目。

app.py

首先是app.py,它是基于 FastAPI 构建的 Web 服务的主程序入口。以下是它的代码:

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from routers import upload, query_result
from slowapi.errors import RateLimitExceededapp = FastAPI()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)# 注册路由
app.include_router(upload.router, tags=["Main Feature"])
app.include_router(query_result.router, tags=["Result Query"])# 根路由,测试服务是否正常运行
@app.get("/")
@limiter.limit("5/second")  # 设置访问限制
async def root(request: Request):return {"message": "Welcome to My Service"}

初始化 FastAPI 应用:

创建一个 FastAPI 应用实例:app = FastAPI(),这是整个服务的核心对象,用于管理路由、请求、响应以及中间件等。

集成访问限制(Rate Limiting)

使用 slowapi 库和 Limiter 实现对客户端请求的访问频率限制,防止滥用或过载。	
Limiter 的配置:key_func=get_remote_address:通过客户端 IP 地址(或远程地址)来判断访问来源。app.state.limiter = limiter:将访问限制功能挂载到应用状态中。
异常处理:注册 RateLimitExceeded 异常处理器,确保当请求超出限制时返回错误信息。

注册路由:

通过 app.include_router() 注册不同功能模块的路由。upload.router:处理与上传和转录相关的逻辑。query_result.router:处理查询结果的相关逻辑。
使用 tags 为路由分组(便于生成文档时分类)。

根路由:

定义根路径 / 的处理逻辑,用于测试服务是否正常运行。
使用访问限制 @limiter.limit("5/second"),确保每秒最多允许 5 次访问。
返回的 JSON 响应:{"message": "Welcome to the Non-real-time Speech Transcription Service"}

run_app_server.sh

这个脚本用于启动基于 FastAPI 的 Web 服务应用,以下是脚本的功能解析和作用:

#!/bin/bash
# run_app_server.sh# 启动 FastAPI 应用
gunicorn -w 4 -k uvicorn.workers.UvicornWorker app:app --bind 0.0.0.0:6022 --timeout 3000 --reload

Gunicorn 配置:

gunicorn:Python 的一个高性能 WSGI HTTP 服务器,用于运行 Web 应用,能够很好地管理并发请求。
-w 4:指定使用 4 个工作进程,适合利用多核 CPU 的能力来提升性能,主要作用是提升服务的并发量。
-k uvicorn.workers.UvicornWorker:设置 Gunicorn 使用 Uvicorn 的 Worker,专门支持 ASGI 框架(如 FastAPI)。
app:app:第一个 app:表示 Python 文件名(app.py)。第二个 app:表示 FastAPI 实例对象名(在 app.py 中定义的 app 对象)。
--bind 0.0.0.0:6022:指定服务绑定的 IP 地址和端口:0.0.0.0:监听所有网卡接口上的请求。6022:监听的端口号。
--timeout 3000:设置超时时间为 3000 秒,适合处理可能耗时较长的任务(例如文件上传或复杂计算)。
--reload:开启代码热重载,适合开发环境,代码变更后服务会自动重启,正式环境一般不设置这个参数。

如何运行:

# 赋予脚本执行权限(如果未设置权限):
chmod +x run_app_server.sh#运行脚本:
./run_app_server.sh

优势:

使用 Gunicorn + Uvicorn Worker 提供高效的 ASGI 服务。
多工作进程(-w)提升服务并发能力。
可扩展性强,支持多种部署场景(本地或生产环境)。

routers/upload.py

这个脚本是一个基于 FastAPI 的路由模块,这里以音频转写任务为例子,提供一个音频文件上传和转写任务的 API(/upload)。它包含了后台任务管理、参数校验、Redis 数据存储、访问限制等功能。

from fastapi import APIRouter, BackgroundTasks, HTTPException, File, UploadFile, Form, Depends, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
import uuidfrom utils.utils import save_info_to_redis, save_audio, estimate_time, validate_params
from utils.tasks import process_transcription_taskrouter = APIRouter()# 初始化 Limiter
limiter = Limiter(key_func=get_remote_address)@router.post("/upload")
@limiter.limit("20/second")  # 设置访问频率限制
async def upload_transcription(request: Request,params: dict = Depends(validate_params),
):# 生成唯一的order_idorder_id = str(uuid.uuid1())params['order_id'] = order_id# 保存音频audio_path = save_audio(order_id, params['audio_bytes'])params['audio_path'] = audio_path# 预估转写时间task_estimate_time = estimate_time(params.get('roleType'), params.get('duration'))params['taskEstimateTime'] = task_estimate_time# 预设转写结果和状态params['transcribe_result'] = Noneparams['status'] = 0  # 0表示处理中,1表示已完成,-1表示失败params['code'] = '100024'params['descInfo'] = 'Transcription in progress, please check results later.'# 将本次订单的相关信息存入redis数据库save_info_to_redis(params)# 添加后台转写任务process_transcription_task.delay(**params)return {"code": "000000","descInfo": "Success","content": {"orderId": params.get('order_id'), "taskEstimateTime": round(task_estimate_time, 2)}}

模块初始化

router = APIRouter()创建了一个 APIRouter 实例,用于定义该模块的路由。后续的路由(如 /upload)都会注册到该路由器上,可以集成到主应用中。limiter = Limiter(key_func=get_remote_address)初始化 访问限制器,通过客户端 IP 地址(get_remote_address)限制每秒请求频率,保护服务免受高频访问的影响。

/upload 路由

@router.post("/upload")
@limiter.limit("20/second")定义一个 POST 请求的路由 /upload。设置访问限制:每秒最多允许 20 次请求。

API 功能逻辑

参数校验与解析params: dict = Depends(validate_params)使用 FastAPI 的依赖注入机制,通过 validate_params 函数对请求参数进行校验。params 是解析后的字典,包含上传音频任务所需的关键信息(如角色类型、音频时长、音频字节流等)。

生成订单 ID

order_id = str(uuid.uuid1())
params['order_id'] = order_id为当前任务生成一个唯一的订单 ID,用于任务跟踪。

保存音频文件

audio_path = save_audio(order_id, params['audio_bytes'])
params['audio_path'] = audio_path调用 save_audio 函数保存音频文件,并将文件路径存入 params。

预估转写时间

task_estimate_time = estimate_time(params.get('roleType'), params.get('duration'))
params['taskEstimateTime'] = task_estimate_time根据角色类型(roleType)和音频时长(duration)估算转写任务的执行时间。

初始化任务状态

params['transcribe_result'] = None
params['status'] = 0
params['code'] = '100024'
params['descInfo'] = 'Transcription in progress, please check results later.'设置任务的初始状态为处理中(status = 0),转写结果为空。

存储任务信息到 Redis

save_info_to_redis(params)将任务的所有信息存入 Redis,以便后续查询和更新。

后台异步任务

process_transcription_task.delay(**params)调用 Celery 的 delay() 方法,将任务提交到后台队列中,异步处理转写逻辑。

返回响应

return {"code": "000000","descInfo": "Success","content": {"orderId": params.get('order_id'), "taskEstimateTime": round(task_estimate_time, 2)}
}返回任务提交成功的响应,包含订单 ID 和预估的任务时间。

功能总结

路由功能:提供音频文件上传接口。
访问限制:限制请求频率(每秒 20 次),保障服务稳定性。
任务管理:校验参数,生成任务唯一标识。保存音频文件并估算处理时间。初始化任务状态并存储到 Redis。提交转写任务到后台(使用 Celery 或类似的任务队列)。
响应信息:返回成功的订单 ID 和任务估算时间。

使用场景

该路由可用于 非实时语音转写服务,适合以下场景:批量上传音频文件:需要排队处理任务。异步任务管理:用户提交后可稍后查询转写结果。高并发环境:通过限流机制和后台任务队列,防止服务过载。

routers/query_result.py

这个脚本是一个基于 FastAPI 的路由模块,提供了查询转写任务结果的 API(/getResult)。该模块通过 Redis 存储任务数据,并根据任务状态返回相应信息。

from fastapi import APIRouter, HTTPException, Form, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
import redisfrom utils.utils import  raise_http_exceptionrouter = APIRouter()# Redis客户端模拟
redis_client = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)# 初始化 Limiter
limiter = Limiter(key_func=get_remote_address)@router.post("/getResult")
@limiter.limit("50/second")  # 设置访问限制
async def get_result(request: Request, orderId: str = Form(...)):# 检验工单是否存在if not redis_client.exists(orderId):raise_http_exception(404, "Order ID unknown", '100020')# 模拟获取转录结果task_data = redis_client.hgetall(orderId)orderResult = {"lattice": task_data.get('transcribe_result'),}# 如果status为0,说明工单还在处理中,返回timeRequired字段orderInfo = {"status": task_data.get('status'),"orderId": task_data.get('order_id'),"originalDuration": task_data.get('duration'),"timeRequired": 1000,}if task_data.get('status') != 0:del orderInfo['timeRequired']return {"code": task_data.get('code'),"descInfo": task_data.get('descInfo'),"content": {"orderResult": orderResult, "orderInfo": orderInfo}}

模块初始化

router = APIRouter()创建 APIRouter 实例,用于定义该模块的路由。redis_client = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)初始化 Redis 客户端,连接到 Redis 服务的 db=1 数据库。decode_responses=True:保证从 Redis 获取的数据是字符串格式。limiter = Limiter(key_func=get_remote_address)初始化访问限制器,通过 get_remote_address 以客户端 IP 作为请求限制的依据。

/getResult 路由

@router.post("/getResult")
@limiter.limit("50/second")定义一个 POST 请求的路由 /getResult。设置访问限制:每秒最多允许 50 次请求。

API 功能逻辑

参数接收与校验orderId: str = Form(...)从请求的表单数据中接收 orderId 参数。Form(...) 表示该参数为必填项。检查订单是否存在if not redis_client.exists(orderId):raise_http_exception(404, "Order ID unknown", '100020')使用 Redis 的 exists() 方法检查 orderId 是否存在。如果订单 ID 不存在,调用 raise_http_exception 抛出 HTTP 404 异常,并附带错误代码 100020。获取订单数据task_data = redis_client.hgetall(orderId)从 Redis 中获取以哈希表存储的任务数据。返回的数据是一个字典,包含转录结果、状态、订单 ID 等信息。组织返回数据转录结果:orderResult = {"lattice": task_data.get('transcribe_result'),}返回转录结果(可能为空,表示任务未完成)。订单信息:orderInfo = {"status": task_data.get('status'),"orderId": task_data.get('order_id'),"originalDuration": task_data.get('duration'),"timeRequired": 1000,}if task_data.get('status') != 0:del orderInfo['timeRequired']返回任务状态、订单 ID 和音频原始时长。如果任务正在处理中(status=0),返回预估所需时间字段 timeRequired。如果任务已完成或失败,则移除 timeRequired 字段。构建响应return {"code": task_data.get('code'),"descInfo": task_data.get('descInfo'),"content": {"orderResult": orderResult, "orderInfo": orderInfo}}返回包含任务状态码(code)、描述信息(descInfo)、转录结果和订单信息的完整响应。

使用场景

该路由用于查询音频转写任务的执行状态和结果,适合以下场景:任务进度查询:用户提交任务后,通过订单 ID 查询任务是否完成。如果任务还在处理中,返回 timeRequired,提示用户稍后再试。结果获取:如果任务已完成,返回转录结果 lattice。高并发支持:使用 Redis 提供快速查询能力,支持高并发访问。设置访问限制(每秒 50 次),保护服务免受过载。

utils/utils.py

这个脚本提供了一些通用的工具函数和依赖,用于处理非实时语音转写服务的参数验证、Redis 数据操作以及音频存储。它是 FastAPI 应用中的辅助模块,主要用于支持上传接口和其他功能。

from fastapi import APIRouter, BackgroundTasks, HTTPException, File, UploadFile, Form, Depends, Request
from typing import Optional
import redis
import os
import reredis_order_data = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)# 通用函数,简化异常处理逻辑,提高代码可读性和一致性。
def raise_http_exception(status_code:  int, desc_info: str, code: str):"""封装 HTTPException 的通用函数"""raise HTTPException(status_code=status_code,detail={"descInfo": desc_info,"code": code,},)# 自定义依赖项函数,用于验证参数,必传参数Pydantic(FastAPI用于数据验证的库)在后台自动验证
async def validate_params(audioFile: UploadFile = File(...),fileName: str = Form(...),fileSize: int = Form(...),duration: int = Form(...),callbackUrl: Optional[str] = Form(None),hotWord: Optional[str] = Form(None),hotWordId: Optional[int] = Form(None),roleType: Optional[int] = Form(0),audioMode: Optional[str] = Form('fileStream'),audioUrl: Optional[str] = Form(None),standardWav: Optional[int] = Form(0),
):# 验证文件名是否符合要求,系统会自动检验必传参数是否传递,不需要手动验证file_root, file_extension = os.path.splitext(fileName)if file_extension not in ['.pcm', '.mp3', '.wav']:raise_http_exception(400, "File name error, only support pcm, wav, mp3", '100001')# 验证文件大小是否符合要求file_size = audioFile.file.seek(0, 2)audioFile.file.seek(0)  # 重置文件指针if fileSize != file_size:raise_http_exception(400, "File size error", '100002')# 其他的验证逻辑# 估算转写需要的时间,单位毫秒
def estimate_time(roleType: int, duration: int):pass# 保存音频
def save_audio(order_id, audio_bytes):pass# 保存信息到redis
def save_info_to_redis(params):pass

utils/tasks.py

这个脚本定义了一个基于 Celery 的异步任务,用于处理语音转写请求,并通过 Redis 更新工单状态和结果。代码中集成了 HTTP 请求、Redis 数据库操作以及错误处理逻辑。

from celery import Celery
import httpx
import redis
import time
import jsonredis_order_data = redis.StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)
redis_hotWord_data = redis.StrictRedis(host='localhost', port=6379, db=2, decode_responses=True)app = Celery('transcription_tasks', broker='redis://localhost:6379/3')  # 没有设置backend,因为不需要返回结果,只需要异步执行任务@app.task
def process_transcription_task(**params):try:# 获取参数,urlurl = "http://ip:port/transcribe"if params.get('hotWordId') is not None:hotword = redis_hotWord_data.get(params.get('hotWordId'))else:hotword = params.get('hotWord')files = {'file': params.get('audio_bytes')}data = {'roleType': params.get('roleType'),'file_extension': params.get('file_extension'),'standardWav': params.get('standardWav'),'hotWord': hotword,}if hotword is None:del data['hotWord']start = time.time()print("++++++++开始转写")with httpx.Client(timeout=httpx.Timeout(float(params['taskEstimateTime'] / 1000) * 2)) as client:  # timeout设置为任务预估时间的两倍,防止超时 AsyncClientresponse = client.post(url, data=data, files=files)print(f"++++++++转写耗时:{time.time() - start}")print("++++++++转写成功")# 更新转写工单信息redis_order_data.hset(params.get('order_id'),mapping={"status": 1, "transcribe_result": json.dumps(response.json()), "code": "000000","descInfo": "Success"})except Exception as e:# 处理错误情况print('++++++++发生报错')redis_order_data.hset(params.get('order_id'),mapping={"status": -1, "transcribe_result": json.dumps(None), "code": "100018","descInfo": f"An unexpected error occurred: {e}"})

Celery 初始化及启动

#  初始化
app = Celery('transcription_tasks', broker='redis://localhost:6379/3')任务队列名称:transcription_tasks。消息代理(Broker):使用 Redis(第 3 个数据库)作为任务调度系统的消息队列。未设置 Backend:不需要任务执行的返回结果,只是异步执行操作。# 启动
celery -A utils.tasks worker -l info  --concurrency=16concurrency=16表示最大的并发量是16,如果设置为1,当前的服务就是串行执行,只有当前一个任务完成时才能执行下一个任务。utils.tasks是tasks的相对路径,之所以要加上utils是因为要使用相对路径服务才能找到对应的tasks。

异步任务定义

@app.task
def process_transcription_task(**params):任务名称:process_transcription_task。任务目的:处理非实时语音转写,包括文件上传、转写请求和结果存储。任务参数:params 是一个动态的字典,包含上传接口中收集的所有参数信息。

任务流程

1. 热词加载
if params.get('hotWordId') is not None:hotword = redis_hotWord_data.get(params.get('hotWordId'))
else:hotword = params.get('hotWord')优先从 Redis 中根据 hotWordId 加载热词数据。如果 hotWordId 为空,使用用户上传的 hotWord。2. HTTP 请求准备files = {'file': params.get('audio_bytes')}data = {'roleType': params.get('roleType'),'file_extension': params.get('file_extension'),'standardWav': params.get('standardWav'),'hotWord': hotword,}if hotword is None:del data['hotWord']音频文件:通过 audio_bytes 传递。请求体数据:包含 roleType(角色类型)、文件扩展名(file_extension)、是否标准化(standardWav)等信息。如果 hotWord 为空,则从数据中移除对应字段。3. 发送转写请求with httpx.Client(timeout=httpx.Timeout(float(params['taskEstimateTime'] / 1000) * 2)) as client:response = client.post(url, data=data, files=files)HTTP 客户端:使用 httpx.Client 发送 POST 请求。超时时间:设置为预估任务时间的两倍,防止任务因延迟而超时。请求 URL:假设是一个转写服务的接口(http://ip:port/transcribe)。4. 处理成功响应redis_order_data.hset(params.get('order_id'),mapping={"status": 1, "transcribe_result": json.dumps(response.json()), "code": "000000","descInfo": "Success"})更新 Redis 中的工单信息:status:设置为 1,表示任务完成。transcribe_result:存储转写结果(JSON 格式)。code 和 descInfo:标记为成功。5. 错误处理except Exception as e:redis_order_data.hset(params.get('order_id'),mapping={"status": -1, "transcribe_result": json.dumps(None), "code": "100018","descInfo": f"An unexpected error occurred: {e}"})捕获所有异常,更新 Redis:status:设置为 -1,表示任务失败。transcribe_result:结果为空。code 和 descInfo:标记为失败,并记录错误信息。

关键特性

1. 异步任务处理通过 Celery 实现异步任务分发和执行,解耦了任务处理逻辑和主程序。提高系统的并发能力,特别适合需要耗时的任务(如音频转写)。
2. 状态管理使用 Redis 存储工单的状态信息,支持快速读取和更新。工单状态分为:0:处理中。1:处理完成。-1:处理失败。
3. 动态超时时间根据任务预估时间设置超时时间,提高了接口调用的稳定性和灵活性。

总结

该服务以高效、模块化和异步化的架构设计,满足了非实时语音转写的需求,并具有良好的可扩展性和稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/890971.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【每日学点鸿蒙知识】hvigor升级、Dialog动画、LocalStorage无效、页面与子组件的生命周期、cookie设置

1、HarmonyOS 编译工具hvigor如何升级到"hvigorVersion": "4.2.0"版本? 可以手动更新到指定版本,参考链接如下:https://developer.huawei.com/consumer/cn/doc/harmonyos-guides-V5/ide-hvigor-plugin-V5 2、HarmonyOS…

【AI学习】DeepSeek-V3 技术报告学习:总体架构

翻了一下DeepSeek-V3 技术报告学习,太长,只是大概翻了一下,其中Multi-Token Prediction的技术就很亮眼。 摘要 本文介绍了DeepSeek-V3,这是一个拥有671B总参数的强大混合专家(MoE)语言模型,每…

C语言-数据结构-图

目录 一,图的概念 1,图的定义 2,图的基本术语 二,图的存储结构 1,邻接矩阵 2,邻接表 三,图的遍历 1,深度优先搜索 2,广度优先搜素 四,生成树和最小生成树 1,生成树的特点: 2,最小生成树 (1)普利姆算法Prim (2)普里姆算法思路 五,最短路径 1,Dijkstra算法 2,Fl…

C语言-数据结构-查找

目录 一,查找的概念 二,线性查找 1,顺序查找 2,折半查找 3,分块查找 三,树表的查找 1,二叉排序树 (1)查找方式: (2)、二叉排序树的插入和生成 (3)、二叉排序树的删除 2,平衡二叉树 (1)、什么是平衡二叉树 (2)、平衡二叉树的插入调整 (1)L…

【微信小程序】4plus|搜索框-历史搜索 | 我的咖啡店-综合实训

升级版1-清空全部的再次确认 实现功能: 历史搜索记录展示-历史搜索记录展示10条点击跳转-点击历史搜索记录可同步到搜索框并自动搜索全部删除-可一次性全部删除历史搜索记录全部删除-有再次确认操作展示 进行搜索后留下搜索记录 点击垃圾桶图标,显示【清空全部】 点击【清…

macrodroid通过http请求控制手机运行宏

macrodroid adb命令 adb shell pm grant com.arlosoft.macrodroid android.permission.WRITE_SECURE_SETTINGS例:http请求手机播放指定MP3文件 声音素材_电量过低提醒 新建一个宏 添加触发器-连接-http服务器请求 路径随意填,最好不要有特殊符号,不然浏览器识别链接会出错,…

【CSS in Depth 2 精译_098】17.3:CSS 动画延迟技术与填充模式设置 + 17.4:通过 CSS 动画传递意图的秘诀

当前内容所在位置(可进入专栏查看其他译好的章节内容) 第五部分 添加动效 ✔️【第 17 章 动画】 ✔️ 17.1 关键帧17.2 3D 变换下的动画设置 17.2.1 添加动画前页面布局的构建17.2.2 为布局添加动画 17.3 动画延迟与填充模式 ✔️17.4 通过动画传递意图…

慧集通客户案例:致远OA与熵基考勤机集成方案

本原型公司是一家专注大健康产业的综合性高新科技形实体企业,按照单位的战略业务布局,围绕“做强做优、世界一流”的目标,加快内外部资源整合、加强业务协同、优化资源配置,有序推进大健康及相关产业的有机融合,加快构…

深度学习笔记(6)——循环神经网络RNN

循环神经网络 RNN 核心思想:RNN内部有一个“内部状态”,随着序列处理而更新 h t f W ( h t − 1 , x t ) h_tf_W(h_{t-1},x_t) ht​fW​(ht−1​,xt​) h t h_t ht​是new state, h t − 1 h_{t-1} ht−1​是old state, x t x_t xt​是当前时间步的输入,所有时间步共享 f W…

电脑卡顿救星,Mem Reduct 智能清理 10%以上内存

作为一款专业的内存优化工具,Mem Reduct凭借其强大的功能和极致的性能表现,成为众多用户管理系统内存的首选软件。它采用先进的内存管理算法,通过调用系统底层API接口,能够智能识别并清理各类内存占用,包括但不限于系统…

kibana启动报错:Invalid character in header content [“kbn-name“]

启动时候kibana报错: 打开 kibana配置文件,config/kibana.yml,配置上server.name即可,如下:

短视频矩阵系统后端源码搭建实战与技术详解,支持OEM

一、引言 随着短视频行业的蓬勃发展,短视频矩阵系统成为了众多企业和创作者进行多平台内容运营的有力工具。后端作为整个系统的核心支撑,负责处理复杂的业务逻辑、数据存储与交互,其搭建的质量直接影响着系统的性能、稳定性和可扩展性。本文将…

sql group by 多个字段例子

有表如下; 获取某年份、某地区、某产品的销售总额, 或者根据需要把字段顺序换一下; insert into sales (product, year, region, amount) values (飞机,2000,东部,5); insert into sales (product, year, region, amount) values (飞机,2001,…

RBAC权限控制

1、Spring Security 是一个功能强大的Java安全框架,它提供了全面的安全认证和授权的支持。 2 SpringSecurity配置类(源码逐行解析) Spring Security的配置类是实现安全控制的核心部分 开启Spring Security各种功能,以确保Web应…

ArcGIS Pro地形图四至角图经纬度标注与格网标注

今天来看看ArcGIS Pro 如何在地形图上设置四至角点的经纬度。方里网标注。如下图的地形图左下角经纬度标注。 如下图方里网的标注 如下为本期要介绍的例图,如下: 图片可点击放大 接下来我们来介绍一下 推荐学习:GIS入门模型构建器Arcpy批量…

Kubernetes Gateway API-2-跨命名空间路由

1 跨命名空间路由 Gateway API 具有跨命名空间路由的核心支持。当多个用户或团队共享底层网络基础设施时,这很有用,但必须对控制和配置进行分段,以尽量减少访问和容错域。 Gateway 和 Route(HTTPRoute,TCPRoute,GRPCRoute) 可以部署到不同的命名空间中,路由可以跨命名空间…

Web API和Web Services的区分

前些年一提及自动化测试,大多是指UI界面层的自动化测试。近几年,随着分层自动化测试概念的兴起,以及自动化测试自身的发展与细分,自动化测试包含了更多的内容。 API(Application ProgrammingInterface,应用程序编程接…

使用c#制作坐标

1、建立坐标 2、坐标系的放大缩小 3、标定刻度 4、实时显示鼠标在坐标系上的坐标 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Drawing.Drawing2D; using System.Linq; using S…

JVM实战—JVM内存设置与对象分配流转

1.JVM内存划分的原理细节 (1)背景引入 接下来介绍JVM内存的分代模型:新生代、老年代、永久代。现在已知代码里创建的对象,都会进入到Java堆内存中。如下所示,main()方法会周期性执行loadReplicasFromDisk()方法来加载副本数据。 public class…

Debian 12 安装配置 fail2ban 保护 SSH 访问

背景介绍 双十一的时候薅羊毛租了台腾讯云的虚机, 是真便宜, 只是没想到才跑了一个月, 系统里面就收集到了巨多的 SSH 恶意登录失败记录. 只能说, 互联网真的是太不安全了. 之前有用过 fail2ban 在 CentOS 7 上面做过防护, 不过那已经是好久好久之前的故事了, 好多方法已经不…