Advertisement
mkv

Untitled

mkv
Sep 4th, 2012
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 10.34 KB | None | 0 0
  1. #!/usr/bin/python3
  2. import re
  3. import time
  4. import logging
  5. import html.parser
  6. import urllib.request
  7. import mysql.connector
  8. from urllib.error import HTTPError
  9. from multiprocessing.dummy import Pool
  10. from MultiDict import OrderedMultiDict
  11. from functools import reduce
  12.  
  13. config_mysql = {
  14.     'user'      : 'dry',
  15.     'password'  : 'yea',
  16.     #'password' : 'yea',
  17.     'host'      : '127.0.0.1',
  18.     'database'  : 'dry'
  19. }
  20.  
  21. def passed(s):
  22.     s = time.time() - s;
  23.     while (s > 1700):
  24.         s -= 3600
  25.     while (s < -1700):
  26.         s += 3600
  27.     return s
  28.  
  29. def main():
  30.     mc = mysql.connector.connect(**config_mysql)
  31.     lastpush = 0
  32.     while True:
  33.         while lastpush > 0 and passed(lastpush) < 30:
  34.             time.sleep(0.25)
  35.         lastpush = time.time()
  36.         while passed(lastpush) < 17:
  37.             if exec_update(mc):
  38.                 break
  39.             time.sleep(2)
  40.  
  41. def exec_update(mc):
  42.     ch = 1
  43.     idd = -1
  44.     idm = -1
  45.     oldd = ''
  46.     oldm = ''
  47.     dbg = ' #'
  48.     doit = False
  49.     st = get_status()
  50.     if not '/main.mp3' in st:
  51.         return False
  52.     if not 'Current Song' in st['/main.mp3']:
  53.         return False
  54.     n = AsyncListeners()
  55.     n.relay_list = [
  56.         ('http://stream1.r-a-d.io:1130/','/radio.ogg'),
  57.         ('http://stream2.r-a-d.io:1130/','/relay.mp3'),
  58.         ('http://stream3.r-a-d.io:1130/','/relay.mp3')
  59.     ]
  60.     relayclients = n.async_get_all_listener_count()
  61.     dj = st['/main.mp3']['Stream Title']
  62.     meta = fixmeta(st['/main.mp3']['Current Song'])
  63.     listeners = st['/main.mp3']['Current Listeners']
  64.     q = (
  65.         "select di, ti,                             "
  66.         "   timestampdiff(second, t2, now()) sec    "
  67.         "from np                                    "
  68.         "where ch = %s                              "
  69.         "order by id desc                           "
  70.         "limit 1                                    ")
  71.     with Cursor(mc) as cur:
  72.         cur.execute(q, (ch,))
  73.         for (di, ti, sec) in cur:
  74.             idd, idm = di, ti
  75.             doit = sec > 300 or sec < 50
  76.     doit = doit or idd < 0
  77.     #if not doit:
  78.     #   print("stacking_queue")
  79.     #   return True
  80.     if idd >= 0:
  81.         q = (
  82.             "select v from npd                      "
  83.             "where i = %s limit 1                   ")
  84.         with Cursor(mc) as cur:
  85.             cur.execute(q, (idd,))
  86.             for (v,) in cur:
  87.                 oldd = v
  88.     if idm >= 0:
  89.         q = (
  90.             "select v from npm                      "
  91.             "where i = %s limit 1                   ")
  92.         with Cursor(mc) as cur:
  93.             cur.execute(q, (idm,))
  94.             for (v,) in cur:
  95.                 oldm = v
  96.     if re.match('^[0-9]*$', listeners):
  97.         q = (
  98.             "insert into npl values                 "
  99.             "   ('', now(), %s, %s)                 ")
  100.         with Cursor(mc) as cur:
  101.             cur.execute(q, (listeners, relayclients))
  102.     else:
  103.         print("assert_listeners_numeric")
  104.         return False
  105.     if meta.lower() == oldm.lower():
  106.         q = (
  107.             "update np                              "
  108.             "set t2=now()                           "
  109.             "where ch = %s                          "
  110.             "order by id desc                       "
  111.             "limit 1                                ")
  112.         with Cursor(mc) as cur:
  113.             cur.execute(q, (ch,))
  114.         print("meta_unchanged/{}:{}".format
  115.             (listeners, relayclients))
  116.         return True
  117.     else:
  118.         print("-- ({})".format(oldm))
  119.         print("-- ({})".format(meta))
  120.  
  121.     idm = -1
  122.     q = (
  123.         "select i from npm                          "
  124.         "where v = %s limit 1                       ")
  125.     with Cursor(mc) as cur:
  126.         cur.execute(q, (meta,))
  127.         for (i,) in cur:
  128.             idm = i
  129.     if idm < 0:
  130.         q = (
  131.             "insert into npm values                 "
  132.             "   ('', %s)                            ")
  133.         with Cursor(mc) as cur:
  134.             cur.execute(q, (meta,))
  135.             idm = cur.lastrowid
  136.    
  137.     idd = -1
  138.     q = (
  139.         "select i from npd                          "
  140.         "where v = %s limit 1                       ")
  141.     with Cursor(mc) as cur:
  142.         cur.execute(q, (dj,))
  143.         for (i,) in cur:
  144.             idd = i
  145.     if idd < 0:
  146.         q = (
  147.             "insert into npd values                 "
  148.             "   ('', %s)                            ")
  149.         with Cursor(mc) as cur:
  150.             cur.execute(q, (dj,))
  151.             idd = cur.lastrowid
  152.    
  153.     if idm < 0:
  154.         print("assert_idm_gt_0")
  155.         return True
  156.     if idd < 0:
  157.         print("assert_idd_gt_0")
  158.         return True
  159.     q = (
  160.         "insert into np values                      "
  161.         "   ('', now(), now(), %s, %s, %s)          ")
  162.     penis = -1
  163.     with Cursor(mc) as cur:
  164.         cur.execute(q, (ch, idm, idd))
  165.         penis = cur.lastrowid
  166.     print("-- np_dj{}_meta{}_{}/{}:{}".format
  167.         (idd, idm, penis, listeners, relayclients))
  168.     return True
  169.  
  170.  
  171. class Cursor:
  172.     def __init__(self, connection):
  173.         self.connection = connection
  174.     def __enter__(self):
  175.         self.cursor = self.connection.cursor()
  176.         return self.cursor
  177.     def __exit__(self, type, value, traceback):
  178.         self.cursor.close()
  179.        
  180. def get_listener_count(
  181.       server = 'http://stream0.r-a-d.io:1130',
  182.        mount = "/main.mp3",
  183.      timeout = None
  184.     ):
  185.     try:
  186.         request = urllib.request.Request(server, headers = {'User-Agent' : 'Mozilla'})
  187.         result = urllib.request.urlopen(request, timeout = timeout)
  188.     except:
  189.         logging.debug('HTTP access fault: ' + server_name);
  190.         #raise
  191.     else:
  192.         incoming = result.read().decode('utf8').split('\n')
  193.         parser = StatusParser()
  194.         for line in incoming:
  195.             parser.feed(line)
  196.         parser.close()
  197.         result = parser.result
  198.         if mount in result:
  199.             if 'Current Listeners' in result[mount]:
  200.                 listeners = int(result[mount]['Current Listeners'])
  201.                 return listeners
  202.         else:
  203.             logging.debug('Server does not have listener count: ' + server_name)
  204.             return -1
  205.     logging.debug('Failed to fetch  ' + server_name)
  206.     return -1
  207.  
  208.  
  209. class AsyncListeners(object):
  210.     relay_list = []
  211.     timeout = 10.0
  212.     def __init__(self, threads = 5):
  213.         super(AsyncListeners, self).__init__()
  214.         self.pool = Pool(threads)
  215.     def async_get_all_listener_count(self, timeout = 10.0):
  216.         result_list = []
  217.         counts, timer = 0, 0.0
  218.         for relay, mount in self.relay_list:
  219.             result_list.append(self.pool.apply_async(
  220.                 get_listener_count,
  221.                 kwds = {
  222.                     'server'  : relay,
  223.                     'timeout' : timeout,
  224.                     'mount'   : mount
  225.                 }))
  226.             #print(result_list)
  227.         while True:
  228.             if len(result_list) == 0:
  229.                 break
  230.             for result in result_list[:]:
  231.                 if result.ready():
  232.                     if result.successful():
  233.                         counts += result.get()
  234.                         result_list.remove(result)
  235.                         #print("LOL " + result)
  236.                     else:
  237.                         print("oh shit")
  238.                 #else:
  239.                     # nothing yet
  240.                     #print(result)
  241.             timer += 0.5
  242.             if timer > self.timeout:
  243.                 break
  244.             time.sleep(0.5)
  245.         return counts
  246.  
  247.  
  248. def get_status(icecast_server = 'http://stream0.r-a-d.io:1130/'):
  249.     try:
  250.         request = urllib.request.Request(icecast_server,
  251.             headers = { 'User-Agent' : 'Mozilla' })
  252.         result = urllib.request.urlopen(request)
  253.     except HTTPError as e:
  254.         if e.code == 403: #full server
  255.             logging.warning("Listener limit reached? " + icecast_server)
  256.             f = OrderedMultiDict()
  257.             f['Stream Title'] = 'r/a/dio'
  258.             f['Current Listeners'] = '500'
  259.             f['Current Song'] = 'py'
  260.             return { '/main.mp3' : f }
  261.         else:
  262.             logging.exception("HTTPError " + e.code + " in status fetch")
  263.     except:
  264.         logging.exception("Connection to status page failed")
  265.     else:
  266.         incoming = result.read().decode('utf8').split('\n')
  267.         parser = StatusParser()
  268.         for line in incoming:
  269.             parser.feed(line)
  270.         parser.close()
  271.         result = parser.result
  272. #       if '/main.mp3' in result:
  273. #           all_listeners = get_all_listener_count()
  274. #           total_count = reduce(lambda x,y:
  275. #               x+y if x > 0 and y > 0 else x,
  276. #               list(all_listeners.values()))
  277. #           result[config.icecast_mount]['Current Listeners'] = str(total_count)
  278.         return parser.result or {}
  279.     return {}
  280.  
  281.  
  282. class StatusParser(html.parser.HTMLParser):
  283.     def __init__(self):
  284.         html.parser.HTMLParser.__init__(self)
  285.         self._current_mount = None
  286.         self.result = {}
  287.         self._td = False
  288.         self._mount = False
  289.         self._enter = False
  290.     def handle_starttag(self, tag, attrs):
  291.         attrs = OrderedMultiDict(attrs)
  292.         if (tag == "td"):
  293.             self._td = Tag(attrs)
  294.             self._td['class'] = None
  295.         elif (tag == "h3") and (self._td):
  296.             self._mount = Tag(attrs)
  297.     def handle_endtag(self, tag):
  298.         if (tag == "td"):
  299.             self._td = None
  300.         elif (tag == "h3") and (self._td):
  301.             self._mount = None
  302.         elif (tag == "table") and (self._current_mount):
  303.             if (self._enter):
  304.                 self._enter = False
  305.             else:
  306.                 self._enter = True
  307.     def handle_data(self, data):
  308.         if (self._mount) and (self._td):
  309.             self._current_mount = data.split(" ")[2]
  310.             self.result[self._current_mount] = OrderedMultiDict()
  311.         elif (self._enter) and (self._td) and (self._current_mount):
  312.             if ("streamdata" in self._td.getall("class")):
  313.                 self.result[self._current_mount][self._type] = data
  314.             else:
  315.                 self._type = data[:-1]
  316.  
  317.  
  318. class Tag(object):
  319.     attr = OrderedMultiDict()
  320.     def __init__(self, attrs):
  321.         self.attr = attrs
  322.     def __getattr__(self, name):
  323.         return getattr(self.attr, name)
  324.     def __setitem__(self, name, value):
  325.         self.attr[name] = value
  326.  
  327. def fixmeta(meta):
  328.     meta = meta.encode('iso_8859-1')
  329.     try:
  330.         try:
  331.             meta = meta.decode('utf-8', 'strict')
  332.         except (UnicodeDecodeError):
  333.             meta = meta.decode('sjis', 'xmlcharrefreplace')
  334.     except (TypeError):
  335.         meta = meta.strip()
  336.     return meta
  337.  
  338. if __name__ == '__main__':
  339.     main()
  340.    
  341. # http://dev.mysql.com/doc/connector-python/en/myconnpy_example_connecting.html
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement