# SQL查询
import pymysql
import pandas as pd

# Connection settings. These are placeholders and must be filled in before
# the script can reach a real database.
user = ''
password = ''
dbName = ''
dbHost = ''
dbPort = 8888

# NOTE(review): `head` is not applied to `df` below; pass `columns=head` to
# pd.DataFrame if labelled columns are wanted.
head = ["Id", "Url"]

# Query filters (inclusive time window and name); placeholders.
t0, t1, name = '', '', ''

# The values are bound by the driver through %s placeholders instead of being
# formatted into the SQL text, so quotes inside a value cannot alter the
# statement.
sql_select = (
    "SELECT id, Url "
    "FROM xxx "
    "WHERE createTime >= %s and createTime <= %s and name = %s"
)

con = pymysql.connect(
    host=dbHost,
    port=dbPort,
    user=user,
    password=password,
    database=dbName,
    charset='utf8',
)
try:
    # The cursor is closed when the `with` block exits and the connection is
    # closed in `finally`, even if the query raises.
    with con.cursor() as cursor:
        cursor.execute(sql_select, (t0, t1, name))
        cds = cursor.fetchall()
finally:
    con.close()

# One DataFrame row per fetched record.
df = pd.DataFrame(cds)
# 日志功能
import logging
import os

# Remove the log file left by a previous run. Attempting the removal directly
# avoids the race between an existence check and the delete.
try:
    os.remove('log_retry.log')
except FileNotFoundError:
    pass

# Send INFO-and-above records of the root logger to log_retry.log.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='log_retry.log',
    filemode='w',
)

count = 0
try:
    logging.info('############# TOTAL number ############:{}, '.format(count))
except Exception:
    # logging.error() requires a message argument; logging.exception also
    # records the traceback of the failure being handled.
    logging.exception('failed to log the total number')
# Kafka传输
from kafka import KafkaProducer
import json
def kfk_send(msg, topic='', bootstrap_servers=None):
    """Serialize *msg* as UTF-8 JSON and send it to a Kafka topic.

    Args:
        msg: JSON-serializable object to publish.
        topic: Destination topic name. The default is an empty placeholder.
        bootstrap_servers: List of ``host:port`` broker addresses. When
            omitted, the three built-in broker addresses are used.
    """
    if bootstrap_servers is None:
        bootstrap_servers = [
            '172.25.214.75:9092',
            '172.25.214.76:9092',
            '172.25.214.78:9092',
        ]
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    try:
        producer.send(topic, value=msg)
        # send() only queues the record; flush() waits for it to be sent.
        producer.flush()
    finally:
        # A producer is created per call, so release it on every path.
        producer.close()


# Build the payload by pairing field names with their values, then send it.
head = []
value = []
ndata = dict(zip(head, value))
kfk_send(ndata)
# Flask服务：异步执行（服务及时返回 + 耗时任务），使用线程池
from concurrent. futures import ThreadPoolExecutor
from flask import Flask, request
import json
from time import sleep

# Shared worker pool that runs the submitted tasks.
executor = ThreadPoolExecutor(max_workers=4)
app = Flask(__name__)


def task(p1, p2):
    """Background job submitted by the request handler; prints a greeting."""
    print("Hello")


@app.route("/", methods=["POST"])
def main():
    """Handle POST /: read ``p1`` and ``p2`` from the JSON body and queue `task`.

    Returns:
        The acknowledgement string on success, or an error message with
        status 400 when the body is not a JSON object containing both keys.
    """
    try:
        request_dict = json.loads(request.data)
        p1 = request_dict["p1"]
        p2 = request_dict["p2"]
    except (ValueError, KeyError, TypeError):
        # ValueError: body is not valid JSON; KeyError: a key is missing;
        # TypeError: the JSON value is not an object.
        return "Request body must be a JSON object with 'p1' and 'p2'", 400

    # Hand the job to the pool; the handler does not wait for its result.
    executor.submit(task, p1, p2)
    # NOTE(review): this sleep delays the response by three seconds; confirm
    # whether it was meant to live in `task` instead.
    sleep(3)
    return "Get your POST!!!"


if __name__ == '__main__':
    app.run()
# Python多进程
from multiprocessing import Process


def infer(i, filelist):
    """Print the worker index *i* and the list of files assigned to it."""
    print(i, filelist)


if __name__ == '__main__':
    img_list = []
    num_process = 5
    # Base chunk size; the last worker also takes the remainder.
    num = len(img_list) // num_process

    process_list = []
    for i in range(num_process):
        if i == num_process - 1:
            filelist = img_list[i * num:]
        else:
            filelist = img_list[i * num:(i + 1) * num]
        process_list.append(Process(target=infer, args=(i, filelist)))

    # Start every worker before joining so they run concurrently.
    for p in process_list:
        p.start()
    for p in process_list:
        p.join()