Python 实现http server接收mutipart/form-data文件 方法2
- 1 Server端代码
- 2 测试
1 Server端代码
以下代码亲测有效,仅适应于接收客户端上传的图片,其他文件未曾测试,作者主要应用于平时的自测工具。
说明1: 如有特殊需求,请自行更改,有意见,评论区留言。
说明2: 建议使用这个方法用于生产、成熟软件开发。Python 实现http server接收mutipart/form-data文件 方法1
from dataclasses import replace
from http.server import HTTPServer, BaseHTTPRequestHandler
import json,os,re,chardet,posixpath,codecs
from urllib.parse import unquoteclass Resquest(BaseHTTPRequestHandler):def do_POST(self):print("headers:", self.headers)print("command:",self.command)print("path:",self.path)bdy = self.headers.get_boundary()if not bdy is None:print(" = in headers")boundary = self.headers['content-type'].split("=")[1]print("boundary:",boundary)if "Boundary" in boundary :print("parser image")self.deal_post_data()else :print("can't handle boundary")else :print(" = not in headers")req_datas = self.rfile.read(int(self.headers['content-length'])) print("--------------------接受client发送的数据----------------")res1 = req_datas.decode('utf-8')res = json.loads(req_datas)print(res)#f = open('1.txt', 'w')#f.write(req_datas.decode())#f.close()#print(req_datas)print("--------------------接受client发送的数据完毕------------------")#data1 = {'bbb':'222'}#data = json.dumps(data1)#self.send_response(200)#self.send_header('Content-type', 'application/json')#self.end_headers()#self.wfile.write(data)def deal_post_data(self):boundary = self.headers["Content-Type"].split("=")[1].encode('ascii')print("boundary===", boundary)remain_bytes = int(self.headers['content-length'])print("remain_bytes===", remain_bytes)res = []line = self.rfile.readline()print("line=",line)bdarys = boundary.decode()bdarys = bdarys.replace(" ","")boundary = bdarys.encode()while boundary in line and str(line, encoding = "utf-8")[-4:] != "--\r\n":#line = self.rfile.readline()remain_bytes -= len(line)if boundary not in line:return False, "Content NOT begin with boundary"line = self.rfile.readline()remain_bytes -= len(line)print("line!!!==",str(line)) strLine = str(line) fn = strLine.split(';') [2].replace(' filename=','')fn= fn[:-5]print("fn==",fn)if not fn:print("False, Can't find out file name...")return False, "Can't find out file name..."path = translate_path(self.path)print("path==",path)fname = fn#fname = fname.replace("\\", "\\\\")fname = self.str_to_chinese(fname)print("fname==",fname)fn = os.path.join(path, fname)print("SavefilePath==",fn)dirname = os.path.dirname(fn)if not os.path.exists(dirname):os.makedirs(dirname)line = self.rfile.readline()remain_bytes -= len(line)line = self.rfile.readline()# b'\r\n'remain_bytes -= len(line)try:out = open(fn, 'wb')except IOError:return False, "Can't create file to write, do you have permission to write?"pre_line = self.rfile.readline()print("pre_line", pre_line)remain_bytes -= len(pre_line)print("remain_bytes", remain_bytes)Flag = Truewhile remain_bytes > 0:line = self.rfile.readline()#print("while line", line)if boundary in line:remain_bytes -= len(line)pre_line = pre_line[0:-1]if pre_line.endswith(b'\r'):pre_line = pre_line[0:-1]out.write(pre_line)out.close()#return True, "File '%s' upload success!" % fnres.append("File '%s' upload success!" % fn)Flag = Falsebreakelse:out.write(pre_line)pre_line = lineif pre_line is not None and Flag == True:out.write(pre_line)out.close()res.append("File '%s' upload success!" % fn)#return False, "Unexpect Ends of data."return True, resdef str_to_chinese(self,var):not_end = Truewhile not_end:start1 = var.find("\\x")# print start1if start1 > -1:str1 = var[start1 + 2:start1 + 4]print(str1)start2 = var[start1 + 4:].find("\\x") + start1 + 4if start2 > -1:str2 = var[start2 + 2:start2 + 4]start3 = var[start2 + 4:].find("\\x") + start2 + 4if start3 > -1:str3 = var[start3 + 2:start3 + 4]else:not_end = Falseif start1 > -1 and start2 > -1 and start3 > -1:str_all = str1 + str2 + str3# print str_allstr_all = codecs.decode(str_all, "hex").decode('utf-8')str_re = var[start1:start3 + 4]# print str_all, " " ,str_revar = var.replace(str_re, str_all)# print var.decode('utf-8')return vardef translate_path(path):"""Translate a /-separated PATH to the local filename syntax.Components that mean special things to the local file system(e.g. drive or directory names) are ignored. (XXX They shouldprobably be diagnosed.)"""# abandon query parameterspath = path.split('?', 1)[0]path = path.split('#', 1)[0]path = posixpath.normpath(unquote(path))words = path.split('/')words = filter(None, words)path = os.getcwd()for word in words:drive, word = os.path.splitdrive(word)head, word = os.path.split(word)if word in (os.curdir, os.pardir):continuepath = os.path.join(path, word)return pathif __name__ == '__main__':host = ('192.168.1.16', 18098)server = HTTPServer(host, Resquest)print("Starting server, listen at: %s:%s" % host)server.serve_forever()
2 测试
参见链接文章中的客户端测试方法进行测试。
如有其他需求或问题,烦请评论区留言。
Python 实现http server接收mutipart/form-data文件 方法1