最近手头有个需求是这样的,定期检查数据库获取失败任务并且进行重启。最早想到的是添加一个生产者&&消费者队列,但是发现很多棘手的问题。
1.重启任务是调用的一个shell脚本然后在脚本中又调用python程序,所以任务完成的状态回传略纠结。
2.重启任务有多种重启方式,要根据任务的不同FailStat来判断重启方式,这样的话队列中不仅要有任务名称,还需要状态码
3.Python里的原生Queue不会进行去重,可能会导致队列中对失败任务无论重启成功与否会进行多次重跑。
在StackOverflow上看了一些文章,都是推荐拓展Queue,看了看Queue.Queue的源代码,发现果然很适合拓展:
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
# Initialize the queue representation
def _init(self, maxsize):
self.queue = deque()
def _qsize(self, len=len):
return len(self.queue)
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
# Get an item from the queue
def _get(self):
return self.queue.popleft()
So,自己通过字典来实现了一个类似链表的类,然后继承Queue.Queue并重写方法,实现了一个新的有序不重复队列:
#!/usr/bin/env python
# Filename:ordered_map_queue.py
# -*- encoding:utf-8 -*-
import sys
import Queue
class Link():
''' No repeat link '''
def __init__(self):
self.map = {}
self.tail = "head"
self.map["head"] = {"stat":0, "next":"null"}
def __contains__(self,key):
return key in self.map
def __len__(self):
return len(self.map)-1
def isEmpty(self):
if self.getHead() == "null":
return True
else:
return False
def clearLink(self):
self.map.clear()
def getTail(self):
return self.tail
def getHead(self):
return self.map["head"]["next"]
def add(self, string):
# self.test_output("OrderedMapQueue")
args = string.split('\t')
item = args[0]
stat = args[1]
if item not in self.map:
self.map[item] = {"stat":stat, "next":"null"}
self.map[self.tail]["next"] = item
self.tail = item
def pop(self):
if not self.isEmpty():
head_task = self.map["head"]["next"]
rt_value = "%s\t%s" % (head_task, self.map[head_task]["stat"])
self.map["head"]["next"] = self.map[head_task]["next"]
del self.map[head_task]
if head_task == self.tail:
self.tail = "head"
return rt_value
return None
def test_output(self, name=""):
print >>sys.stderr, name
print >>sys.stderr, "-" * 10 + "TEST_OUTPUT" + "-" * 10
print >>sys.stderr, "Tail: %s\nHead: %s\nLength: %s" % (self.getTail(), self.getHead(), self.__len__())
head = "head"
while head != "null":
print >>sys.stderr, "%s\t%s\t%s" % (head, self.map[head]["stat"], self.map[head]["next"])
head = self.map[head]["next"]
print >>sys.stderr, "-" * 31
class OrderedMapQueue(Queue.Queue):
''' ordered-map queue '''
def _init(self, maxsize=0):
self.queue = Link()
def _put(self, item):
self.queue.add(item)
def _get(self):
return self.queue.pop()
def _qsize(self):
return self.queue.__len__()
if __name__ == "__main__":
#mylink = Link()
#mylink.add("task1","-1")
#mylink.add("task2","-2")
#mylink.add("task3","-1")
#mylink.test_output()
myqueue = OrderedMapQueue()
myqueue.put("task2\t-2")
myqueue.put("task3\t-1")
myqueue.put("task1\t-2")
myqueue.put("task3\t-1")
myqueue.put("task3\t-2")
myqueue.queue.test_output()
print myqueue.get()
myqueue.queue.test_output()
print myqueue.get()
myqueue.queue.test_output()
print myqueue.get()
myqueue.queue.test_output()
自己菜鸟一只,其中肯定有不少问题或者可改进的,希望大家能多多支出,我定会认真修改,多谢~
另外,推荐几个拓展Queue的连接: