Created
January 9, 2023 13:13
-
-
Save icedraco/1242396fcff91b01162d5dd56d32ea95 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
import os | |
# =============================================================================================== # | |
class LameDhcp(object): | |
"""A cooperative filesystem-based IP reservation mechanism | |
""" | |
#--- CONFIGURATION ---------------------------------------------------------------------------# | |
SUBNET24 = "192.168.255.0" # MUST be a /24 address | |
START_SUFFIX = 10 | |
END_SUFFIX = 250 | |
MAX_RETRIES = 5 | |
LEASE_SUFFIX = ".lease" | |
#--- ------ ----------------------------------------------------------------------------------# | |
def __init__(self, path, myAddr=None, createIfMissing=True): | |
""" | |
Args: | |
path (str): path to the directory where IP lease files will be stored | |
myAddr (str): IP address currently leased to us (leave default if unsure) | |
createIfMissing (bool): create lease directory if missing | |
""" | |
self.__path = path | |
self.__myAddr = myAddr | |
self.__blacklist = set() | |
# handle missing lease path if applicable | |
if not os.path.isdir(path): | |
if createIfMissing: | |
os.makedirs(path) | |
else: | |
raise FileNotFoundError("Lease directory at %r was not found" % path) | |
def hasAddress(self): | |
"""Check if this object had acquired an address | |
Returns: | |
bool: True if we are holding an IP address | |
""" | |
return self.getAddress() is not None | |
def getAddress(self): | |
"""Get our currently assigned IP address | |
Returns: | |
str|None: current IP address or None if none assigned | |
""" | |
return self.__myAddr | |
def blacklist(self, ipAddr): | |
"""Blacklist an IP address to avoid acquiring it | |
Args: | |
ipAddr (str): ip address to blacklist/avoid | |
""" | |
self.__blacklist.add(ipAddr) | |
def release(self): | |
"""Release a previously acquired IP address | |
Returns: | |
bool: False if release attempt failed due to an error | |
""" | |
if not self.hasAddress(): | |
return True # already released | |
rc = self.llReleaseAddress(self.__myAddr) | |
if rc < 0: | |
return False | |
self.__myAddr = None | |
return True | |
def acquire(self, note=""): | |
"""Acquire an available IP address in the pre-configured range | |
Args: | |
note (str): note to add to the lease file (e.g., "reserved by build-script-1248") | |
Returns: | |
bool: True if new IP address successfully acquired | |
""" | |
if not self.release(): | |
return False # error trying to release existing IP address | |
it = self.__getFreeIpIterator(maxLoops=self.MAX_RETRIES) | |
# keep polling the iterator until it stops, or until we reach MAX_RETRIES | |
for _, candidateIp in itertools.izip(xrange(self.MAX_RETRIES+1), it): | |
rc = self.llAcquireAddress(candidateIp, note) | |
if rc > 0: | |
self.__myAddr = candidateIp | |
return True # address secured | |
return False # failed to secure an address | |
#--- LOW-LEVEL ACCESS ------------------------------------------------------------------------# | |
def llReleaseAddress(self, ipAddr): | |
"""Release a specific IP address | |
Args: | |
ipAddr (str): ip address to release | |
Returns: | |
int: return code | |
1 - released | |
0 - lease not found (already released) | |
-1 - error releasing address | |
""" | |
leasePath = self.__getLeasePath(ipAddr) | |
try: | |
os.unlink(leasePath) | |
except OSError as ex: | |
if ex.errno == 2: # "No such file or directory" | |
return 0 # lease not found | |
else: | |
return -1 # error deleting lease file | |
except Exception: | |
return -1 # error deleting lease file | |
else: | |
return 1 # released | |
def llAcquireAddress(self, ipAddr, note=""): | |
"""Acquire a specific IP address | |
Args: | |
ipAddr (str): ip address to acquire | |
note (str): (optional) note to add to the lease file (e.g., "reserved by build-script-1248") | |
Returns: | |
int: return code | |
1 - address acquired | |
0 - address already taken | |
-1 - error acquiring address | |
""" | |
leasePath = self.__getLeasePath(ipAddr) | |
# create file | |
try: | |
fd = os.open(leasePath, os.O_WRONLY|os.O_CREAT|os.O_EXCL) | |
except OSError as ex: | |
if ex.errno == 17: # "File exists" | |
return 0 # already taken | |
else: | |
return -1 # error | |
# write note to file | |
if note: | |
try: | |
os.write(fd, note+"\n") | |
except Exception as ex: | |
os.close(fd) | |
os.unlink(leasePath) | |
return -1 # error writing note | |
# finalize | |
os.close(fd) | |
return 1 # success | |
#--- HELP METHODS ----------------------------------------------------------------------------# | |
@classmethod | |
def __getIp(cls, suffix): | |
assert 0 <= suffix <= 255, "ip suffix %r is out of range" % suffix | |
return "%s.%d" % (cls.SUBNET24.rsplit(".", 1)[0], suffix) | |
def __getFreeIpIterator(self, maxLoops=MAX_RETRIES): | |
for _ in xrange(maxLoops): | |
# create a lazy list of IP addresses from ip-range suffix numbers | |
candidateAddresses = (self.__getIp(suffix) for suffix in xrange(self.START_SUFFIX, self.END_SUFFIX+1)) | |
# now find those that don't have a lease file | |
for ipAddr in candidateAddresses: | |
blacklisted = ipAddr in self.__blacklist | |
if not blacklisted and not os.path.exists(self.__getLeasePath(ipAddr)): | |
yield ipAddr | |
def __getLeasePath(self, ipAddr): | |
""" | |
Example: | |
self.__getLeasePath("10.0.0.3") == "/tmp/lame-dhcp-leases/10.0.0.3.lease" | |
Args: | |
ipAddr (str): ip address (e.g., "192.168.0.1") | |
Returns: | |
str: full path to the lease file for the given address | |
""" | |
return os.path.join(self.__path, "%s%s" % (ipAddr, self.LEASE_SUFFIX)) | |
# =============================================================================================== # | |
if __name__ == "__main__": | |
dhcp = LameDhcp(path="/tmp/lamedhcp", createIfMissing=True) | |
if not dhcp.acquire(note="Test"): | |
raise Exception("Cannot acquire IP address") | |
print "Acquired %s!" % dhcp.getAddress() | |
import atexit | |
@atexit.register | |
def releaseVmIpAddr(): | |
addr = dhcp.getAddress() | |
if dhcp.release(): | |
print "Released VM IP address %s" % addr | |
else: | |
print "WARNING: Unable to release VM IP address %s" % addr | |
print "DONE" | |
raise SystemExit(0) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment