代码
1 #coding:utf-8
2
3 #Python的线程池实现
4
5 import Queue
6 import threading
7 import sys
8 import time
9 import urllib
10
11 #替我们工作的线程池中的线程
12 class MyThread(threading.Thread):
13 def __init__(self, workQueue, resultQueue,timeout=30, **kwargs):
14 threading.Thread.__init__(self, kwargs=kwargs)
15 #线程在结束前等待任务队列多长时间
16 self.timeout = timeout
17 self.setDaemon(True)
18 self.workQueue = workQueue
19 self.resultQueue = resultQueue
20 self.start()
21
22 def run(self):
23 while True:
24 try:
25 #从工作队列中获取一个任务
26 callable, args, kwargs = self.workQueue.get(timeout=self.timeout)
27 #我们要执行的任务
28 res = callable(args, kwargs)
29 #报任务返回的结果放在结果队列中
30 self.resultQueue.put(res+" | "+self.getName())
31 except Queue.Empty: #任务队列空的时候结束此线程
32 break
33 except :
34 print sys.exc_info()
35 raise
36
37 class ThreadPool:
38 def __init__( self, num_of_threads=10):
39 self.workQueue = Queue.Queue()
40 self.resultQueue = Queue.Queue()
41 self.threads = []
42 self.__createThreadPool( num_of_threads )
43
44 def __createThreadPool( self, num_of_threads ):
45 for i in range( num_of_threads ):
46 thread = MyThread( self.workQueue, self.resultQueue )
47 self.threads.append(thread)
48
49 def wait_for_complete(self):
50 #等待所有线程完成。
51 while len(self.threads):
52 thread = self.threads.pop()
53 #等待线程结束
54 if thread.isAlive():#判断线程是否还存活来决定是否调用join
55 thread.join()
56
57 def add_job( self, callable, *args, **kwargs ):
58 self.workQueue.put( (callable,args,kwargs) )
59
60 def test_job(id, sleep = 0.001 ):
61 html = ""
62 try:
63 time.sleep(1)
64 conn = urllib.urlopen('http://www.google.com/')
65 html = conn.read(20)
66 except:
67 print sys.exc_info()
68 return html
69
70 def test():
71 print 'start testing'
72 tp = ThreadPool(10)
73 for i in range(50):
74 time.sleep(0.2)
75 tp.add_job( test_job, i, i*0.001 )
76 tp.wait_for_complete()
77 #处理结果
78 print 'result Queue\'s length == %d '% tp.resultQueue.qsize()
79 while tp.resultQueue.qsize():
80 print tp.resultQueue.get()
81 print 'end testing'
82 if __name__ == '__main__':
83 test()
2
3 #Python的线程池实现
4
5 import Queue
6 import threading
7 import sys
8 import time
9 import urllib
10
11 #替我们工作的线程池中的线程
12 class MyThread(threading.Thread):
13 def __init__(self, workQueue, resultQueue,timeout=30, **kwargs):
14 threading.Thread.__init__(self, kwargs=kwargs)
15 #线程在结束前等待任务队列多长时间
16 self.timeout = timeout
17 self.setDaemon(True)
18 self.workQueue = workQueue
19 self.resultQueue = resultQueue
20 self.start()
21
22 def run(self):
23 while True:
24 try:
25 #从工作队列中获取一个任务
26 callable, args, kwargs = self.workQueue.get(timeout=self.timeout)
27 #我们要执行的任务
28 res = callable(args, kwargs)
29 #报任务返回的结果放在结果队列中
30 self.resultQueue.put(res+" | "+self.getName())
31 except Queue.Empty: #任务队列空的时候结束此线程
32 break
33 except :
34 print sys.exc_info()
35 raise
36
37 class ThreadPool:
38 def __init__( self, num_of_threads=10):
39 self.workQueue = Queue.Queue()
40 self.resultQueue = Queue.Queue()
41 self.threads = []
42 self.__createThreadPool( num_of_threads )
43
44 def __createThreadPool( self, num_of_threads ):
45 for i in range( num_of_threads ):
46 thread = MyThread( self.workQueue, self.resultQueue )
47 self.threads.append(thread)
48
49 def wait_for_complete(self):
50 #等待所有线程完成。
51 while len(self.threads):
52 thread = self.threads.pop()
53 #等待线程结束
54 if thread.isAlive():#判断线程是否还存活来决定是否调用join
55 thread.join()
56
57 def add_job( self, callable, *args, **kwargs ):
58 self.workQueue.put( (callable,args,kwargs) )
59
60 def test_job(id, sleep = 0.001 ):
61 html = ""
62 try:
63 time.sleep(1)
64 conn = urllib.urlopen('http://www.google.com/')
65 html = conn.read(20)
66 except:
67 print sys.exc_info()
68 return html
69
70 def test():
71 print 'start testing'
72 tp = ThreadPool(10)
73 for i in range(50):
74 time.sleep(0.2)
75 tp.add_job( test_job, i, i*0.001 )
76 tp.wait_for_complete()
77 #处理结果
78 print 'result Queue\'s length == %d '% tp.resultQueue.qsize()
79 while tp.resultQueue.qsize():
80 print tp.resultQueue.get()
81 print 'end testing'
82 if __name__ == '__main__':
83 test()