Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from twisted.internet import reactor
- from twisted.web import resource, server
- from random import SystemRandom
- from string import ascii_letters, digits
- import sqlite3
- from time import time
- import cgi
- from datetime import datetime as dt
- db_name = "urls.db"
- tbl_name = "urls"
- log_name = "logs"
- URL_LEN = 10 # number of characters to generate URLs from (maximum 20 for CHARACTER type... change to
- """ For random string generation """
- rdev = SystemRandom()
- ranstr = lambda n: ''.join(rdev.choice(ascii_letters + digits) for _ in range(n))
- """ Turns text into and from bytes... aka it bites them """
- chomp = lambda s: s.encode() if isinstance(s, str) and not isinstance(s, bytes) else s # for python2/3 strings
- unchomp = lambda s: s.decode() if isinstance(s, bytes) and not isinstance(s, str) else s # for python2/3 strings
- to_time = lambda t: dt.fromtimestamp(float(t)).strftime("%d %b %Y, %H:%M:%S")
- base_url = lambda u: '/'.join(u.split("/")[:3])
- class IPLoggerUtils:
- def __init__(self):
- self.con = sqlite3.connect(db_name)
- self.cur = self.con.cursor()
- drop = True # If this is true, then every time you run the program, it will erase all of the logs/ID's
- if drop:
- sql1 = "DROP TABLE IF EXISTS {}".format(tbl_name)
- sql2 = "DROP TABLE IF EXISTS {}".format(log_name)
- self.exec_commit(sql1, sql2)
- sql1 = "CREATE TABLE IF NOT EXISTS {} (url_id CHARACTER, redirect_url TEXT)".format(tbl_name)
- sql2 = "CREATE TABLE IF NOT EXISTS {} (url_id CHARACTER, ip CHARACTER, stamp DATE)".format(log_name)
- self.exec_commit(sql1, sql2)
- def exec_commit(self, *commands):
- for sql in commands:
- self.cur.execute(sql)
- self.con.commit()
- def get_logs(self, url_id):
- url_id = unchomp(url_id)
- self.cur.execute("SELECT * FROM {} WHERE url_id=?".format(log_name), (url_id,))
- rows = self.cur.fetchall()
- return len(rows) > 0, rows
- def create_url(self, url):
- url = unchomp(url)
- r = unchomp(ranstr(URL_LEN))
- while self.shorturl_exists(r)[0]:
- r = unchomp(ranstr(URL_LEN))
- self.cur.execute("INSERT INTO {} VALUES (?, ?)".format(tbl_name), (r, url))
- self.con.commit()
- return r
- def shorturl_exists(self, url_id):
- url_id = unchomp(url_id)
- self.cur.execute("SELECT * FROM {} WHERE url_id=?".format(tbl_name), (url_id,))
- row = self.cur.fetchone()
- return row is not None, row
- def log_ip(self, url_id, ip):
- url_id = unchomp(url_id)
- self.cur.execute("INSERT INTO {} VALUES (?, ?, ?)".format(log_name), (url_id, ip, time()))
- self.con.commit()
- def all_logs(self):
- self.cur.execute("SELECT * FROM {}".format(log_name))
- return self.cur.fetchall()
- utils = IPLoggerUtils()
- class Redirect(resource.Resource):
- """Attempts to redirect with HTTP meta tags. If that fails, it tries JS. If that fails, it just returns a link"""
- redirect_html = """
- <!DOCTYPE html>
- <html lang="en">
- <head>
- <title>URL Redirect</title>
- <meta http-equiv="Refresh" content="0; URL={}">
- <script type="text/javascript">
- window.location.href = "{}"
- </script>
- </head>
- <body>
- <p>If you are not redirected, please click <a href="{}">here</a> to continue.</p>
- </body>
- </html>
- """
- def __init__(self, url):
- self.url = url
- super().__init__()
- def render_GET(self, request):
- return chomp(self.redirect_html.format(self.url, self.url, self.url))
- class IPLogger(resource.Resource):
- def getChild(self, path, request):
- in_db, row = utils.shorturl_exists(path)
- if in_db:
- peer = request.transport.getPeer()
- utils.log_ip(path, peer.host)
- return Redirect(row[1])
- return resource.NoResource(message="Sorry, that shortened URL does not exist.")
- def render_GET(self, request):
- return chomp("Hi")
- class Log(resource.Resource):
- def __init__(self, url_id):
- self.url_id = unchomp(url_id)
- def render_GET(self, request):
- in_db, rows = utils.get_logs(self.url_id)
- if in_db:
- ret = '\n'.join("{} - {} - {}".format(row[0], row[1], to_time(row[2])) for row in rows)
- return chomp(ret)
- return chomp("That ID has not been logged.")
- class IPLogViewer(resource.Resource):
- def getChild(self, path, request):
- return Log(path)
- def render_GET(self, request):
- return chomp("Look for an ID. /log/<your id>")
- class IPLogCreate(resource.Resource):
- def render_GET(self, reqest):
- return chomp("""
- <!DOCTYPE html>
- <html lang="en">
- <head>
- <meta charset="UTF-8">
- <title>URL IP Logger</title>
- </head>
- <body>
- <form action="" method="post">
- Redirect URL:<br>
- <input name="url" type="text" size="100%"><br>
- <input type="submit" value="Create URL" name="submit">
- </form>
- </body>
- </html>
- """)
- def render_POST(self, request):
- burl = base_url(str(request.URLPath()))
- url = unchomp(request.args.get(b"url", b"")[0])
- if url != "":
- url_id = utils.create_url(url)
- return chomp("Direct users to {}/{} - View logs at {}/log/{}".format(burl, url_id, burl, url_id))
- logger = IPLogger()
- logger.putChild(chomp("log"), IPLogViewer())
- logger.putChild(chomp("new"), IPLogCreate())
- logger.putChild(chomp(""), Redirect("new"))
- fac = server.Site(logger)
- reactor.listenTCP(8080, fac)
- reactor.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement