Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- import re
- import time
- import logging
- import datetime
- import html.parser
- import urllib.request
- import mysql.connector
- from html.entities import name2codepoint
- from urllib.error import HTTPError
- from multiprocessing.dummy import Pool
- from MultiDict import OrderedMultiDict
- from functools import reduce
- config_mysql = {
- 'user' : 'ed',
- 'password' : 'pokerkveld',
- 'host' : '127.0.0.1',
- 'database' : 'ed'
- }
- def passed(s):
- s = time.time() - s;
- while (s > 1700):
- s -= 3600
- while (s < -1700):
- s += 3600
- return s
- def tmain():
- print(get_status('http://127.0.0.1:8000/')['/main.mp3']['Current Song'])
- def main():
- mc = mysql.connector.connect(**config_mysql)
- lastpush = time.time() - 32
- while True:
- while passed(lastpush) < 30:
- time.sleep(0.25)
- lastpush = time.time() - (passed(lastpush)-30)
- print(datetime.datetime.utcnow().strftime(
- "%y/%m/%d %H:%M:%S.%f")[:-4], end=' ')
- while passed(lastpush) < 17:
- if exec_update(mc):
- break
- time.sleep(2)
- def exec_update(mc):
- ch = 1
- idd = -1
- idm = -1
- oldd = ''
- oldm = ''
- dbg = ' #'
- doit = False
- st = get_status()
- if not '/main.mp3' in st:
- return False
- if not 'Current Song' in st['/main.mp3']:
- return False
- n = AsyncListeners()
- n.relay_list = [
- ('http://stream1.r-a-d.io:1130/','/radio.ogg'),
- ('http://stream2.r-a-d.io:1130/','/relay.mp3'),
- ('http://stream3.r-a-d.io:1130/','/relay.mp3')
- ]
- relayclients = n.async_get_all_listener_count()
- dj = st['/main.mp3']['Stream Title']
- meta = fixmeta(st['/main.mp3']['Current Song'])
- listeners = st['/main.mp3']['Current Listeners']
- q = (
- "select di, ti, "
- " timestampdiff(second, t2, now()) sec "
- "from np "
- "where ch = %s "
- "order by id desc "
- "limit 1 ")
- with Cursor(mc) as cur:
- cur.execute(q, (ch,))
- for (di, ti, sec) in cur:
- idd, idm = di, ti
- doit = sec > 300 or sec < 50
- doit = doit or idd < 0
- #if not doit:
- # print("stacking_queue")
- # return True
- if idd >= 0:
- q = (
- "select v from npd "
- "where i = %s limit 1 ")
- with Cursor(mc) as cur:
- cur.execute(q, (idd,))
- for (v,) in cur:
- oldd = v
- if idm >= 0:
- q = (
- "select v from npm "
- "where i = %s limit 1 ")
- with Cursor(mc) as cur:
- cur.execute(q, (idm,))
- for (v,) in cur:
- oldm = v
- if re.match('^[0-9]*$', listeners):
- q = (
- "insert into npl values "
- " ('', now(), %s, %s) ")
- with Cursor(mc) as cur:
- cur.execute(q, (listeners, relayclients))
- else:
- print("assert_listeners_numeric")
- return False
- if meta.lower() == oldm.lower():
- q = (
- "update np "
- "set t2=now() "
- "where ch = %s "
- "order by id desc "
- "limit 1 ")
- with Cursor(mc) as cur:
- cur.execute(q, (ch,))
- print("meta_unchanged/{}:{}".format
- (listeners, relayclients))
- return True
- else:
- print()
- print("-- ({})".format(oldm))
- print("-- ({})".format(meta))
- idm = -1
- q = (
- "select i from npm "
- "where v = %s limit 1 ")
- with Cursor(mc) as cur:
- cur.execute(q, (meta,))
- for (i,) in cur:
- idm = i
- if idm < 0:
- q = (
- "insert into npm values "
- " ('', %s) ")
- with Cursor(mc) as cur:
- cur.execute(q, (meta,))
- idm = cur.lastrowid
- idd = -1
- q = (
- "select i from npd "
- "where v = %s limit 1 ")
- with Cursor(mc) as cur:
- cur.execute(q, (dj,))
- for (i,) in cur:
- idd = i
- if idd < 0:
- q = (
- "insert into npd values "
- " ('', %s) ")
- with Cursor(mc) as cur:
- cur.execute(q, (dj,))
- idd = cur.lastrowid
- if idm < 0:
- print("assert_idm_gt_0")
- return True
- if idd < 0:
- print("assert_idd_gt_0")
- return True
- q = (
- "insert into np values "
- " ('', now(), now(), %s, %s, %s) ")
- penis = -1
- with Cursor(mc) as cur:
- cur.execute(q, (ch, idm, idd))
- penis = cur.lastrowid
- print("-- np_dj{}_meta{}_{}/{}:{}".format
- (idd, idm, penis, listeners, relayclients))
- return True
- class Cursor:
- def __init__(self, connection):
- self.connection = connection
- def __enter__(self):
- self.cursor = self.connection.cursor()
- return self.cursor
- def __exit__(self, type, value, traceback):
- self.cursor.close()
- def get_listener_count(
- server = 'http://stream0.r-a-d.io:1130',
- mount = "/main.mp3",
- timeout = None
- ):
- try:
- request = urllib.request.Request(server, headers = {'User-Agent' : 'Mozilla'})
- result = urllib.request.urlopen(request, timeout = timeout)
- except:
- logging.debug('HTTP access fault: ' + server_name);
- #raise
- else:
- incoming = result.read().decode('utf8').split('\n')
- parser = StatusParser()
- for line in incoming:
- parser.feed(line)
- parser.close()
- result = parser.result
- if mount in result:
- if 'Current Listeners' in result[mount]:
- listeners = int(result[mount]['Current Listeners'])
- return listeners
- else:
- logging.debug('Server does not have listener count: ' + server_name)
- return -1
- logging.debug('Failed to fetch ' + server_name)
- return -1
- class AsyncListeners(object):
- relay_list = []
- timeout = 10.0
- def __init__(self, threads = 5):
- super(AsyncListeners, self).__init__()
- self.pool = Pool(threads)
- def async_get_all_listener_count(self, timeout = 10.0):
- result_list = []
- counts, timer = 0, 0.0
- for relay, mount in self.relay_list:
- result_list.append(self.pool.apply_async(
- get_listener_count,
- kwds = {
- 'server' : relay,
- 'timeout' : timeout,
- 'mount' : mount
- }))
- #print(result_list)
- while True:
- if len(result_list) == 0:
- break
- for result in result_list[:]:
- if result.ready():
- if result.successful():
- counts += result.get()
- result_list.remove(result)
- #print("LOL " + result)
- else:
- print("oh shit")
- #else:
- # nothing yet
- #print(result)
- timer += 0.5
- if timer > self.timeout:
- break
- time.sleep(0.5)
- return counts
- def get_status(icecast_server = 'http://stream0.r-a-d.io:1130/'):
- try:
- request = urllib.request.Request(icecast_server,
- headers = { 'User-Agent' : 'Mozilla' })
- result = urllib.request.urlopen(request)
- except HTTPError as e:
- if e.code == 403: #full server
- logging.warning("Listener limit reached? " + icecast_server)
- f = OrderedMultiDict()
- f['Stream Title'] = 'r/a/dio'
- f['Current Listeners'] = '500'
- f['Current Song'] = 'py'
- return { '/main.mp3' : f }
- else:
- logging.exception("HTTPError " + e.code + " in status fetch")
- except:
- logging.exception("Connection to status page failed")
- else:
- incoming = result.read().decode('utf8').split('\n')
- parser = StatusParser()
- for line in incoming:
- parser.feed(line)
- parser.close()
- result = parser.result
- # if '/main.mp3' in result:
- # all_listeners = get_all_listener_count()
- # total_count = reduce(lambda x,y:
- # x+y if x > 0 and y > 0 else x,
- # list(all_listeners.values()))
- # result[config.icecast_mount]['Current Listeners'] = str(total_count)
- return parser.result or {}
- return {}
- class StatusParser(html.parser.HTMLParser):
- def __init__(self):
- #html.parser.HTMLParser.__init__(self)
- super(StatusParser, self).__init__()
- self.result = {}
- self._td = None # in a TD? class or 'yes'
- self._h3 = False # in a H3?
- self._key = None # content of prev. TD with class
- self._tab = False # skip next </table> ?
- self._mount = None # mount name
- self._value = None # field value
- def handle_starttag(self, tag, attrs):
- #print('{} --- {} --- {}', self, tag, attrs)
- if tag == 'td':
- self._td = 'yes'
- if len(attrs) > 0:
- if attrs[0][0] == 'class':
- self._td = attrs[0][1]
- if (self._td == 'streamdata'):
- self._value = ''
- elif tag == 'h3' and self._td:
- self._h3 = True
- self._tab = True
- def handle_endtag(self, tag):
- if tag == 'td':
- if self._value:
- self.result[self._mount][self._key[:-1]] = self._value
- self._value = None
- self._td = None
- elif tag == 'h3':
- self._h3 = False
- elif tag == 'table':
- if not self._tab:
- self._mount = None
- self._tab = False
- def handle_data(self, data):
- if self._h3 and self._td:
- self._mount = data.split(' ')[2]
- self.result[self._mount] = OrderedMultiDict()
- elif self._td and self._mount:
- if self._td == 'streamdata':
- self._value += data
- else:
- self._key = data
- def handle_charref(self, name):
- if self._value:
- if name.startswith('x'):
- c = chr(int(name[1:], 16))
- else:
- c = chr(int(name))
- self._value += c
- def handle_entityref(self, name):
- if self._value:
- self._value += chr(name2codepoint[name])
- class OldStatusParser(html.parser.HTMLParser):
- def __init__(self):
- html.parser.HTMLParser.__init__(self)
- self._current_mount = None
- self.result = {}
- self._td = False
- self._mount = False
- self._enter = False
- def handle_starttag(self, tag, attrs):
- attrs = OrderedMultiDict(attrs)
- if (tag == "td"):
- self._td = Tag(attrs)
- self._td['class'] = None
- elif (tag == "h3") and (self._td):
- self._mount = Tag(attrs)
- def handle_endtag(self, tag):
- if (tag == "td"):
- self._td = None
- elif (tag == "h3") and (self._td):
- self._mount = None
- elif (tag == "table") and (self._current_mount):
- if (self._enter):
- self._enter = False
- else:
- self._enter = True
- def handle_data(self, data):
- if (self._mount) and (self._td):
- self._current_mount = data.split(" ")[2]
- self.result[self._current_mount] = OrderedMultiDict()
- elif (self._enter) and (self._td) and (self._current_mount):
- if ("streamdata" in self._td.getall("class")):
- self.result[self._current_mount][self._type] = data
- else:
- self._type = data[:-1]
- class Tag(object):
- attr = OrderedMultiDict()
- def __init__(self, attrs):
- self.attr = attrs
- def __getattr__(self, name):
- return getattr(self.attr, name)
- def __setitem__(self, name, value):
- self.attr[name] = value
- def fixmeta(meta):
- meta = meta.encode('iso_8859-1')
- try:
- try:
- meta = meta.decode('utf-8', 'strict')
- except (UnicodeDecodeError):
- meta = meta.decode('sjis', 'xmlcharrefreplace')
- except (TypeError):
- meta = meta.strip()
- return meta
- if __name__ == '__main__':
- main()
- # http://dev.mysql.com/doc/connector-python/en/myconnpy_example_connecting.html
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement