Last active
December 20, 2023 19:38
-
-
Save KokoseiJ/6901d0de99f42c6686c0c6840eff05f3 to your computer and use it in GitHub Desktop.
Customizable and portable Snowflake generator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import time | |
from math import floor | |
class SnowflakeGenerator:
    """Generate and decode Snowflake-style integer identifiers.

    Bit layout, most significant first:

    * 41 bits - milliseconds elapsed since ``time_start``
    * 10 bits - machine id (``mid``)
    * 12 bits - per-generator sequence counter

    No locking is performed; share an instance between threads only with
    external synchronisation.
    """

    _TIME_BITS = 41
    _MID_BITS = 10
    _SEQ_BITS = 12

    _TIME_MASK = (1 << _TIME_BITS) - 1
    _MID_MASK = (1 << _MID_BITS) - 1
    _SEQ_MASK = (1 << _SEQ_BITS) - 1

    _MID_SHIFT = _SEQ_BITS
    _TIME_SHIFT = _SEQ_BITS + _MID_BITS

    def __init__(self, mid=None, time_start=1288834974657):
        """Create a generator.

        Args:
            mid: Machine id. When ``None`` the current process id is used.
                Only the low 10 bits are kept, so larger values (typical
                for process ids) cannot leak into the timestamp field.
            time_start: Custom epoch, in Unix milliseconds, that the
                timestamp field is measured from.
        """
        self.time_start = time_start
        # ``is None`` rather than truthiness: 0 is a valid machine id.
        raw_mid = os.getpid() if mid is None else mid
        self.mid = raw_mid & self._MID_MASK
        self.seq = 0
        self.last_msec = 0

    @staticmethod
    def _now_ms():
        """Return the current Unix time in whole milliseconds."""
        return floor(time.time() * 1000)

    @classmethod
    def _wait_until(cls, target_ms):
        """Block until the clock reads at least ``target_ms``; return it.

        The remaining time is recomputed on every iteration and clamped at
        zero, so ``time.sleep`` never receives a negative duration and an
        early wake-up simply triggers another short sleep.
        """
        now = cls._now_ms()
        while now < target_ms:
            remaining = (target_ms - time.time() * 1000) / 1000
            time.sleep(max(0.0, remaining))
            now = cls._now_ms()
        return now

    def get(self):
        """Return the next identifier.

        Raises:
            ValueError: If the time elapsed since ``time_start`` no longer
                fits in 41 bits.
        """
        now = self._now_ms()
        if now - self.time_start > self._TIME_MASK:
            raise ValueError("Time larger than 2^41! The end is near...")

        # The clock moved backwards: wait until it catches up with the
        # timestamp of the previously issued id.
        if now < self.last_msec:
            now = self._wait_until(self.last_msec)

        # Sequence space exhausted: if we are still inside the millisecond
        # of the previous id, wait for the next one before wrapping to 0.
        if self.seq > self._SEQ_MASK:
            if now == self.last_msec:
                now = self._wait_until(self.last_msec + 1)
            self.seq = 0

        snowflake = self._construct(now - self.time_start, self.mid, self.seq)
        self.seq += 1
        self.last_msec = now
        return snowflake

    def parse(self, snowflake):
        """Split ``snowflake`` into its fields.

        Returns:
            A ``(timestamp, mid, seq)`` tuple where ``timestamp`` is Unix
            time in seconds (float), computed with this generator's
            ``time_start``.
        """
        seq = snowflake & self._SEQ_MASK
        mid = (snowflake >> self._MID_SHIFT) & self._MID_MASK
        time_ms = (snowflake >> self._TIME_SHIFT) & self._TIME_MASK
        timestamp = (time_ms + self.time_start) / 1000
        return timestamp, mid, seq

    def _construct(self, now, mid, seq):
        """Pack elapsed milliseconds, machine id and sequence into one int.

        ``mid`` and ``seq`` are masked to their field widths so an
        out-of-range value cannot overwrite a neighbouring field.
        """
        return (
            (now << self._TIME_SHIFT)
            | ((mid & self._MID_MASK) << self._MID_SHIFT)
            | (seq & self._SEQ_MASK)
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment