面向管理后台的系统中,经常会有文件导入的需求。常规的做法就是同步等待,但在业务关系复杂(多表数据校验)、数据量较大的情况下,管理人员只能等结果,也可能会等到超时。
使用异步的话,将导入数据的功能与后端接口解耦,避免接口超时导致的任务中止,也无需前端只为了拿个结果一直保持连接等待。
前端在上传文件后,后端接口将导入任务推送(MQ、管道...)出去,然后直接返回前端。导入服务接到任务执行导入,并根据需求将实时导入状态维护到缓存中。前端查询/轮询后端从缓存取出当前导入状态。
流程图如下:
异步导入.png
简单的PHP + Swoole后端代码示例(实际就两个接口方法upFile、importStatus,和Task的导入处理):
/**
* Created by PhpStorm.
* User: wen
* Date: 2018/12/8
* Time: 11:09 PM
*/
require 'vendor/autoload.php';
use Swoole\Http\Server;
const BASE_DIR = __DIR__;
// 路由定义
$router = [
'GET' => [
'/importStatus' => 'importStatus' // 查询导入状态
],
'POST' => [
'/upFile' => 'upFile' // 上传导入文件
]
];
// ----SWOOLE-HTTP服务设置部分
$http = new Server("127.0.0.1", 9501);
$http->set([
'worker_num' => 2,
'task_worker_num' => 4,
]);
$http->on('request', function ($request, $response) use ($router, $http) {
$funName = $router[$request->server['request_method']][$request->server['request_uri']] ?? 'NotFound';
if (!function_exists($funName)){
return backJson($response, null, 404, 'ROUTER NOT FOUND');
}
try{
$funName($request, $response, $http);
}catch (Exception $e){
return backJson($response, null, 500, $e->getMessage());
}
});
// 实际导入操作
$http->on('Task', function (swoole_server $serv, $task_id, $from_id, $data) {
$redis = getNewRedis();
$status = [
'step' => 1, // 文件准备处理
'progressRate' => '',
'info' => [],
];
$redis->set($data, json_encode($status, JSON_UNESCAPED_UNICODE));
// 读取文件 使用了PhpOffice\PhpSpreadsheet解析EXCEL
$spreadsheet = \PhpOffice\PhpSpreadsheet\IOFactory::load(BASE_DIR . '/' . $data);
$sheetData = $spreadsheet->getActiveSheet()->toArray(null, true, true, true);
$count = count($sheetData);
$status['step'] = 2; // 文件解析完成
$status['progressRate'] = "解析到文件数据{$count}条";
$redis->set($data, json_encode($status, JSON_UNESCAPED_UNICODE));
$names = [];
var_dump($sheetData);
foreach ($sheetData as $k => $item){
if (1==$k) continue; // 第一行为表头
if (empty($item['A'])) {
unset($sheetData[$k]);
$status['info'][] = "第{$k}行姓名为空";
continue;
}
$names[] = $item['A'];
}
$redis->set($data, json_encode($status, JSON_UNESCAPED_UNICODE));
// TODO: 验证数据库name是否已存在 插入等业务处理...(此处代码省略)
// TODO: 将进度维护到redis
});
$http->on('Finish', function () {});
// ----基础函数部分
function getNewRedis(){
($redis = new \Redis())->connect('127.0.0.1');
return $redis;
}
function backJson($response, $content, $statusCode=200, $msg=''){
$response->header('Content-Type', 'application/json');
$jsonData = [
'statusCode' => $statusCode,
'content' => $content,
'msg' => $msg,
];
$response->end(json_encode($jsonData, JSON_UNESCAPED_UNICODE));
return true;
}
function NotFound($request, $response){
return backJson($response, null, 404, 'ROUTER NOT FOUND');
}
// ----接口方法
// 上传文件
function upFile($request, $response, $server){
$file = $request->files['file'] ?? null;
if (empty($file)) { throw new Exception('未收到上传文件'); }
$importSN = md5($file['tmp_name'] . time()) . '.' . pathinfo($file['name'])['extension'];
$bol = move_uploaded_file($file['tmp_name'], BASE_DIR . '/' . $importSN);
if (false === $bol) { throw new Exception('文件处理异常'); }
$status = [
'step' => '0',
'progressRate' => '',
'info' => [],
];
getNewRedis()->set($importSN, json_encode($status, JSON_UNESCAPED_UNICODE));
$server->task($importSN);
return backJson($response, ['importSN'=>$importSN]);
}
// 查询导入状态
function importStatus($request, $response){
$importSN = $request->get['importSN'] ?? null;
if (!$importSN){ throw new Exception('导入任务编号不正确'); }
$redis = getNewRedis();
$content = $redis->get($importSN);
if (!$content){ throw new Exception('未查询到任务'); }
return backJson($response, json_decode($content));
}
$http->start();
PostMan访问示例:
屏幕快照 2018-12-09 上午4.06.17.png
屏幕快照 2018-12-09 上午4.04.16.png
屏幕快照 2018-12-09 上午4.04.50.png
这里主要是任务投递的渠道,如Channel、MQ服务、Unix Socket等。
Channel:最简单好用,同服务进程内通信,进程挂了就都gg
MQ服务:独立服务,简单通用,可以多服务器,可靠性高
Unix Socket:单服务器内进程间通信,偏底层