Advertisement
Guest User

Untitled

a guest
May 6th, 2016
68
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.88 KB | None | 0 0
  1. from sqlalchemy import event
  2. from sqlalchemy import DDL
  3.  
  4. def mysql_cidr_overlap(engine, metadata):
  5. @event.listens_for(metadata, "after_create")
  6. def _create_mysql_proc(target, connection, **kw):
  7. if connection.engine.name != 'mysql':
  8. return
  9.  
  10. if connection.scalar(
  11. "SELECT ROUTINE_NAME FROM INFORMATION_SCHEMA.ROUTINES "
  12. "WHERE ROUTINE_TYPE='FUNCTION' AND ROUTINE_SCHEMA=DATABASE() AND "
  13. "ROUTINE_NAME=%s",
  14. ("cidr_overlap", )
  15. ):
  16. connection.execute("DROP FUNCTION cidr_overlap")
  17.  
  18. connection.execute(
  19. DDL("""
  20. CREATE FUNCTION cidr_overlap (cidr1 VARCHAR(30), cidr2 VARCHAR(30))
  21. RETURNS TINYINT
  22. BEGIN
  23. DECLARE bitmask INT;
  24. -- note - Mike is semi-guessing on the math here, needs tests! don't stick
  25. -- into production pls :)
  26. SET bitmask = pow(
  27. 2,
  28. (32 - least(
  29. cast(substring_index(cidr1, '/', -1) as integer),
  30. cast(substring_index(cidr2, '/', -1) as integer)
  31. ))
  32. ) - 1;
  33.  
  34. RETURN
  35. inet_aton(substring_index(cidr1, '/', 1)) & ~bitmask =
  36. inet_aton(substring_index(cidr2, '/', 1)) & ~bitmask;
  37. END
  38. """)
  39. )
  40.  
  41.  
  42.  
  43.  
  44. if __name__ == '__main__':
  45. from sqlalchemy import Column, Integer, String, create_engine, func
  46. from sqlalchemy.ext.declarative import declarative_base
  47. from sqlalchemy.orm import Session, aliased
  48. from sqlalchemy import event
  49.  
  50. Base = declarative_base()
  51.  
  52. class A(Base):
  53. __tablename__ = 'a'
  54. id = Column(Integer, primary_key=True)
  55. subnet = Column(String(30))
  56.  
  57. event.listen(
  58. A.__table__, "after_create",
  59. DDL("""
  60. CREATE TRIGGER no_overlap_cidr_a
  61. BEFORE INSERT ON a
  62. FOR EACH ROW
  63. BEGIN
  64. DECLARE msg VARCHAR(200);
  65. IF (EXISTS(SELECT * FROM a WHERE cidr_overlap(subnet, NEW.subnet))) THEN
  66. SET msg = CONCAT(
  67. 'inserted subnet ', NEW.subnet,
  68. ' conflicts with existing subnets');
  69. SIGNAL sqlstate '45000'
  70. SET MESSAGE_TEXT = msg;
  71. END IF;
  72. END
  73. """)
  74. )
  75.  
  76. e = create_engine("mysql://scott:tiger@localhost/test", echo=True)
  77.  
  78. mysql_cidr_overlap(e, Base.metadata)
  79.  
  80. Base.metadata.drop_all(e)
  81. Base.metadata.create_all(e)
  82.  
  83. s = Session(e)
  84.  
  85. with s.begin_nested():
  86. s.add(A(subnet='192.168.1.0/24'))
  87.  
  88. with s.begin_nested():
  89. s.add(A(subnet='192.168.2.0/24'))
  90.  
  91. try:
  92. with s.begin_nested():
  93. s.add(A(subnet='192.168.2.0/25'))
  94. except Exception as e:
  95. print "Error! %s" % e
  96.  
  97. s.commit()
  98.  
  99. a1, a2 = aliased(A), aliased(A)
  100.  
  101. # return all non-overlapping CIDR pairs
  102. for a, b in s.query(a1.subnet, a2.subnet).\
  103. filter(~func.cidr_overlap(a1.subnet, a2.subnet)).\
  104. filter(a1.id > a2.id):
  105. print a, b
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement