Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # Some code borrowed from http://partiallystapled.com/~gxti/tools/netlink.py
- from collections import namedtuple
- from contextlib import contextmanager
- import socket
- import struct
# Address flag: the value of the tentative flag
IFA_F_TENTATIVE = 1 << 6

# Netlink control message types
NLMSG_NOOP = 0x1
NLMSG_ERROR = 0x2
NLMSG_DONE = 0x3
NLMSG_OVERRUN = 0x4

# Netlink routing (address) message types
RTM_NEWADDR = 0x14
RTM_GETADDR = 0x16

# Netlink header flag bits
NLM_F_REQUEST = 1 << 0
NLM_F_MULTI = 1 << 1
NLM_F_ACK = 1 << 2
NLM_F_ECHO = 1 << 3
NLM_F_DUMP_INTR = 1 << 4

# Modifier bits for GET requests
NLM_F_ROOT = 1 << 8
NLM_F_MATCH = 1 << 9
NLM_F_ATOMIC = 1 << 10
NLM_F_DUMP = NLM_F_ROOT | NLM_F_MATCH

# Address attribute types
IFA_ADDRESS = 0x1

# struct format strings for the binary layouts used below
# Netlink message header: length, type, flags, sequence number, pid
STRUCT_NLMSGHDR = 'IHHII'
# Address message: family, prefix length, flags, scope, interface index
STRUCT_IFADDRMSG = '4BI'
# Attribute header: attribute length, attribute type
STRUCT_RTATTR = 'HH'
@contextmanager
def socketcontext(*args, **kw):
    '''
    Context manager that creates a socket and always closes it again.

    All positional and keyword arguments are passed unchanged to
    socket.socket(). The created socket is yielded to the with-block and
    closed when the block is left, whether normally or by an exception.
    '''
    sock = socket.socket(*args, **kw)
    try:
        yield sock
    finally:
        # Without try/finally an exception raised inside the with-block
        # would skip close() and leak the file descriptor.
        sock.close()
def netlink_pack(msgtype, flags, seq, pid, data):
    '''
    Build one netlink message: a packed header followed by the payload.

    The length field written into the header is the payload length plus
    the 16 bytes of the header itself.
    '''
    total_length = len(data) + 16
    header = struct.pack(STRUCT_NLMSGHDR, total_length, msgtype, flags, seq, pid)
    return header + data
def netlink_unpack(data):
    '''
    Unpack a sequence of netlink packets.

    data: raw bytes containing zero or more back-to-back netlink messages.

    Returns a list of (msgtype, flags, seq, pid, payload) tuples in the
    order the messages appear in the buffer.

    Raises ValueError when a header is truncated or a message announces a
    length that is smaller than the header or larger than the remaining
    buffer.
    '''
    header_size = struct.calcsize(STRUCT_NLMSGHDR)
    out = []
    while data:
        if len(data) < header_size:
            raise ValueError("Truncated netlink message header")
        length, msgtype, flags, seq, pid = struct.unpack(
            STRUCT_NLMSGHDR, data[:header_size])
        # Explicit checks instead of assert: asserts vanish under -O, and a
        # length below the header size would never shrink the buffer,
        # which would make this loop spin forever.
        if length < header_size or length > len(data):
            raise ValueError("Invalid netlink message length: {}".format(length))
        out.append((msgtype, flags, seq, pid, data[header_size:length]))
        # The next header starts at the following 4-byte boundary
        # (NLMSG_ALIGN); the length field itself does not include padding.
        data = data[(length + 3) & ~3:]
    return out
def rtattr_unpack(data):
    """Unpack a sequence of netlink attributes.

    data: raw bytes containing zero or more attributes, each consisting
    of a (length, type) header followed by the value and padding up to a
    multiple of four bytes.

    Returns a dict mapping attribute type to the raw value bytes. When a
    type occurs more than once, the last occurrence wins.

    Raises ValueError when an attribute header is truncated or announces
    a length smaller than the header or larger than the remaining data.
    """
    size = struct.calcsize(STRUCT_RTATTR)
    attrs = {}
    while data:
        if len(data) < size:
            raise ValueError("Truncated netlink attribute header")
        rta_len, rta_type = struct.unpack(STRUCT_RTATTR, data[:size])
        # Explicit checks instead of assert: asserts vanish under -O, and an
        # rta_len of 0 would otherwise make this loop spin forever.
        if rta_len < size or rta_len > len(data):
            raise ValueError("Invalid netlink attribute length: {}".format(rta_len))
        attrs[rta_type] = data[size:rta_len]
        # Integer division: "/" yields a float on Python 3, which is not a
        # valid slice index.
        padded = ((rta_len + 3) // 4) * 4
        data = data[padded:]
    return attrs
def get_ipv6_address_flags():
    '''
    The main function to get the flags for all IPv6 addresses

    Sends an RTM_GETADDR dump request for AF_INET6 over a NETLINK_ROUTE
    socket and reads replies until NLMSG_DONE arrives.

    Returns a dict mapping each address (the raw bytes of the IFA_ADDRESS
    attribute) to the flags field of its address message.

    Raises RuntimeError when a reply carries an unexpected sequence number
    or when the kernel answers with NLMSG_ERROR or NLMSG_OVERRUN.
    '''
    ifaddrmsg_size = struct.calcsize(STRUCT_IFADDRMSG)
    seq = 1
    with socketcontext(socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE) as sock:
        # Request payload: one byte address family followed by three pad bytes
        req_data = struct.pack('Bxxx', socket.AF_INET6)
        msg = netlink_pack(RTM_GETADDR, NLM_F_REQUEST | NLM_F_DUMP, seq, 0, req_data)
        sock.send(msg)

        addr_flags = {}
        while True:
            msgs = sock.recv(262144)
            for msgtype, _nl_flags, mseq, _pid, data in netlink_unpack(msgs):
                # Check sequence number
                if mseq != seq:
                    raise RuntimeError("Netlink message sequence mismatch")

                # Handle different message types
                if msgtype == NLMSG_DONE:
                    # The end
                    return addr_flags
                if msgtype in (NLMSG_ERROR, NLMSG_OVERRUN):
                    # Bad stuff happened
                    raise RuntimeError("Netlink error")
                if msgtype != RTM_NEWADDR:
                    # Ignore messages we don't understand
                    continue

                # Decode the address info. The address flags get their own
                # name so they cannot be confused with the header flags.
                _family, _prefixlen, ifa_flags, _scope, _index = struct.unpack(
                    STRUCT_IFADDRMSG, data[:ifaddrmsg_size])
                attrs = rtattr_unpack(data[ifaddrmsg_size:])

                # Get the address from the attributes
                addr = attrs.get(IFA_ADDRESS)
                if addr is None:
                    # No address attribute: nothing to key the flags on, and
                    # a None key would break callers formatting the address.
                    continue

                # Store the flags
                addr_flags[addr] = ifa_flags
def ipv6_address(addr_str):
    '''
    Convert a textual IPv6 address to its packed binary form.

    Thin wrapper around socket.inet_pton() that reports failure as a
    ValueError, so it can serve as an argparse "type" callable.
    '''
    try:
        packed = socket.inet_pton(socket.AF_INET6, addr_str)
    except socket.error:
        message = "Invalid IPv6 address: {}".format(addr_str)
        raise ValueError(message)
    else:
        return packed
def main(argv):
    '''
    Command line interface to wait for DAD to complete on IPv6 addresses

    Polls the address flags roughly every 0.1 seconds until none of the
    requested addresses is tentative any more, one of them turns out to be
    missing, the address information cannot be read, or the timeout
    expires.

    argv: list of command line arguments, without the program name.

    Returns True when all requested addresses exist and are no longer
    tentative, False otherwise.
    '''
    import argparse
    import time

    # Use a monotonic clock where available so that a wall-clock step while
    # we wait cannot shorten or stretch the timeout.
    clock = getattr(time, 'monotonic', time.time)

    def to_text(addr):
        # Render a packed IPv6 address in its textual form
        return socket.inet_ntop(socket.AF_INET6, addr)

    # Parse arguments
    parser = argparse.ArgumentParser(description="Wait for DAD to complete on IPv6 addresses")
    parser.add_argument('-d', '--debug', action='store_true', help="Show debugging output")
    parser.add_argument('-t', '--timeout', metavar='SEC', type=int, default=10, help="Timeout in case addresses don't leave tentative state")
    parser.add_argument('addresses', metavar='addr', nargs='+', type=ipv6_address, help="IPv6 address to wait for")
    options = parser.parse_args(argv)

    # Show what we are going to do
    if options.debug:
        print("Waiting for the following IPv6 addresses:")
        for addr in options.addresses:
            print("- {}".format(to_text(addr)))
        print("Using a timeout of {} seconds".format(options.timeout))

    # Main loop
    still_waiting = True
    deadline = clock() + options.timeout
    while still_waiting and clock() < deadline:
        # Be optimistic: assume we are done waiting
        # We'll change this when we encounter an address in tentative state
        still_waiting = False

        # Get the flags of all IPv6 addresses
        try:
            addr_flags = get_ipv6_address_flags()
        except Exception as exc:
            # "except Exception" rather than a bare except, so Ctrl-C and
            # SystemExit still propagate.
            print("Unable to get address information, aborting!")
            if options.debug:
                print("Reason: {!r}".format(exc))
            return False

        # Show what we found
        if options.debug:
            print("We found the following addresses:")
            for addr in addr_flags:
                print("- {}: Flags {:#x}".format(to_text(addr), addr_flags[addr]))
            if not addr_flags:
                print("- None found")

        # Check the flags
        for addr in options.addresses:
            # Check if the given addresses even exist
            if addr not in addr_flags:
                print("Address {} is not present on this system".format(to_text(addr)))
                return False

            # Check the tentative flag
            if addr_flags[addr] & IFA_F_TENTATIVE:
                # Still tentative, we keep waiting
                if options.debug:
                    print("Address {} is still tentative, keep waiting".format(to_text(addr)))
                still_waiting = True
            elif options.debug:
                print("Address {} is not tentative, ok!".format(to_text(addr)))

        # Sleep a bit, but only when another poll is actually needed
        if still_waiting:
            time.sleep(0.1)

    # Check state
    if still_waiting:
        print("One or more addresses are still tentative after {} seconds, aborting".format(options.timeout))
        return False

    # Done
    return True
if __name__ == '__main__':
    import sys

    # Exit with status 1 when main() reports failure; on success fall off
    # the end of the script.
    if not main(sys.argv[1:]):
        sys.exit(1)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement