Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- from __future__ import generators
- import time
- import logging
- import html.parser
- import urllib.request
- from urllib.error import HTTPError
- from multiprocessing.dummy import Pool
- from MultiDict import OrderedMultiDict
- from functools import reduce
- def get_listener_count(
- server = 'http://stream0.r-a-d.io:1130',
- mount = "/main.mp3",
- timeout = None
- ):
- try:
- request = urllib.request.Request(server, headers = {'User-Agent' : 'Mozilla'})
- result = urllib.request.urlopen(request, timeout = timeout)
- except:
- logging.debug('HTTP access fault: ' + server_name);
- #raise
- else:
- incoming = result.read().decode('utf8').split('\n')
- parser = StatusParser()
- for line in incoming:
- parser.feed(line)
- parser.close()
- result = parser.result
- if mount in result:
- if 'Current Listeners' in result[mount]:
- listeners = int(result[mount]['Current Listeners'])
- return listeners
- else:
- logging.debug('Server does not have listener count: ' + server_name)
- return -1
- logging.debug('Failed to fetch ' + server_name)
- return -1
- class AsyncListeners(object):
- relay_list = []
- timeout = 10.0
- def __init__(self, threads = 5):
- super(AsyncListeners, self).__init__()
- self.pool = Pool(threads)
- def async_get_all_listener_count(self, timeout = 10.0):
- result_list = []
- counts, timer = 0, 0.0
- for relay, mount in self.relay_list:
- result_list.append(self.pool.apply_async(
- get_listener_count,
- kwds = {
- 'server' : relay,
- 'timeout' : timeout,
- 'mount' : mount
- }))
- #print(result_list)
- while True:
- if len(result_list) == 0:
- break
- for result in result_list[:]:
- if result.ready():
- if result.successful():
- counts += result.get()
- result_list.remove(result)
- #print("LOL " + result)
- else:
- print("oh shit")
- #else:
- # nothing yet
- #print(result)
- timer += 0.5
- if timer > self.timeout:
- break
- time.sleep(0.5)
- return counts
- def get_status(icecast_server = 'http://stream0.r-a-d.io:1130/'):
- try:
- request = urllib.request.Request(icecast_server,
- headers = { 'User-Agent' : 'Mozilla' })
- result = urllib.request.urlopen(request)
- except HTTPError as e:
- if e.code == 403: #full server
- logging.warning("Listener limit reached? " + icecast_server)
- f = OrderedMultiDict()
- f['Stream Title'] = 'r/a/dio'
- f['Current Listeners'] = '500'
- f['Current Song'] = 'py'
- return { '/main.mp3' : f }
- else:
- logging.exception("HTTPError " + e.code + " in status fetch")
- except:
- logging.exception("Connection to status page failed")
- else:
- incoming = result.read().decode('iso_8859-1').split('\n')
- parser = StatusParser()
- for line in incoming:
- parser.feed(line)
- parser.close()
- result = parser.result
- # if '/main.mp3' in result:
- # all_listeners = get_all_listener_count()
- # total_count = reduce(lambda x,y:
- # x+y if x > 0 and y > 0 else x,
- # list(all_listeners.values()))
- # result[config.icecast_mount]['Current Listeners'] = str(total_count)
- return parser.result or {}
- return {}
- class StatusParser(html.parser.HTMLParser):
- def __init__(self):
- html.parser.HTMLParser.__init__(self)
- self._current_mount = None
- self.result = {}
- self._td = False
- self._mount = False
- self._enter = False
- def handle_starttag(self, tag, attrs):
- attrs = OrderedMultiDict(attrs)
- if (tag == "td"):
- self._td = Tag(attrs)
- self._td['class'] = None
- elif (tag == "h3") and (self._td):
- self._mount = Tag(attrs)
- def handle_endtag(self, tag):
- if (tag == "td"):
- self._td = None
- elif (tag == "h3") and (self._td):
- self._mount = None
- elif (tag == "table") and (self._current_mount):
- if (self._enter):
- self._enter = False
- else:
- self._enter = True
- def handle_data(self, data):
- if (self._mount) and (self._td):
- self._current_mount = data.split(" ")[2]
- self.result[self._current_mount] = OrderedMultiDict()
- elif (self._enter) and (self._td) and (self._current_mount):
- if ("streamdata" in self._td.getall("class")):
- self.result[self._current_mount][self._type] = data
- else:
- self._type = data[:-1]
- class Tag(object):
- attr = OrderedMultiDict()
- def __init__(self, attrs):
- self.attr = attrs
- def __getattr__(self, name):
- return getattr(self.attr, name)
- def __setitem__(self, name, value):
- self.attr[name] = value
- print(" -!- running")
- print(get_listener_count())
- n = AsyncListeners()
- n.relay_list = [
- ('http://stream1.r-a-d.io:1130/','/radio.ogg'),
- ('http://stream2.r-a-d.io:1130/','/relay.mp3'),
- ('http://stream3.r-a-d.io:1130/','/relay.mp3')
- ]
- print(n.async_get_all_listener_count())
- print(" -!- finished")
- st = get_status()
- print(st['/main.mp3']['Current Song'])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement