代码经过网络搜索,综合算是原创吧.py脚本为服务端
项目文件在
https://github.com/jinjie412/service_client_socket
import socketserver
import json
import base64
import os
from te import OPMysql,Caltime
import time
#数据库操作
opm = OPMysql()
#加密发包内容
def crypt(source,key):
from itertools import cycle
result=''
temp=cycle(key)
for ch in source:
result=result+chr(ord(ch)^ord(next(temp))^ord('1'))
return result
def jiami(f,key):
if os.path.isfile(f):
f=open(f,encoding='gbk')
content=f.read()
# print(content)
f.close()
print(content)
# content=content.encode(encoding='gbk')
content=crypt(content,key)
print(content)
content=crypt(content,key)
print(content)
content=content.encode(encoding='gbk')
print(content)
file_size = len(content)
return file_size,content
else:
return None,False
#数据库搜索客户端发送来的信息有效性
def yz(datajs):
sql = "select id,user,dqsj,rjbh from kh where user='%s' and rjbh='%s'" %(datajs['user'],datajs['rjbh'])
res = opm.op_select(sql)
#验证用户有效性
if res:
# print(res)
timeArray = time.strptime(res[2], "%Y-%m-%d %H:%M:%S")
# print(timeArray)
#验证用户卡是否到期
dt=Caltime(time.localtime(),timeArray)
return dt
else:
return False
class myTCPhandler(socketserver.BaseRequestHandler):
def handle(self):
while True:
self.data = self.request.recv(1024).decode('gbk', 'ignore').strip()
if not self.data : break
self.data=base64.b64decode(self.data)
print(self.data)
datajs = json.loads(self.data)
print(datajs)
if yz(datajs):
#发送数据长度,发送数据内容
dsize,data = jiami('./js/'+datajs["jb"],datajs['key'])
if dsize:
#向客户端发送数据长度
self.feedback_data = ('%d' % (dsize,)).encode('gbk')
self.request.sendall(self.feedback_data)
#等待客户端返回
feedback = self.request.recv(100).decode('gbk', 'ignore').strip() #等待客户端确认
print(feedback)
if int(feedback) == dsize:
#发送所有数据
self.request.sendall(data)
else:
print('open file error')
else:
print('erro')
self.feedback_data = 'erro'.encode('gbk')
self.request.sendall(base64.b64encode(self.feedback_data))
def finish(self):
print("client is disconnect!")
host = '127.0.0.1'
port = 9090
server = socketserver.ThreadingTCPServer((host,port),myTCPhandler)
server.serve_forever()
te.py 文件
使用了数据库连接池,网络搜索
import MySQLdb
from DBUtils.PooledDB import PooledDB
import time
import datetime
mysqlInfo = {
"host": 'localhost',
"user": 'root',
"passwd": '000000',
"db": 'w',
"port": 3306,
"charset": 'utf8'
}
class OPMysql(object):
__pool = None
def __init__(self):
# 构造函数,创建数据库连接、游标
self.coon = OPMysql.getmysqlconn()
self.cur = self.coon.cursor()
# 数据库连接池连接
@staticmethod
def getmysqlconn():
if OPMysql.__pool is None:
__pool = PooledDB(creator=MySQLdb, mincached=5, maxcached=20, host=mysqlInfo['host'], user=mysqlInfo['user'], passwd=mysqlInfo['passwd'], db=mysqlInfo['db'], port=mysqlInfo['port'], charset=mysqlInfo['charset'])
print(__pool)
return __pool.connection()
# 插入\更新\删除sql
def op_insert(self, sql):
pass
# 查询
def op_select(self, sql):
try:
# print('op_select', sql)
self.cur.execute(sql) # 执行sql
select_res = self.cur.fetchone() # 返回结果为字典
# select_res =self.cur.fetchall()
return select_res
except Exception as e:
print(str(e))
return False
#释放资源
def dispose(self):
self.coon.close()
self.cur.close()
#计算到期时间
def Caltime(date1,date2):
date1=datetime.datetime(date1[0],date1[1],date1[2],date1[3],date1[4],date1[5])
date2=datetime.datetime(date2[0],date2[1],date2[2],date2[3],date2[4],date2[5])
return date2>date1
if __name__ == '__main__':
#申请资源
opm = OPMysql()
sql = "select id,user,dqsj,rjbh from w_kh where user='000' and rjbh='000'"
res = opm.op_select(sql)
print(res)
timeArray = time.strptime(res[2], "%Y-%m-%d %H:%M:%S")
print(timeArray)
dt=Caltime(time.localtime(),timeArray)
print(dt)
最后于 2018-5-5 12:27
被进口坚果编辑
,原因: