Advertisement
mkv

Untitled

mkv
Sep 6th, 2012
84
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 13.19 KB | None | 0 0
  1. #!/usr/bin/python3
  2. import re
  3. import time
  4. import logging
  5. import datetime
  6. import html.parser
  7. import urllib.request
  8. import mysql.connector
  9. from html.entities import name2codepoint
  10. from urllib.error import HTTPError
  11. from multiprocessing.dummy import Pool
  12. from MultiDict import OrderedMultiDict
  13. from functools import reduce
  14.  
  15. config_mysql = {
  16.     'user'      : 'ed',
  17.     'password'  : 'pokerkveld',
  18.     'host'      : '127.0.0.1',
  19.     'database'  : 'ed'
  20. }
  21.  
  22. def passed(s):
  23.     s = time.time() - s;
  24.     while (s > 1700):
  25.         s -= 3600
  26.     while (s < -1700):
  27.         s += 3600
  28.     return s
  29.  
  30. def tmain():
  31.     print(get_status('http://127.0.0.1:8000/')['/main.mp3']['Current Song'])
  32.  
  33. def main():
  34.     mc = mysql.connector.connect(**config_mysql)
  35.     lastpush = time.time() - 32
  36.     while True:
  37.         while passed(lastpush) < 30:
  38.             time.sleep(0.25)
  39.         lastpush = time.time() - (passed(lastpush)-30)
  40.         print(datetime.datetime.utcnow().strftime(
  41.             "%y/%m/%d %H:%M:%S.%f")[:-4], end=' ')
  42.         while passed(lastpush) < 17:
  43.             if exec_update(mc):
  44.                 break
  45.             time.sleep(2)
  46.  
  47. def exec_update(mc):
  48.     ch = 1
  49.     idd = -1
  50.     idm = -1
  51.     oldd = ''
  52.     oldm = ''
  53.     dbg = ' #'
  54.     doit = False
  55.     st = get_status()
  56.     if not '/main.mp3' in st:
  57.         return False
  58.     if not 'Current Song' in st['/main.mp3']:
  59.         return False
  60.     n = AsyncListeners()
  61.     n.relay_list = [
  62.         ('http://stream1.r-a-d.io:1130/','/radio.ogg'),
  63.         ('http://stream2.r-a-d.io:1130/','/relay.mp3'),
  64.         ('http://stream3.r-a-d.io:1130/','/relay.mp3')
  65.     ]
  66.     relayclients = n.async_get_all_listener_count()
  67.     dj = st['/main.mp3']['Stream Title']
  68.     meta = fixmeta(st['/main.mp3']['Current Song'])
  69.     listeners = st['/main.mp3']['Current Listeners']
  70.     q = (
  71.         "select di, ti,                             "
  72.         "   timestampdiff(second, t2, now()) sec    "
  73.         "from np                                    "
  74.         "where ch = %s                              "
  75.         "order by id desc                           "
  76.         "limit 1                                    ")
  77.     with Cursor(mc) as cur:
  78.         cur.execute(q, (ch,))
  79.         for (di, ti, sec) in cur:
  80.             idd, idm = di, ti
  81.             doit = sec > 300 or sec < 50
  82.     doit = doit or idd < 0
  83.     #if not doit:
  84.     #   print("stacking_queue")
  85.     #   return True
  86.     if idd >= 0:
  87.         q = (
  88.             "select v from npd                      "
  89.             "where i = %s limit 1                   ")
  90.         with Cursor(mc) as cur:
  91.             cur.execute(q, (idd,))
  92.             for (v,) in cur:
  93.                 oldd = v
  94.     if idm >= 0:
  95.         q = (
  96.             "select v from npm                      "
  97.             "where i = %s limit 1                   ")
  98.         with Cursor(mc) as cur:
  99.             cur.execute(q, (idm,))
  100.             for (v,) in cur:
  101.                 oldm = v
  102.     if re.match('^[0-9]*$', listeners):
  103.         q = (
  104.             "insert into npl values                 "
  105.             "   ('', now(), %s, %s)                 ")
  106.         with Cursor(mc) as cur:
  107.             cur.execute(q, (listeners, relayclients))
  108.     else:
  109.         print("assert_listeners_numeric")
  110.         return False
  111.     if meta.lower() == oldm.lower():
  112.         q = (
  113.             "update np                              "
  114.             "set t2=now()                           "
  115.             "where ch = %s                          "
  116.             "order by id desc                       "
  117.             "limit 1                                ")
  118.         with Cursor(mc) as cur:
  119.             cur.execute(q, (ch,))
  120.         print("meta_unchanged/{}:{}".format
  121.             (listeners, relayclients))
  122.         return True
  123.     else:
  124.         print()
  125.         print("-- ({})".format(oldm))
  126.         print("-- ({})".format(meta))
  127.  
  128.     idm = -1
  129.     q = (
  130.         "select i from npm                          "
  131.         "where v = %s limit 1                       ")
  132.     with Cursor(mc) as cur:
  133.         cur.execute(q, (meta,))
  134.         for (i,) in cur:
  135.             idm = i
  136.     if idm < 0:
  137.         q = (
  138.             "insert into npm values                 "
  139.             "   ('', %s)                            ")
  140.         with Cursor(mc) as cur:
  141.             cur.execute(q, (meta,))
  142.             idm = cur.lastrowid
  143.    
  144.     idd = -1
  145.     q = (
  146.         "select i from npd                          "
  147.         "where v = %s limit 1                       ")
  148.     with Cursor(mc) as cur:
  149.         cur.execute(q, (dj,))
  150.         for (i,) in cur:
  151.             idd = i
  152.     if idd < 0:
  153.         q = (
  154.             "insert into npd values                 "
  155.             "   ('', %s)                            ")
  156.         with Cursor(mc) as cur:
  157.             cur.execute(q, (dj,))
  158.             idd = cur.lastrowid
  159.    
  160.     if idm < 0:
  161.         print("assert_idm_gt_0")
  162.         return True
  163.     if idd < 0:
  164.         print("assert_idd_gt_0")
  165.         return True
  166.     q = (
  167.         "insert into np values                      "
  168.         "   ('', now(), now(), %s, %s, %s)          ")
  169.     penis = -1
  170.     with Cursor(mc) as cur:
  171.         cur.execute(q, (ch, idm, idd))
  172.         penis = cur.lastrowid
  173.     print("-- np_dj{}_meta{}_{}/{}:{}".format
  174.         (idd, idm, penis, listeners, relayclients))
  175.     return True
  176.  
  177.  
  178. class Cursor:
  179.     def __init__(self, connection):
  180.         self.connection = connection
  181.     def __enter__(self):
  182.         self.cursor = self.connection.cursor()
  183.         return self.cursor
  184.     def __exit__(self, type, value, traceback):
  185.         self.cursor.close()
  186.        
  187. def get_listener_count(
  188.       server = 'http://stream0.r-a-d.io:1130',
  189.        mount = "/main.mp3",
  190.      timeout = None
  191.     ):
  192.     try:
  193.         request = urllib.request.Request(server, headers = {'User-Agent' : 'Mozilla'})
  194.         result = urllib.request.urlopen(request, timeout = timeout)
  195.     except:
  196.         logging.debug('HTTP access fault: ' + server_name);
  197.         #raise
  198.     else:
  199.         incoming = result.read().decode('utf8').split('\n')
  200.         parser = StatusParser()
  201.         for line in incoming:
  202.             parser.feed(line)
  203.         parser.close()
  204.         result = parser.result
  205.         if mount in result:
  206.             if 'Current Listeners' in result[mount]:
  207.                 listeners = int(result[mount]['Current Listeners'])
  208.                 return listeners
  209.         else:
  210.             logging.debug('Server does not have listener count: ' + server_name)
  211.             return -1
  212.     logging.debug('Failed to fetch  ' + server_name)
  213.     return -1
  214.  
  215.  
  216. class AsyncListeners(object):
  217.     relay_list = []
  218.     timeout = 10.0
  219.     def __init__(self, threads = 5):
  220.         super(AsyncListeners, self).__init__()
  221.         self.pool = Pool(threads)
  222.     def async_get_all_listener_count(self, timeout = 10.0):
  223.         result_list = []
  224.         counts, timer = 0, 0.0
  225.         for relay, mount in self.relay_list:
  226.             result_list.append(self.pool.apply_async(
  227.                 get_listener_count,
  228.                 kwds = {
  229.                     'server'  : relay,
  230.                     'timeout' : timeout,
  231.                     'mount'   : mount
  232.                 }))
  233.             #print(result_list)
  234.         while True:
  235.             if len(result_list) == 0:
  236.                 break
  237.             for result in result_list[:]:
  238.                 if result.ready():
  239.                     if result.successful():
  240.                         counts += result.get()
  241.                         result_list.remove(result)
  242.                         #print("LOL " + result)
  243.                     else:
  244.                         print("oh shit")
  245.                 #else:
  246.                     # nothing yet
  247.                     #print(result)
  248.             timer += 0.5
  249.             if timer > self.timeout:
  250.                 break
  251.             time.sleep(0.5)
  252.         return counts
  253.  
  254.  
  255. def get_status(icecast_server = 'http://stream0.r-a-d.io:1130/'):
  256.     try:
  257.         request = urllib.request.Request(icecast_server,
  258.             headers = { 'User-Agent' : 'Mozilla' })
  259.         result = urllib.request.urlopen(request)
  260.     except HTTPError as e:
  261.         if e.code == 403: #full server
  262.             logging.warning("Listener limit reached? " + icecast_server)
  263.             f = OrderedMultiDict()
  264.             f['Stream Title'] = 'r/a/dio'
  265.             f['Current Listeners'] = '500'
  266.             f['Current Song'] = 'py'
  267.             return { '/main.mp3' : f }
  268.         else:
  269.             logging.exception("HTTPError " + e.code + " in status fetch")
  270.     except:
  271.         logging.exception("Connection to status page failed")
  272.     else:
  273.         incoming = result.read().decode('utf8').split('\n')
  274.         parser = StatusParser()
  275.         for line in incoming:
  276.             parser.feed(line)
  277.         parser.close()
  278.         result = parser.result
  279. #       if '/main.mp3' in result:
  280. #           all_listeners = get_all_listener_count()
  281. #           total_count = reduce(lambda x,y:
  282. #               x+y if x > 0 and y > 0 else x,
  283. #               list(all_listeners.values()))
  284. #           result[config.icecast_mount]['Current Listeners'] = str(total_count)
  285.         return parser.result or {}
  286.     return {}
  287.  
  288.  
  289. class StatusParser(html.parser.HTMLParser):
  290.     def __init__(self):
  291.         #html.parser.HTMLParser.__init__(self)
  292.         super(StatusParser, self).__init__()
  293.         self.result = {}
  294.         self._td = None         # in a TD? class or 'yes'
  295.         self._h3 = False        # in a H3?
  296.         self._key = None        # content of prev. TD with class
  297.         self._tab = False       # skip next </table> ?
  298.         self._mount = None      # mount name
  299.         self._value = None      # field value
  300.     def handle_starttag(self, tag, attrs):
  301.         #print('{}   ---   {}   ---   {}', self, tag, attrs)
  302.         if tag == 'td':
  303.             self._td = 'yes'
  304.             if len(attrs) > 0:
  305.                 if attrs[0][0] == 'class':
  306.                     self._td = attrs[0][1]
  307.                     if (self._td == 'streamdata'):
  308.                         self._value = ''
  309.         elif tag == 'h3' and self._td:
  310.             self._h3 = True
  311.             self._tab = True
  312.     def handle_endtag(self, tag):
  313.         if tag == 'td':
  314.             if self._value:
  315.                 self.result[self._mount][self._key[:-1]] = self._value
  316.             self._value = None
  317.             self._td = None
  318.         elif tag == 'h3':
  319.             self._h3 = False
  320.         elif tag == 'table':
  321.             if not self._tab:
  322.                 self._mount = None
  323.             self._tab = False
  324.     def handle_data(self, data):
  325.         if self._h3 and self._td:
  326.             self._mount = data.split(' ')[2]
  327.             self.result[self._mount] = OrderedMultiDict()
  328.         elif self._td and self._mount:
  329.             if self._td == 'streamdata':
  330.                 self._value += data
  331.             else:
  332.                 self._key = data
  333.     def handle_charref(self, name):
  334.         if self._value:
  335.             if name.startswith('x'):
  336.                 c = chr(int(name[1:], 16))
  337.             else:
  338.                 c = chr(int(name))
  339.             self._value += c
  340.     def handle_entityref(self, name):
  341.         if self._value:
  342.             self._value += chr(name2codepoint[name])
  343.            
  344. class OldStatusParser(html.parser.HTMLParser):
  345.     def __init__(self):
  346.         html.parser.HTMLParser.__init__(self)
  347.         self._current_mount = None
  348.         self.result = {}
  349.         self._td = False
  350.         self._mount = False
  351.         self._enter = False
  352.     def handle_starttag(self, tag, attrs):
  353.         attrs = OrderedMultiDict(attrs)
  354.         if (tag == "td"):
  355.             self._td = Tag(attrs)
  356.             self._td['class'] = None
  357.         elif (tag == "h3") and (self._td):
  358.             self._mount = Tag(attrs)
  359.     def handle_endtag(self, tag):
  360.         if (tag == "td"):
  361.             self._td = None
  362.         elif (tag == "h3") and (self._td):
  363.             self._mount = None
  364.         elif (tag == "table") and (self._current_mount):
  365.             if (self._enter):
  366.                 self._enter = False
  367.             else:
  368.                 self._enter = True
  369.     def handle_data(self, data):
  370.         if (self._mount) and (self._td):
  371.             self._current_mount = data.split(" ")[2]
  372.             self.result[self._current_mount] = OrderedMultiDict()
  373.         elif (self._enter) and (self._td) and (self._current_mount):
  374.             if ("streamdata" in self._td.getall("class")):
  375.                 self.result[self._current_mount][self._type] = data
  376.             else:
  377.                 self._type = data[:-1]
  378.  
  379.  
  380. class Tag(object):
  381.     attr = OrderedMultiDict()
  382.     def __init__(self, attrs):
  383.         self.attr = attrs
  384.     def __getattr__(self, name):
  385.         return getattr(self.attr, name)
  386.     def __setitem__(self, name, value):
  387.         self.attr[name] = value
  388.  
  389. def fixmeta(meta):
  390.     meta = meta.encode('iso_8859-1')
  391.     try:
  392.         try:
  393.             meta = meta.decode('utf-8', 'strict')
  394.         except (UnicodeDecodeError):
  395.             meta = meta.decode('sjis', 'xmlcharrefreplace')
  396.     except (TypeError):
  397.         meta = meta.strip()
  398.     return meta
  399.  
  400. if __name__ == '__main__':
  401.     main()
  402.    
  403. # http://dev.mysql.com/doc/connector-python/en/myconnpy_example_connecting.html
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement