Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from sqlalchemy import event
- from sqlalchemy import DDL
- def mysql_cidr_overlap(engine, metadata):
- @event.listens_for(metadata, "after_create")
- def _create_mysql_proc(target, connection, **kw):
- if connection.engine.name != 'mysql':
- return
- if connection.scalar(
- "SELECT ROUTINE_NAME FROM INFORMATION_SCHEMA.ROUTINES "
- "WHERE ROUTINE_TYPE='FUNCTION' AND ROUTINE_SCHEMA=DATABASE() AND "
- "ROUTINE_NAME=%s",
- ("cidr_overlap", )
- ):
- connection.execute("DROP FUNCTION cidr_overlap")
- connection.execute(
- DDL("""
- CREATE FUNCTION cidr_overlap (cidr1 VARCHAR(30), cidr2 VARCHAR(30))
- RETURNS TINYINT
- BEGIN
- DECLARE bitmask INT;
- -- note - Mike is semi-guessing on the math here, needs tests! don't stick
- -- into production pls :)
- SET bitmask = pow(
- 2,
- (32 - least(
- cast(substring_index(cidr1, '/', -1) as integer),
- cast(substring_index(cidr2, '/', -1) as integer)
- ))
- ) - 1;
- RETURN
- inet_aton(substring_index(cidr1, '/', 1)) & ~bitmask =
- inet_aton(substring_index(cidr2, '/', 1)) & ~bitmask;
- END
- """)
- )
- if __name__ == '__main__':
- from sqlalchemy import Column, Integer, String, create_engine, func
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm import Session, aliased
- from sqlalchemy import event
- Base = declarative_base()
- class A(Base):
- __tablename__ = 'a'
- id = Column(Integer, primary_key=True)
- subnet = Column(String(30))
- event.listen(
- A.__table__, "after_create",
- DDL("""
- CREATE TRIGGER no_overlap_cidr_a
- BEFORE INSERT ON a
- FOR EACH ROW
- BEGIN
- DECLARE msg VARCHAR(200);
- IF (EXISTS(SELECT * FROM a WHERE cidr_overlap(subnet, NEW.subnet))) THEN
- SET msg = CONCAT(
- 'inserted subnet ', NEW.subnet,
- ' conflicts with existing subnets');
- SIGNAL sqlstate '45000'
- SET MESSAGE_TEXT = msg;
- END IF;
- END
- """)
- )
- e = create_engine("mysql://scott:tiger@localhost/test", echo=True)
- mysql_cidr_overlap(e, Base.metadata)
- Base.metadata.drop_all(e)
- Base.metadata.create_all(e)
- s = Session(e)
- with s.begin_nested():
- s.add(A(subnet='192.168.1.0/24'))
- with s.begin_nested():
- s.add(A(subnet='192.168.2.0/24'))
- try:
- with s.begin_nested():
- s.add(A(subnet='192.168.2.0/25'))
- except Exception as e:
- print "Error! %s" % e
- s.commit()
- a1, a2 = aliased(A), aliased(A)
- # return all non-overlapping CIDR pairs
- for a, b in s.query(a1.subnet, a2.subnet).\
- filter(~func.cidr_overlap(a1.subnet, a2.subnet)).\
- filter(a1.id > a2.id):
- print a, b
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement