Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #===============================================================================
- # m3u2hts.py - Generate/update TVHeadend 3.9 channel/tag configuration files from
- # IPTV M3U playlist
- #
- # (c) 2012 Gregor Rudolf, 2015 antiram
- # Licensed under the MIT license:
- # http://www.opensource.org/licenses/mit-license.php
- #===============================================================================
- from optparse import OptionParser
- import codecs
- import re
- import os
- import sys
- import shutil
- import string
- try:
- import json
- except ImportError:
- # old python? easy_install simplejson
- import simplejson as json
- PROGNUM = re.compile(r"(\d+) - (.*)") # #EXTINF:0,1 - SLO 1 -> #1 - num, 2 - ime
- URLPART = re.compile(r"^((?P<scheme>.+?)://@?)?(?P<host>.*?)(:(?P<port>\d+?))?$")
- #for iptvsimple.addon tags
- XMLTV = re.compile('tvg-id="(.*?)"')
- GROUP = re.compile('group-title="(.*?)"')
- ICON = re.compile('tvg-logo="(.*?)"')
- HDTV = re.compile('(.*?)\sHD')
- #MUXNAME = re.compile('(.+?)"iptv_muxname":\s(.*?)"', re.S)
- MUXNAME = re.compile('\{(.*?)\}', re.S)
- KEYVALUE = re.compile('\s{0,}"(.*?)":\s"(.*?)"')
- CHAN_NUMBERING_GENERATE = 0
- CHAN_NUMBERING_DURATION = 1
- CHAN_NUMBERING_NAMES = 2
- channels = dict()
- tags = dict()
- htschannels = dict()
- htstags = dict()
- htsmuxes = dict()
- networkkey = None
- def readm3u(infile, removenum, channumbering, inputcodec):
- """
- Read IPTV channels from .M3U file
- @param infile: input file
- @param removenum: try to remove channel numbers from names
- @param channumbering: how to get channel number
- @param inputcodec: encoding of input file
- """
- instream = codecs.open(infile, "Ur", encoding=inputcodec)
- chancnt = 0
- tagcnt = 0
- chname = ''
- chtags = None
- chlanguage = None
- chnumber = None
- chxmltv = None
- chicon = None
- for line in instream.readlines():
- line = line.strip()
- if line.startswith("#EXTINF:"):
- #EXTINF:duration,channel number - channel name
- buff = line[8:].split(',')
- chattr = buff[0].split(' ')
- m = PROGNUM.search(buff[1])
- if removenum and m:
- chname = m.group(2)
- else:
- chname = buff[len(buff)-1]
- chname = re.sub(r'\[/?.*?\]', '', chname).strip() #for iptvsimple.addon tags
- if m and channumbering == CHAN_NUMBERING_NAMES:
- chnumber = m.group(1)
- elif channumbering == CHAN_NUMBERING_DURATION:
- chnumber = buff[0]
- #for tvip.ga
- for i in range(1, len(chattr)):
- found = False
- m = XMLTV.match(chattr[i])
- if m:
- chxmltv = m.group(1)
- found = True
- if not found:
- m = GROUP.match(chattr[i])
- if m:
- chtags = m.group(1).split(',')
- n = HDTV.match(chname)
- if n:
- chtags.append(u'HDTV')
- else:
- chtags.append(u'SDTV')
- for t in chtags:
- if not t in tags:
- tagcnt += 1
- tags[t] = {'num': tagcnt, 'name': t}
- found = True
- if not found:
- m = ICON.match(chattr[i])
- if m:
- chicon = "file://" + os.getenv("HOME") + "/.icons/" + m.group(1)
- chattr = None
- elif line.startswith('#EXTTV:'):
- #EXTTV:tag[,tag,tag...];language;XMLTV id[;icon URL]
- buff = line[7:].split(';')
- chtags = buff[0].split(',')
- for t in chtags:
- if not t in tags:
- tagcnt += 1
- tags[t] = {'num': tagcnt, 'name': t}
- chlanguage = buff[1]
- if chlanguage:
- if not chlanguage in tags:
- tagcnt += 1
- tags[chlanguage] = {'num': tagcnt, 'name': chlanguage}
- chtags.append(chlanguage)
- chxmltv = buff[2]
- chicon = buff[3] if len(buff) > 3 else None
- else:
- chgroup = re.search(URLPART, line).groupdict()
- if not chgroup or not chgroup["scheme"]:
- continue
- chancnt += 1
- if channumbering == CHAN_NUMBERING_GENERATE: chnumber = chancnt
- if chname in channels:
- print "already exists in m3u: %s" % chname
- chname += '.'
- channels[chname] = {'num': chancnt, 'number': chnumber, 'name': chname, 'tags': chtags, 'lang': chlanguage,
- 'scheme': chgroup["scheme"], 'ip': chgroup["host"], 'port': chgroup["port"],
- 'xmltv': chxmltv, 'icon': chicon}
- chname = ''
- chtags = None
- chlanguage = None
- chnumber = None
- chxmltv = None
- chicon = None
- def uuid():
- import uuid
- return uuid.uuid4().hex
- def writechannels39(iface, output, networkname):
- xmltvpath = "epggrab/xmltv/channels"
- if not os.path.exists(xmltvpath):
- os.makedirs(xmltvpath)
- tagpath = 'channel/tag'
- if not os.path.exists(tagpath):
- os.makedirs(tagpath)
- chnpath = 'channel/config'
- if not os.path.exists(chnpath):
- os.makedirs(chnpath)
- #channel/tag/UUID
- for tag in tags.values():
- if tag['name'] not in htstags:
- print "new Tag: %s" % tag['name']
- tag['id'] = uuid()
- jstag = {'enabled': 1,
- 'internal': 0,
- 'titledIcon': 0,
- 'name': tag['name'],
- 'comment': '',
- 'icon': ''}
- writejson(os.path.join(tagpath, tag['id']), jstag)
- else:
- tag['id'] = htstags[tag['name']]['id']
- #input/iptv
- path = os.path.join('input', 'iptv')
- if not os.path.exists(path):
- os.makedirs(path)
- #input/iptv/config
- if not os.path.isfile(os.path.join(path, 'config')):
- writejson(os.path.join(path, 'config'), {
- 'uuid': uuid(),
- 'skipinitscan': 1,
- 'autodiscovery': 0
- })
- #input/iptv/networks/uuid()
- if networkkey is None:
- path = os.path.join(path, 'networks', uuid())
- if not os.path.exists(path):
- os.makedirs(path)
- writejson(os.path.join(path, 'config'), {
- 'networkname': networkname, # Network name
- 'skipinitscan': 1, # Skip initial scan
- 'autodiscovery': 0, # Network discovery
- 'idlescan': 0, # Idle scan
- 'max_streams': 2, # Max input streams
- 'max_bandwidth': 0, # Max bandwidth (Kbps)
- 'max_timeout': 10 # Max timeout (seconds)
- })
- #input/iptv/networks/uuid()/muxes
- path = os.path.join(path, 'muxes')
- if not os.path.exists(path):
- os.mkdir(path)
- else:
- path = os.path.join(path, 'networks', networkkey, 'muxes')
- #remove existing channels from dict
- count = len(channels)
- for mkey in htsmuxes.iterkeys():
- if mkey in channels.keys():
- del channels[mkey]
- elif mkey:
- print "%s is new" % mkey
- print "(%d new Channels)" % len(channels)
- #one mux and service for each channel
- for channel in channels.values():
- print "new Channel in m3u: %s" % channel['name']
- muxid = uuid()
- muxpath = os.path.join(path, muxid)
- if not os.path.exists(muxpath):
- os.mkdir(muxpath)
- jsmux = {
- 'iptv_url': getUrl(channel),
- 'iptv_interface': iface,
- 'iptv_atsc': 0,
- 'iptv_svcname': channel['name'],
- 'iptv_muxname': channel['name'],
- 'iptv_sname': channel['name'],
- 'enabled': 1,
- 'scan_result': 2 # mark scan result (1 - ok, 2 - failed)
- }
- #input/iptv/networks/uuid()/muxes/uuid()/config file
- writejson(os.path.join(muxpath, 'config'), jsmux)
- #input/iptv/networks/uuid()/muxes/uuid()/services/uuid()
- svcpath = os.path.join(muxpath, 'services')
- if not os.path.exists(svcpath):
- os.mkdir(svcpath)
- #create empty service with id 1
- if 'service' in output:
- svcid = uuid()
- jssvc = {
- 'sid': 1, # guess service id
- 'svcname': channel['name'],
- 'name': channel['name'],
- 'dvb_servicetype': 1,
- 'enabled': 1
- }
- writejson(os.path.join(svcpath, svcid), jssvc)
- else:
- svcid = None
- #channel/config
- if 'channel' in output:
- chanid = uuid()
- jschan = {
- 'name': channel['name'],
- 'dvr_pre_time': 0,
- 'dvr_pst_time': 0,
- 'services': [svcid]
- }
- if channel['number'] is not None:
- jschan['number'] = int(channel['number'])
- if channel['tags'] is not None:
- jschan['tags'] = list(tags[x]['id'] for x in channel['tags'])
- if channel['icon'] is not None:
- jschan['icon'] = channel['icon']
- writejson(os.path.join(chnpath, chanid), jschan)
- #epg
- #epggrab/xmltv/channels/#
- if channel['xmltv'] is not None:
- xmlid = channel['xmltv']
- #else:
- # xmlid = channel['name']
- jsepg = {
- 'name': xmlid,
- 'channels': [chanid],
- 'icon': channel['icon'] #for tvip.ga
- }
- #writejson(os.path.join(xmltvpath, chanid), jsepg)
- if xmlid != '':
- writejson(os.path.join(xmltvpath, xmlid), jsepg)
- def getUrl(channel):
- if channel['port']:
- url = 'pipe://ffpipe.sh %s://%s:%s' % (channel['scheme'], channel['ip'], channel['port']) #for tvip.ga
- else:
- url = 'pipe://ffpipe.sh %s %s://%s' % (re.sub(r"\s", "\ ", channel['name']), channel['scheme'], channel['ip']) #for tvip.ga
- return url
- def readhtsnetworkkey(networkname):
- path = os.path.join(os.getcwd(), 'input', 'iptv', 'networks')
- if os.path.exists(path):
- for nkey in os.listdir(path):
- ifile = codecs.open(os.path.join(path, nkey, 'config'), 'r', 'utf-8')
- for line in ifile.readlines():
- m = KEYVALUE.match(line)
- if m:
- if m.group(1) == 'networkname' and m.group(2) == networkname:
- return nkey
- ifile.close()
- else:
- return None
- def updatehtsmuxes():
- path = os.path.join(os.getcwd(), 'input', 'iptv', 'networks', networkkey, 'muxes')
- muxname = ''
- url = ''
- for i in os.listdir(path):
- ifile = codecs.open(os.path.join(path, i, 'config'), 'Ur+', 'utf-8')
- for line in ifile.readlines():
- m = KEYVALUE.match(line)
- if m:
- if m.group(1) == 'iptv_muxname':
- muxname = m.group(2)
- elif m.group(1) == 'iptv_url':
- url = m.group(2)
- if muxname in channels:
- #update mux url
- churl = re.sub(r'\\\s', r'\\\\ ', getUrl(channels[muxname]))
- if url != churl:
- ifile.seek(0)
- out = string.replace(ifile.read(), url, churl)
- ifile.seek(0)
- ifile.write(out)
- url = churl
- print "mux url changed for %s" % muxname
- htsmuxes[muxname] = {'id': i, 'iptv_muxname': muxname, 'iptv_url': url}
- else:
- ifile.close()
- shutil.rmtree(os.path.join(os.getcwd(), 'input', 'iptv', 'networks', networkkey, 'muxes', i))
- print "deleted mux and services for %s" % muxname
- ifile.close()
- def readhtstags():
- path = os.path.join(os.getcwd(), 'channel', 'tag')
- name = ''
- for i in os.listdir(path):
- ifile = codecs.open(os.path.join(path, i), 'Ur', 'utf-8')
- for line in ifile.readlines():
- m = KEYVALUE.match(line)
- if m:
- if m.group(1) == "name":
- name = m.group(2)
- htstags[name] = {'id': i, 'name': name}
- ifile.close()
- def updatehtschannels():
- path = os.path.join(os.getcwd(), 'channel', 'config')
- name = ''
- dvr_pst_time = 0
- dvr_pre_time = 0
- number = 0
- icon = ''
- for i in os.listdir(path):
- ifile = codecs.open(os.path.join(path, i), 'Ur+', 'utf-8')
- for line in ifile.readlines():
- m = KEYVALUE.match(line)
- if m:
- if m.group(1) == "name":
- name = m.group(2)
- elif m.group(1) == "dvr_pst_time":
- dvr_pst_time = m.group(2)
- elif m.group(1) == "dvr_pre_time":
- dvr_pre_time = m.group(2)
- elif m.group(1) == "number":
- number = m.group(2)
- elif m.group(1) == "icon":
- icon = m.group(2)
- if name in channels:
- #print "icon: %s" % channels[name]['icon']
- if channels[name]['icon'] is not None:
- chicon = re.sub(r"\\\s", r"\\\\ ", channels[name]['icon'])
- if icon != chicon:
- ifile.seek(0)
- out = string.replace(ifile.read(), icon, chicon)
- ifile.seek(0)
- ifile.write(out)
- icon = chicon
- print "channel icon changed for %s" % name
- ifile.close()
- htschannels[name] = {'id': i, 'name': name, 'dvr_pst_time': dvr_pst_time, 'dvr_pre_time': dvr_pre_time, 'number': number, 'icon': icon}
- #print "%d hts channels read" % len(htschannels)
- def whatisthis(s):
- if isinstance(s, str):
- print "ordinary string"
- elif isinstance(s, unicode):
- print "unicode string"
- else:
- print "not a string"
- def writejson(filename, obj):
- """
- Export obj to filename in JSON format
- @param filename: output file
- @param obj: object to export
- """
- outstream = codecs.open(filename, "w", encoding='utf-8')
- json.dump(obj, outstream, indent=4, ensure_ascii=False)
- outstream.close()
- def main():
- par = OptionParser(usage="%prog [options] inputfile",
- description="Generate TVHeadend 3.9 channel/tag configuration files from IPTV M3U playlist")
- par.add_option('-r', '--removenum', action='store_true', help=u'remove program numbers from names')
- par.add_option('-n', '--numbering', type='int', default=0,
- help=u'program numbers are generated(0), determined from duration(1) or extracted from program names(2)')
- par.add_option('-c', '--codec', action='store', dest='codec', default='cp1250',
- help=u'input file encoding [default: %default]')
- par.add_option('-i', '--iface', action='store', dest='iface', default='eth1',
- help=u'IPTV interface [default: %default]')
- par.add_option('-o', '--output', action='append', type='string', dest='output',
- help=u'what kind of 3.9 output to generate in addition to muxes. '
- u'Available options are "service" and "channel", repeat to combine [default: none]')
- par.add_option('-l', '--networklabel', action='store', dest='networkname', default='IPTV network',
- help=u'Network name in tvheadend')
- opt, args = par.parse_args()
- if len(args) == 1:
- global networkkey
- networkkey = readhtsnetworkkey(opt.networkname)
- readm3u(args[0], opt.removenum, opt.numbering, opt.codec)
- if networkkey is not None:
- updatehtsmuxes()
- readhtstags()
- updatehtschannels()
- writechannels39(opt.iface, opt.output, opt.networkname)
- print("OK")
- else:
- par.print_help()
- if __name__ == '__main__':
- main()
Add Comment
Please, Sign In to add comment