Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python2
- #
- # (C) GNIF
- # http://graland-security.blogspot.com/
- # Consider it as GPLv2
- #
- from struct import pack, unpack
- from select import select
- from binascii import hexlify
- import socket
- import hmac, hashlib
- from Crypto.Cipher import AES
- # On receiving an ESP packet, unprotect it with the SA it arrived on to
- # recover the payload, then re-protect that payload with the other side's SA
- # and forward it.
- #
class ESP_forwarder(object):
    """Relay UDP-encapsulated ESP packets between a client and a gateway.

    Each datagram received on the local UDP socket from one of the two
    configured peers is deciphered with the keys of the SA it arrived on,
    re-ciphered and re-MACed with the keys of the SA shared with the other
    peer, and sent to that other peer. Optionally the clear payload is also
    copied to a local address inside a dummy GRE header.

    The ESP transforms are hardcoded: AES-CBC with a 16-byte IV and
    HMAC-SHA1 truncated to 96 bits. Incoming MACs are NOT verified.
    """

    # debug verbosity: 0 is silent, 1 prints one line per packet,
    # >1 also dumps deciphered buffers
    dbg = 1
    # local socket parameters
    local_ip = '192.168.20.5'
    local_port = 4500
    sk_timeout = 5
    sk_buflen = 4096
    # local socket address for ESP (tunnel) clear traffic duplication
    dup_ip = '127.10.10.10'
    # dummy GRE header for carrying IPv4 packet
    gre = b'\0\0\x08\0'
    # cli / srv net address
    cli_addr = ('192.168.20.6', 4500)
    srv_addr = ('10.20.30.40', 4500)

    # raw GRE socket, only opened by run(duplicate_local=True)
    gre_sk = None
    # loop-exit flag; kept under a private name so that it does not shadow
    # the stop() method on the instance
    _stop_requested = False

    # ESP layout used by unprotect() / protect()
    _HDR_LEN = 8      # SPI (4 bytes) + sequence number (4 bytes)
    _IV_LEN = 16      # IV length for AES-CBC
    _ICV_LEN = 12     # HMAC-SHA1-96
    _BLOCK_LEN = 16   # AES block size

    def __init__(self, SA_cli=None, SA_srv=None):
        """Store both SAs and bind the UDP socket.

        SA = {'SPIi': [Kenci, Kauti, SPIi], 'SPIr': [Kencr, Kautr, SPIr]}

        SA_cli is the SA established with the client, which is the initiator.
        SA_srv is the SA established with the gateway, which is the responder.
        """
        # None defaults: a literal {} default would be shared by every
        # instance created without arguments
        self.SA_cli = {} if SA_cli is None else SA_cli
        self.SA_srv = {} if SA_srv is None else SA_srv
        # create the UDP socket on port 4500
        self.sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                                socket.IPPROTO_UDP)
        self.sk.settimeout(self.sk_timeout)
        self.sk.bind((self.local_ip, self.local_port))
        self._stop_requested = False

    def run(self, duplicate_local=False):
        """Run the select() loop, forwarding ESP packets until stopped.

        When duplicate_local is true, a raw GRE socket is opened and every
        deciphered payload is also sent to dup_ip.
        """
        peers = (self.cli_addr, self.srv_addr)
        # for traffic duplication, send clear packet into dummy GRE header
        # to local interface with RAW socket - IPPROTO_GRE is 47
        if duplicate_local:
            if self.gre_sk is not None:
                # do not leak the socket opened by a previous run()
                self.gre_sk.close()
            self.gre_sk = socket.socket(socket.AF_INET, socket.SOCK_RAW, 47)
            self.gre_sk.bind((self.dup_ip, 0))
        # select() wakes up at least every sk_timeout seconds, so a stop
        # request is noticed within that delay
        while not self._should_stop():
            readable = select([self.sk], [], [], self.sk_timeout)[0]
            for sk in readable:
                buf, addr = sk.recvfrom(self.sk_buflen)
                # ignore empty datagrams and anything not sent by a peer
                if not buf or addr not in peers:
                    continue
                out = self.transfer(buf, duplicate_local)
                if out:
                    # forward to the peer the packet did not come from
                    dst = peers[(peers.index(addr) + 1) % 2]
                    self.sk.sendto(out, dst)

    def stop(self):
        """Ask the run() loop to exit after its current iteration."""
        self._stop_requested = True

    def _should_stop(self):
        # Also honour the legacy idiom ``fwd.stop = True``: the flag used to
        # be stored in an instance attribute named ``stop``.
        return self._stop_requested or self.stop is True

    def _is_well_formed(self, espbuf):
        # The ciphered part must be a non-empty whole number of AES blocks
        # once the header, the IV and the trailing MAC are removed.
        enc_len = len(espbuf) - self._HDR_LEN - self._IV_LEN - self._ICV_LEN
        return enc_len > 0 and enc_len % self._BLOCK_LEN == 0

    def transfer(self, espbuf, duplicate_local=False):
        """Re-protect one received packet for the opposite peer.

        The SPI (first 4 bytes) selects the direction: SA_cli['SPIi'] means
        the packet comes from the client, SA_srv['SPIr'] means it comes from
        the gateway.

        Returns the ESP packet to forward, or an empty byte string when the
        packet has to be ignored (IKEv2, unknown SPI, malformed length).
        """
        spi = espbuf[:4]
        # four zero bytes where the SPI would be: treated as IKEv2
        if spi == b'\0\0\0\0':
            if self.dbg:
                print('[+] Got IKEv2 packet... ignored')
            return b''
        if spi == self.SA_cli['SPIi'][2]:
            # from the client: unprotect with the cli initiator keys,
            # protect with the srv initiator keys
            origin = 'cli / initiator'
            sa_in, sa_out = self.SA_cli['SPIi'], self.SA_srv['SPIi']
        elif spi == self.SA_srv['SPIr'][2]:
            # from the gateway: unprotect with the srv responder keys,
            # protect with the cli responder keys
            origin = 'srv / responder'
            sa_in, sa_out = self.SA_srv['SPIr'], self.SA_cli['SPIr']
        else:
            if self.dbg:
                print('[+] Got ESP packet with unknown SPI')
            return b''
        if self.dbg:
            print('[+] Got ESP packet from %s' % origin)
        # A truncated or misaligned datagram would raise while deciphering
        # and take the whole forwarding loop down: drop it instead.
        if not self._is_well_formed(espbuf):
            if self.dbg:
                print('[+] Got malformed ESP packet... ignored')
            return b''
        buf, (seq, iv) = self.unprotect(espbuf, sa_in[0])
        if duplicate_local:
            self.__duplicate(buf)
        return self.protect(buf, sa_out, (seq, iv))

    def unprotect(self, buf, Kenc):
        """Decipher an ESP packet with Kenc.

        Returns (cleartext, (seq, iv)); the cleartext still carries the ESP
        padding, pad length and next header bytes.
        """
        # ESP header: SPI (4 bytes), sequence number (4 bytes), then the IV
        seq = unpack('!I', buf[4:8])[0]
        iv_end = self._HDR_LEN + self._IV_LEN
        iv = buf[self._HDR_LEN:iv_end]
        # the trailing MAC (HMAC-96) is stripped and deliberately not verified
        enc = buf[iv_end:-self._ICV_LEN]
        # decipher with Kenc, alg is AES-CBC-128
        alg = AES.new(key=Kenc, mode=AES.MODE_CBC, IV=iv)
        clear = alg.decrypt(enc)
        if self.dbg > 1:
            print('[DBG] raw unprotected ESP buffer:\n%s\n' % hexlify(clear))
        return clear, (seq, iv)

    def protect(self, buf, sa, seq_iv):
        """Cipher and authenticate buf as an ESP packet.

        sa is the (Kenc, Kaut, spi) triple of the outgoing SA and seq_iv the
        (seq, iv) pair to place in the ESP header; the IV is also used for
        the encryption. Both are taken positionally.

        Returns the complete ESP packet: header, ciphertext and 12-byte MAC.
        """
        # unpacked in the body: tuple parameters are Python 2-only syntax
        Kenc, Kaut, spi = sa
        seq, iv = seq_iv
        # make ESP header
        hdr = b''.join((spi, pack('!I', seq), iv))
        # cipher buffer with Kenc
        alg = AES.new(key=Kenc, mode=AES.MODE_CBC, IV=iv)
        signed = b''.join((hdr, alg.encrypt(buf)))
        # make MAC with Kaut over header + ciphertext, alg is HMAC-SHA1-96
        mac = hmac.new(Kaut, signed, hashlib.sha1).digest()[:self._ICV_LEN]
        # return the complete ESP payload
        return b''.join((signed, mac))

    def __unpad_espbuf(self, buf):
        # trailer layout:
        # padding (*pad length) + uint8 (pad length) + uint8 (next header)
        pad_len = unpack('!B', buf[-2:-1])[0]
        return buf[:-(pad_len + 2)]

    def __duplicate(self, buf):
        # send the unpadded clear payload behind the dummy GRE header
        frame = b''.join((self.gre, self.__unpad_espbuf(buf)))
        self.gre_sk.sendto(frame, (self.dup_ip, 0))
- #
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement