创建股票数据库
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2018-09-04 14:34:59
# @Author : Michael Li
# @Version : $V2.0$import pandas as pd
import numpy as np
import datetime
import random
import pymssql
from sqlalchemy import create_engine
import tushare as ts
import logging
from time import sleep
from queue import LifoQueue
import threading
#以上是需要使用的Python包,没有安装的请pip install XXXXclass stock(object):"""get stock information"""def __init__(self):self.host = '127.0.0.1'self.user = 'sa'self.password = 'test'self.port = 3306 #端口self.database = 'stock'self.Daily_tableName = 'daily'self.Daily_basic_tableName = 'daily_basic'#开始和结束时间仅用于获取区间数据,代码中有用于获取当前日期的函数。self.startTime = '20100101' #开始时间self.endTime = datetime.datetime.now().strftime('%Y%m%d') #结束时间#以下为两个队列,用于支撑多线程。self.ts_code_queue = LifoQueue() #ts_code队列,用于获取指定代码的股票信息。self.trade_cal_queue = LifoQueue() #trade_cal交易日期队列,用于获取指定时间的股票信息。ts.set_token('')self.pro = ts.pro_api()def log(self):''' 日志功能函数'''logger = logging.getLogger()logger.setLevel(logging.INFO)handler = logging.FileHandler(r'D:/abc.log', encoding='utf-8')# handlerStream = logging.StreamHandler()formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")handler.setFormatter(formatter)logger.addHandler(handler)# logger.addHandler(handlerStream)logging.info('开始获取%s的数据' % (self.getDatetime()))def insertMysql(self, tableName, data):conn = pymssql.connect(host = '.',user ='sa',password = 'test',database = 'stock',charset ='utf8')'''创建数据库连接,需要先在数据库中建立相应的表和表结构'''engine = create_engine('mssql+pymssql://%s:%s@%s/%s' % (self.user, self.password, self.host, self.database))'''将获取到的数据插入到数据库中,if_exists=append为向后添加,index=False为不保存DF自动生成的index'''data.to_sql(tableName,con = engine,if_exists='append', index=False)def getDatetime(self):'''获取当天日期的函数 '''taday = datetime.datetime.now().strftime('%Y%m%d')return tadaydef getTrade_cal(self):# 获取各大交易所交易日历数据,默认提取的是上交所(注意start_date和end_date的值,获取周期数据可使用self.startTime和self.endTime)#官方文档:https://tushare.pro/document/2?doc_id=26trade_cal = self.pro.query('trade_cal', exchange='SZSE', start_date=self.startTime,end_date=self.endTime, is_open=1)#将日期列转换为list,便于使用队列trade_cals = trade_cal['cal_date'].tolist()#循环将每个交易日塞进队列中for trade_cal in trade_cals:self.trade_cal_queue.put(trade_cal, True, 2)logging.info('Trade_cal 队列中共计:%s 条信息' % (self.trade_cal_queue.qsize()))def getList(self):'''获取获取基础信息数据,包括股票代码、名称、上市日期、退市日期等'''stock_basic = self.pro.query('stock_basic', exchange_id='', is_hs='N',fields='ts_code,symbol,name,fullname,list_date,list_status')stock_basic.set_index('ts_code', inplace=True)for ts_code in stock_basic.index.tolist()[0:20]:self.ts_code_queue.put(ts_code, True, 2)logging.info('共计%s条数据' % (self.ts_code_queue.qsize()))#备注:此函数不是经常使用。def getDaily_code(self):'''获取指定股票代码的数据'''print('子线程(%s)启动' % (threading.current_thread().name))while not self.ts_code_queue.empty():#访问队列,队列为空时退出。code = self.ts_code_queue.get(True, 3)#使用tushare获取(code是从队列中取出的ts_code)#Tushare 官方文档:https://tushare.pro/document/2?doc_id=27daily = self.pro.query('daily', ts_code=code,start_date=self.startTime, end_date=self.endTime)daily['trade_date'] = pd.to_datetime(daily['trade_date'], format='%Y-%m-%d')#将获取到的数据保存至数据库self.insertMysql(self.Daily_tableName, daily)# print(code)sleep(random.randint(1, 2))logging.info('------>%s的getDaily_code数据全部搞定<------' %(self.getDatetime()))def getDaily_date(self):'''获指定日期或日期范围的股票数据'''logging.info('线程(%s)启动' % (threading.current_thread().name))while not self.trade_cal_queue.empty():#队列功能同上个函数date = self.trade_cal_queue.get(True, 2)#使用tushare获取(date是从队列中取出的日期)#Tushare 官方文档:https://tushare.pro/document/2?doc_id=27daily = self.pro.query('daily', start_date=date)daily['trade_date'] = pd.to_datetime(daily['trade_date'], format='%Y-%m-%d')self.insertMysql(self.Daily_tableName, daily)logging.info('--> %s 的数据共计:%s 条 <--' %(date, len(daily)))sleep(random.randint(1, 2))logging.info('线程(%s)结束' % (threading.current_thread().name))def getDaily_basic(self):'''获取Daily_basic数据'''#Tushare 官方文档:https://tushare.pro/document/2?doc_id=32daily_basic = self.pro.query('daily_basic', ts_code='002427',trade_date=self.getDatetime())#简单的数据清洗,将空值填充为0daily_basic = daily_basic.fillna(0)self.insertMysql(self.Daily_basic_tableName, daily_basic)logging.info('------>%s的Daily_basic的数据搞定<------' %(self.getDatetime()))#print(daily_basic)def main(self):#这里算是总调度吧self.log() #日志self.getDaily_basic() # 获取Daily_basicself.getTrade_cal() #获取日期(可是周期,也可是单天日期)#启动多线程(4个)for x in range(4):x += 1t1 = threading.Thread(target=self.getDaily_date, name='getDaily_date %d 号进程' % (x))t1.start()t1.join()if __name__ == '__main__':#大功告成,实例化后启动main函数。stock = stock()stock.main()