# Language: Python
# Libraries:
#   threading
#   queue
#   requests
#   pybloom_live
import threading
import queue
import requests
from pybloom_live import BloomFilter
import one_friend_data # 获取单个用户信息# 创建布隆过滤器
# Process-wide Bloom filter of user tokens ("utk") that have already been
# scheduled for crawling.  It is shared by every worker thread, so all reads
# and writes go through ``Get_User.processed_lock``.
# NOTE(review): with this capacity and error rate the filter is presumably on
# the order of 1 GB of memory, allocated at import time -- confirm it is
# intended.
bloom = BloomFilter(capacity=200000000, error_rate=0.000000001)


class Get_User:
    """Multi-threaded breadth-first crawler over a user/friend graph.

    Work items are ``(utk, depth)`` tuples on ``self.q``.  Each worker thread
    runs :meth:`consumer`, which fetches one user's friend list and enqueues
    every friend that has not been scheduled before.
    """

    # How many times a user whose fetch failed is put back on the queue
    # before it is dropped.  Without a cap a permanently failing user (for
    # example while ``base_get_user_friend_url`` is still empty) would be
    # retried forever.
    max_retries = 3

    def __init__(self):
        self.q = queue.Queue()
        self.max_thread = 20  # number of worker threads
        self.max_depth = 50  # users at this depth or deeper are not expanded
        self.processed_users = set()  # not used by this class; kept as-is
        self.processed_lock = threading.Lock()  # guards bloom, k and _failures
        self.base_get_user_friend_url = ''  # friend-list endpoint (placeholder)
        self.k = 1  # running index of discovered users
        self._failures = {}  # utk -> number of failed fetch attempts

    def process_item(self, utk, depth):
        """Request the friend list and return ``(parsed_json, depth)``.

        ``params`` and ``headers`` are empty placeholders; as written, ``utk``
        is only printed and is not part of the request.

        Raises whatever ``requests.get`` or ``response.json()`` raise
        (connection errors, the 2-second timeout, invalid JSON, ...).
        """
        params = {'': '',}
        headers = {'': '',}
        print("请求: ", self.base_get_user_friend_url, utk)
        response = requests.get(
            self.base_get_user_friend_url,
            params=params,
            headers=headers,
            timeout=2,
        )
        return response.json(), depth

    def _requeue_or_drop(self, utk, depth):
        """Put a failed user back on the queue until ``max_retries`` is hit."""
        with self.processed_lock:
            attempts = self._failures.get(utk, 0) + 1
            self._failures[utk] = attempts
        if attempts <= self.max_retries:
            self.q.put((utk, depth))
            return
        with self.processed_lock:
            self._failures.pop(utk, None)
        print("重试次数已用尽,放弃:", utk)

    def consumer(self):
        """Worker loop: breadth-first expansion of users taken from the queue.

        A token is marked in the Bloom filter when it is *enqueued*, not when
        it is dequeued.  That keeps a user from being queued once per friend
        that lists it, and it lets a re-queued (retried) user be processed
        again instead of being discarded as "already seen".

        The loop ends when ``queue.get`` times out on an empty queue.
        """
        while True:
            try:
                # NOTE(review): Queue.get's timeout is in seconds, so idle
                # workers linger ~83 minutes after the crawl drains; confirm
                # that 5000 was not meant as milliseconds.
                utk, depth = self.q.get(timeout=5000)
            except queue.Empty:
                print("队列已空,结束所有线程")
                break

            # Idempotent for tokens marked at enqueue time; required for the
            # seed token, which reaches the queue without being marked.
            with self.processed_lock:
                bloom.add(utk)

            if depth >= self.max_depth:
                continue

            try:
                user_data = self.process_item(utk, depth)
                one_user_name, one_utk, one_user_area = one_friend_data.main(utk)
                print("用户名:", one_user_name, "utk:", one_utk, "用户地区:", one_user_area)
            except Exception as exc:
                # Exception (not a bare except) so Ctrl-C / SystemExit still
                # stop the worker.
                print("访问失败:", utk, exc)
                self._requeue_or_drop(utk, depth)
                continue

            payload, fetched_depth = user_data
            # A non-dict payload or a null "following" means "no friends"
            # rather than an uncaught exception that kills this worker.
            friends = (payload.get('following') or []) if isinstance(payload, dict) else []
            next_depth = fetched_depth + 1

            for friend in friends:
                friend_utk = friend.get('utk')
                if not friend_utk:
                    continue
                # Check, mark and number the friend in one critical section so
                # two workers cannot both enqueue it or reuse the same index.
                with self.processed_lock:
                    if friend_utk in bloom:
                        continue
                    bloom.add(friend_utk)
                    index = self.k
                    self.k += 1
                print("-------------------------------------------------------")
                print(f"第{next_depth}代: 第{index}个: {friend_utk} ---- {friend.get('nickname')}")
                self.q.put((friend_utk, next_depth))

        print("消费者已完成")

    def run_threads(self):
        """Seed the queue, start ``max_thread`` workers and wait for them."""
        start_utk = 'cre6czj4ph'
        self.q.put((start_utk, 0))
        workers = [
            threading.Thread(target=self.consumer)
            for _ in range(self.max_thread)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        print("所有线程已完成")


if __name__ == "__main__":
    get_user = Get_User()
    get_user.run_threads()