Python雪花id生成算法

2021-07-02 发表在 编程语言 1328
    # coding: utf-8
    import time
    
    
    class InvalidSystemClock(Exception):
        """
        时钟回拨异常
        """
        pass
    
    # 64位ID的划分
    WORKER_ID_BITS = 5
    DATACENTER_ID_BITS = 5
    SEQUENCE_BITS = 12
    
    # 最大取值计算
    MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS)  # 2**5-1 0b11111
    MAX_DATACENTER_ID = -1 ^ (-1 << DATACENTER_ID_BITS)
    
    # 移位偏移计算
    WOKER_ID_SHIFT = SEQUENCE_BITS
    DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS
    TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS
    
    # 序号循环掩码
    SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)
    
    # 开始时间截 (2015-01-01)
    TWEPOCH = 1420041600000
    
    
    class IdWorker(object):
        """
        用于生成IDs
        """
        def __init__(self, datacenter_id, worker_id, sequence=0):
            """
            初始化
            :param datacenter_id: 数据中心(机器区域)ID
            :param worker_id: 机器ID
            :param sequence: 其实序号
            """
            # sanity check
            if worker_id > MAX_WORKER_ID or worker_id < 0:
                raise ValueError('worker_id值越界')
    
            if datacenter_id > MAX_DATACENTER_ID or datacenter_id < 0:
                raise ValueError('datacenter_id值越界')
    
            self.worker_id = worker_id
            self.datacenter_id = datacenter_id
            self.sequence = sequence
    
            self.last_timestamp = -1  # 上次计算的时间戳
    
        def _gen_timestamp(self):
            """
            生成整数时间戳
            :return:int timestamp
            """
            return int(time.time() * 1000)
    
        def get_id(self):
            """
            获取新ID
            :return:
            """
            timestamp = self._gen_timestamp()
    
            # 时钟回拨
            if timestamp < self.last_timestamp:
                raise InvalidSystemClock
    
            if timestamp == self.last_timestamp:
                self.sequence = (self.sequence + 1) & SEQUENCE_MASK
                if self.sequence == 0:
                    timestamp = self._til_next_millis(self.last_timestamp)
            else:
                self.sequence = 0
    
            self.last_timestamp = timestamp
    
            new_id = ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | \
                     (self.worker_id << WOKER_ID_SHIFT) | self.sequence
            return new_id
    
        def _til_next_millis(self, last_timestamp):
            """
            等到下一毫秒
            """
            timestamp = self._gen_timestamp()
            while timestamp <= last_timestamp:
                timestamp = self._gen_timestamp()
            return timestamp
    
    
    if __name__ == '__main__':
        worker = IdWorker(0, 0)
        for i in range(1,1000):
            print(worker.get_id())
    
    作者:Java笔记
    本站使用「署名 4.0 国际」创作共享协议,转载请在文章明显位置注明作者及出处。
    评论
    登录以后才可以发布评论哦, 点击登录 发布评论
    评论列表 3人参与,3条评论
    1111
    2021-09-17 15:27:14
    2222
    2021-07-06 20:59:22
    1111
    2021-07-06 20:59:19