import timeclass Snowflake:# 定义Snowflake算法的各个参数def __init__(self, worker_id: int, datacenter_id: int, sequence: int = 0):# 计算位数self._worker_id_bits = 5self._datacenter_id_bits = 5self._sequence_bits = 12# 定义位偏移量self._worker_id_shift = self._sequence_bitsself._datacenter_id_shift = self._sequence_bits + self._worker_id_bits# 计算最大IDself._max_worker_id = ~(-1 << self._worker_id_bits)self._max_datacenter_id = ~(-1 << self._datacenter_id_bits)self._max_sequence = ~(-1 << self._sequence_bits)# 初始化参数self.worker_id = worker_idself.datacenter_id = datacenter_idself.sequence = sequenceself.last_timestamp = -1# 生成下一个唯一IDdef generate_id(self):# 获取当前时间戳timestamp = int(time.time() * 1000)# 如果当前时间小于上次生成ID的时间戳,则抛出异常if timestamp < self.last_timestamp:raise ValueError("Invalid system clock!")# 如果当前时间戳与上次时间戳相同,则自增序列号if timestamp == self.last_timestamp:self.sequence = (self.sequence + 1) & self._max_sequence# 如果序列号等于0,则需要进入下一毫秒重新生成IDif self.sequence == 0:timestamp = self._wait_next_millis(self.last_timestamp)else:self.sequence = 0# 保存最后生成ID的时间戳self.last_timestamp = timestamp# 生成最终的唯一IDunique_id = ((timestamp << self._datacenter_id_shift) |(self.datacenter_id << self._worker_id_shift) |self.worker_id << self._sequence_bits |self.sequence)return unique_id# 阻塞到下一个毫秒,直到获得新的时间戳def _wait_next_millis(self, last_timestamp):timestamp = int(time.time() * 1000)while timestamp <= last_timestamp:timestamp = int(time.time() * 1000)return timestamp# 测试方法
if __name__ == '__main__':snowflake = Snowflake(worker_id=1, datacenter_id=1)for _ in range(10):unique_id = snowflake.generate_id()print(unique_id)
在上述代码中,我们创建了一个名为Snowflake
的类,该类用于生成Snowflake算法的唯一ID。它具有以下主要方法和属性:
__init__(self, worker_id: int, datacenter_id: int, sequence: int = 0)
:初始化Snowflake算法的参数,包括工作节点ID、数据中心ID和序列号的位数。generate_id(self)
:生成下一个唯一ID。_wait_next_millis(self, last_timestamp)
:阻塞到下一个毫秒,直到获得新的时间戳。
在测试方法中,我们创建了一个Snowflake
实例,并使用其generate_id()
方法生成了10个唯一ID,并打印出来。