Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import threading
- import time
- from netaddr import IPNetwork, IPAddress
- import os
- import struct
- from ctypes import *
- import socket
- #from decoding_icmp import * #we need IP and ICMP class, defined in this file
# IPv4 header, mapped field-for-field onto the first 20 bytes of a packet
class IP(Structure):
    """Fixed 20-byte IPv4 header decoded from raw packet bytes.

    Construct with ``IP(buf)`` where ``buf`` holds at least the first
    20 bytes of a captured packet. Besides the raw ctypes fields, each
    instance carries:

    * ``src_address`` / ``dst_address`` -- dotted-quad strings
    * ``protocol`` -- 'ICMP', 'TCP' or 'UDP' for protocol numbers 1, 6
      and 17, otherwise the protocol number as a decimal string
    * ``protocol_map`` -- the number-to-name table used for the lookup

    Raises whatever ``from_buffer_copy`` raises for an unusable buffer
    (``ValueError`` when it is shorter than the structure, ``TypeError``
    when it is not bytes-like, e.g. the ``None`` default).
    """

    _fields_ = [
        # ihl = header length in 32-bit words (1 means 32 bits), i.e. the
        # offset to the payload. The two nibbles share the first byte.
        # NOTE(review): declaring ihl before version relies on the host's
        # bit-field ordering (presumably little-endian) -- confirm before
        # running on other architectures.
        ("ihl", c_ubyte, 4),
        ("version", c_ubyte, 4),
        ("tos", c_ubyte),
        ("len", c_ushort),
        ("id", c_ushort),
        ("offset", c_ushort),
        ("ttl", c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum", c_ushort),
        # Addresses are exactly 32 bits wide. c_ulong is 64 bits on LP64
        # platforms, which would grow the structure past the 20 bytes the
        # caller supplies and make from_buffer_copy reject the buffer.
        ("src", c_uint32),
        ("dst", c_uint32),
    ]

    def __new__(cls, socket_buffer=None):
        # ctypes structures are populated at allocation time, so the raw
        # bytes have to be consumed here rather than in __init__.
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        # map protocol constants to their names
        self.protocol_map = {1: 'ICMP', 6: 'TCP', 17: "UDP"}

        # The 32-bit values were read in host order; '<L' writes them back
        # as 4 little-endian bytes, which on a little-endian host restores
        # the on-wire bytes that inet_ntoa expects.
        self.src_address = socket.inet_ntoa(struct.pack('<L', self.src))
        self.dst_address = socket.inet_ntoa(struct.pack('<L', self.dst))

        # human readable protocol; unknown numbers fall back to their digits
        self.protocol = self.protocol_map.get(
            self.protocol_num, str(self.protocol_num)
        )
class ICMP(Structure):
    """First 8 bytes of an ICMP message, decoded from raw bytes.

    Construct with ``ICMP(buf)``; ``buf`` must hold at least
    ``sizeof(ICMP)`` bytes, otherwise ``from_buffer_copy`` raises.
    The multi-byte fields are read in the host's native byte order.
    """

    _fields_ = [
        ("type", c_ubyte),
        ("code", c_ubyte),
        ("checksum", c_ushort),
        ("unused", c_ushort),
        ("next_hop_mtu", c_ushort),
    ]

    def __new__(cls, socket_buffer):
        # The structure is filled in while it is being allocated.
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer):
        # Nothing to derive; the ctypes fields are already populated.
        pass
# Local address the raw sniffer socket is bound to.
host = "192.168.0.106"

# CIDR block that is swept with UDP datagrams and that ICMP replies
# must originate from.
subnet = "192.168.0.0/24"

# Payload sent in every probe; an ICMP reply ending in this string is
# treated as an answer to one of our own datagrams.
magic_message = "PYTHONRULES!"
# Sprays one UDP datagram at every address in the subnet.
def udp_sender(subnet, magic_message, port=65212, delay=5):
    """Send ``magic_message`` over UDP to every address in ``subnet``.

    Parameters:
        subnet: network in any form ``IPNetwork`` accepts, e.g.
            '192.168.0.0/24'.
        magic_message: payload for each datagram; text is encoded as
            UTF-8 before sending.
        port: destination UDP port (default 65212).
        delay: seconds to wait before the first datagram is sent
            (default 5).

    Addresses the OS refuses to send to are skipped so that one failure
    does not abort the rest of the sweep.
    """
    # sendto() requires bytes on Python 3; on Python 2 a plain str already
    # is bytes and passes through untouched.
    if not isinstance(magic_message, bytes):
        magic_message = magic_message.encode("utf-8")

    time.sleep(delay)

    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        for ip in IPNetwork(subnet):
            try:
                sender.sendto(magic_message, (str(ip), port))
            except socket.error:
                # Some targets (typically the subnet's broadcast address)
                # can be rejected by the OS; keep probing the others.
                continue
    finally:
        # Release the descriptor even if the sweep is interrupted.
        sender.close()
# Main script: open a raw socket on `host`, start the UDP sweep in the
# background and report every host in `subnet` that answers one of our
# datagrams with ICMP type 3 / code 3.

# Windows raw sockets are opened with IPPROTO_IP; elsewhere ICMP is
# requested explicitly.
if os.name == 'nt':
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
promiscuous = False
try:
    sniffer.bind((host, 0))  # 0 for random port
    sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

    if os.name == 'nt':
        # SIO_RCVALL is only available on Windows
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
        promiscuous = True

    # Daemon thread: once the sniffer stops there is nobody left to read
    # the replies, so the sweep must not keep the process alive.
    t = threading.Thread(target=udp_sender, args=(subnet, magic_message))
    t.daemon = True
    t.start()

    # Loop invariants, computed once instead of once per packet.
    target_net = IPNetwork(subnet)
    icmp_size = sizeof(ICMP)
    if isinstance(magic_message, bytes):
        expected_tail = magic_message
    else:
        # received data is bytes, so compare against bytes
        expected_tail = magic_message.encode("utf-8")

    while True:
        # recvfrom returns (data, sender_addr); only the data is needed
        raw_buffer = sniffer.recvfrom(65565)[0]

        # the IP header is parsed from the first 20 bytes (160 bits)
        if len(raw_buffer) < 20:
            continue
        ip_header = IP(raw_buffer[0:20])
        if ip_header.protocol != 'ICMP':
            continue

        # ihl counts 32-bit words, so * 4 gives the ICMP start in bytes
        offset = ip_header.ihl * 4
        buf = raw_buffer[offset:offset + icmp_size]
        if len(buf) < icmp_size:
            # truncated packet: ICMP(buf) would raise ValueError
            continue
        icmp_header = ICMP(buf)

        # only type 3 / code 3 replies are of interest
        if icmp_header.type != 3 or icmp_header.code != 3:
            continue

        # make sure the host is in our subnet
        if IPAddress(ip_header.src_address) not in target_net:
            continue

        # make sure the reply carries our magic message at its end
        if raw_buffer.endswith(expected_tail):
            print("host up: %s" % ip_header.src_address)
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the scanner.
    pass
finally:
    # Runs on every exit path, not only on Ctrl-C.
    if promiscuous:
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
    sniffer.close()
Add Comment
Please, Sign In to add comment