Skip to content

Instantly share code, notes, and snippets.

@namnh68
Last active June 8, 2016 08:29
Show Gist options
  • Save namnh68/858e0ea8231e7dc2f7e7b616ff678dce to your computer and use it in GitHub Desktop.
Save namnh68/858e0ea8231e7dc2f7e7b616ff678dce to your computer and use it in GitHub Desktop.
from sqlalchemy import event
from sqlalchemy import DDL
def mysql_cidr_overlap(engine, metadata):
    """Arrange for the ``cidr_overlap`` stored function to exist on MySQL.

    Registers an ``after_create`` listener on *metadata*.  Whenever the
    listener fires on a MySQL connection it drops any existing
    ``cidr_overlap`` function and re-creates it; on every other dialect it
    does nothing.

    The SQL function ``cidr_overlap(cidr1, cidr2)`` splits both arguments on
    ``:`` into hexadecimal groups and compares them under the shorter of the
    two prefix lengths (the number after ``/``): whole 16-bit groups must be
    equal, and the final partially covered group is compared after masking
    off its low bits.

    NOTE(review): groups are extracted purely by position, so ``::``
    compressed addresses are not expanded before comparison -- confirm that
    callers pass addresses in a form for which positional comparison is valid.

    :param engine: not used by this function; kept so existing callers that
        pass it keep working.
    :param metadata: the ``MetaData`` whose ``after_create`` event triggers
        (re)creation of the function.
    """
    @event.listens_for(metadata, "after_create")
    def _create_mysql_proc(target, connection, **kw):
        # The stored function uses MySQL-only syntax.
        if connection.engine.name != 'mysql':
            return

        # A single idempotent statement replaces the former
        # INFORMATION_SCHEMA lookup + conditional DROP; it also avoids
        # executing raw strings with a DBAPI-specific "%s" placeholder.
        connection.execute(DDL("DROP FUNCTION IF EXISTS cidr_overlap"))

        # Notes on the DDL text below:
        #  * SQLAlchemy applies Python %-formatting to DDL statements, so a
        #    literal percent sign must not appear in it; the remainder is
        #    therefore computed with MOD (mirroring DIV) instead of "%".
        #  * DETERMINISTIC is declared because MySQL refuses CREATE FUNCTION
        #    without such a characteristic when binary logging is enabled.
        #  * netmask  = the smaller of the two prefix lengths
        #    division = number of complete 16-bit groups still to compare
        #    modulo   = number of significant bits in the last group
        #    tmp      = 1-based index of the group currently examined
        connection.execute(
            DDL("""
            CREATE FUNCTION cidr_overlap (cidr1 varchar(64), cidr2 varchar(64))
            RETURNS TINYINT
            DETERMINISTIC
            BEGIN
                DECLARE ipv6_1, ipv6_2, bitmask INT;
                DECLARE division, modulo, netmask SMALLINT;
                DECLARE tmp TINYINT DEFAULT 1;
                SET netmask = least(
                    cast(substring_index(cidr1, '/', -1) as signed integer),
                    cast(substring_index(cidr2, '/', -1) as signed integer));
                SET division = netmask DIV 16;
                SET modulo = netmask MOD 16;
                masking : WHILE (division >= 0) DO
                    SET ipv6_1 = CONV(substring_index(substring_index
                        (cidr1, ':', tmp), ':', -1), 16, 10);
                    SET ipv6_2 = CONV(substring_index(substring_index
                        (cidr2, ':', tmp), ':', -1), 16, 10);
                    IF (division != 0) THEN
                        IF (ipv6_1 != ipv6_2) THEN
                            RETURN 0;
                        END IF;
                    ELSE
                        SET bitmask = pow(2, (16 - modulo)) - 1;
                        RETURN ipv6_1 & ~bitmask = ipv6_2 & ~bitmask;
                    END IF;
                    SET tmp = tmp + 1;
                    SET division = division - 1;
                END WHILE masking;
            END
            """)
        )
if __name__ == '__main__':
    # Demo script: builds a one-table schema on a local MySQL database,
    # installs the cidr_overlap function plus a trigger that rejects
    # overlapping rows, inserts a few CIDRs and lists non-overlapping pairs.
    from sqlalchemy import Column, Integer, String, create_engine, func
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import Session, aliased
    from sqlalchemy import event

    Base = declarative_base()

    class A(Base):
        __tablename__ = 'a'
        id = Column(Integer, primary_key=True)
        # 64 matches the varchar(64) parameters of cidr_overlap; a fully
        # written IPv6 CIDR (up to 43 characters) does not fit in 30.
        cidr = Column(String(64))

    # Reject an INSERT whose CIDR overlaps one already stored.  The column
    # is named "cidr", so the trigger must refer to NEW.cidr.
    event.listen(
        A.__table__, "after_create",
        DDL("""
        CREATE TRIGGER no_overlap_cidr_a
        BEFORE INSERT ON a
        FOR EACH ROW
        BEGIN
            DECLARE msg VARCHAR(200);
            IF (EXISTS(SELECT * FROM a WHERE cidr_overlap(cidr, NEW.cidr))) THEN
                SET msg = CONCAT(
                    'inserted subnet ', NEW.cidr,
                    ' conflicts with existing subnets');
                SIGNAL sqlstate '45000'
                    SET MESSAGE_TEXT = msg;
            END IF;
        END
        """)
    )

    # NOTE(review): demo credentials are hard-coded in the URL; do not reuse
    # this outside a throwaway local database.
    engine = create_engine("mysql://root:abc123@localhost/test", echo=True)
    mysql_cidr_overlap(engine, Base.metadata)

    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)

    s = Session(engine)

    with s.begin_nested():
        s.add(A(cidr='fd04:901d:e7de:abcd::/64'))

    with s.begin_nested():
        s.add(A(cidr='fd04:901d:e7de:abce::/64'))

    # This CIDR lies inside the first one, so the trigger is expected to
    # raise; the savepoint keeps the failure from aborting the session.
    try:
        with s.begin_nested():
            s.add(A(cidr='fd04:901d:e7de:abcd::/65'))
    except Exception as exc:
        # Single-argument print() behaves the same on Python 2 and 3.
        print("Error! %s" % exc)

    s.commit()

    a1, a2 = aliased(A), aliased(A)

    # return all non-overlapping CIDR pairs
    non_overlapping = s.query(a1.cidr, a2.cidr).\
        filter(~func.cidr_overlap(a1.cidr, a2.cidr)).\
        filter(a1.id > a2.id)
    for a, b in non_overlapping:
        print("%s %s" % (a, b))
sfsfsfsfs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment