import os
import re
import json
import unicodedata
from datetime import datetime, timezone

from flask import Flask, jsonify, request, send_from_directory, Response

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
LOGOS_ROOT = os.path.join(BASE_DIR, 'static', 'logos')
FLAGS_ROOT = os.path.join(BASE_DIR, 'static', 'flags')
DEFAULT_JSON = os.path.join(BASE_DIR, 'channels.json')
DEFAULT_M3U = os.path.join(BASE_DIR, 'example.m3u')
STATIC_DIR = os.path.join(BASE_DIR, 'static')

# Maps tv-logos-main/countries/{folder} → ISO 3166-1 alpha-2 code.
# Folders mapped to '' have no single country code; lookups treat them as
# "no country".
COUNTRY_FOLDER_TO_ISO: dict = {
    'albania': 'al',
    'argentina': 'ar',
    'australia': 'au',
    'austria': 'at',
    'azerbaijan': 'az',
    'belgium': 'be',
    'brazil': 'br',
    'bulgaria': 'bg',
    'canada': 'ca',
    'caribbean': '',
    'chile': 'cl',
    'costa-rica': 'cr',
    'croatia': 'hr',
    'czech-republic': 'cz',
    'france': 'fr',
    'germany': 'de',
    'greece': 'gr',
    'hong-kong': 'hk',
    'hungary': 'hu',
    'india': 'in',
    'indonesia': 'id',
    'international': '',
    'ireland': 'ie',
    'israel': 'il',
    'italy': 'it',
    'lebanon': 'lb',
    'lithuania': 'lt',
    'luxembourg': 'lu',
    'malaysia': 'my',
    'malta': 'mt',
    'mexico': 'mx',
    'netherlands': 'nl',
    'new-zealand': 'nz',
    'nordic': '',
    'philippines': 'ph',
    'poland': 'pl',
    'portugal': 'pt',
    'romania': 'ro',
    'russia': 'ru',
    'serbia': 'rs',
    'singapore': 'sg',
    'slovakia': 'sk',
    'slovenia': 'si',
    'spain': 'es',
    'switzerland': 'ch',
    'turkey': 'tr',
    'ukraine': 'ua',
    'united-arab-emirates': 'ae',
    'united-kingdom': 'gb',
    'united-states': 'us',
    'world-africa': '',
    'world-asia': '',
    'world-europe': '',
    'world-latin-america': '',
    'world-middle-east': '',
}

app = Flask(__name__, static_folder=STATIC_DIR)
app.config['MAX_CONTENT_LENGTH'] = 32 * 1024 * 1024  # 32 MB max upload

# ── Global state ─────────────────────────────────────────────────────────────

# slug → path of a PNG relative to LOGOS_ROOT; filled once by startup().
logo_index: dict = {}
# The currently loaded playlist; replaced wholesale on every load/import.
state: dict = {'channels': [], 'groups': [], 'group_meta': {}, 'source_file': ''}
# mtime of channels.json as of the last load/write performed by this process.
_json_mtime: float = 0.0


# ── Logo index ────────────────────────────────────────────────────────────────

_LOGO_SUFFIX_RE = re.compile(r'-[a-z]{2,3}$')


def build_logo_index(logos_root: str) -> dict:
    """Walk ``logos_root`` recursively and build a slug → relative-path index.

    Each PNG stem is indexed both as-is and without a trailing 2-3 letter
    country/region suffix (e.g. "-es", "-uk", "-int") so that channel-name
    slugs can match without knowing the country code.  An exact stem always
    overwrites an earlier entry; a suffix-less alias never does.

    Directories and files are visited in sorted order so that, when several
    files compete for the same alias, the winner does not depend on the
    order in which the filesystem happens to list them.

    Returns an empty dict when ``logos_root`` does not exist (``os.walk``
    yields nothing in that case).
    """
    index: dict = {}
    for root, dirs, files in os.walk(logos_root):
        dirs.sort()  # in-place sort steers os.walk's traversal order
        for fname in sorted(files):
            if not fname.lower().endswith('.png'):
                continue
            stem = fname[:-4]
            rel = os.path.relpath(os.path.join(root, fname), logos_root).replace(os.sep, '/')
            index[stem] = rel
            # Index without trailing 2-3 letter country/region suffix
            no_suffix = _LOGO_SUFFIX_RE.sub('', stem)
            if no_suffix and no_suffix != stem and no_suffix not in index:
                index[no_suffix] = rel
    return index


def slugify(text: str) -> str:
    """Produce a URL-safe kebab-case slug from arbitrary text.

    Lower-cases, strips accents (NFD decomposition, combining marks dropped)
    and replaces every run of characters outside ``[a-z0-9]`` with a single
    hyphen; leading/trailing hyphens are removed.
    """
    decomposed = unicodedata.normalize('NFD', text.lower())
    no_accents = ''.join(c for c in decomposed if unicodedata.category(c) != 'Mn')
    # One substitution already collapses runs, so no second "-+" pass is needed.
    return re.sub(r'[^a-z0-9]+', '-', no_accents).strip('-')


def normalize_to_slug(text: str) -> str:
    """Convert a channel name or tvg-id to a kebab-case slug for logo lookup.

    On top of :func:`slugify` this removes quality/resolution labels and
    trailing ``*`` markers, and expands ``m+`` to ``m-plus``.
    """
    s = text.lower()
    # Strip common quality/HD labels that don't appear in file names
    s = re.sub(r'\s+(sd|hd|fhd|uhd|4k)\b', '', s)
    s = re.sub(r'\s+(480p|720p|1080p|1440p|2160p)\b', '', s)
    # Strip trailing quality markers (* **)
    s = re.sub(r'\s*\*+\s*$', '', s)
    # Expand M+ prefix to m-plus so it can match logo file stems
    s = s.replace('m+', 'm-plus')
    return slugify(s)


def _lookup_local_logo(slug: str):
    """Return the ``logos/…`` URL for ``slug`` from ``logo_index``, or None."""
    if not slug:
        # An empty slug is a prefix of every key and would otherwise match an
        # arbitrary short-named logo.
        return None
    rel = logo_index.get(slug)
    if rel is None:
        # Prefix match: allows "dazn-laliga" → "dazn-laliga-es" when the
        # suffix-less alias wasn't created for that file.
        for key, path in logo_index.items():
            if key.startswith(slug) and len(key) <= len(slug) + 4:
                rel = path
                break
    return None if rel is None else f'logos/{rel}'


def resolve_logo(tvg_id: str, base_name: str, tvg_logo: str) -> tuple:
    """Return ``(url, source)`` for the best available logo.

    Priority:
      1. Local logo matched by tvg_id slug (exact, then prefix)
      2. Local logo matched by base_name slug (exact, then prefix)
      3. External URL from the M3U tvg-logo attribute
      4. Empty string

    ``source`` is ``'local'``, ``'tvg_logo'`` or ``'none'`` respectively.
    """
    for candidate in (tvg_id, base_name):
        if not candidate:
            continue
        url = _lookup_local_logo(normalize_to_slug(candidate))
        if url:
            return url, 'local'
    if tvg_logo:
        return tvg_logo, 'tvg_logo'
    return '', 'none'


# ── M3U parser ────────────────────────────────────────────────────────────────

_EXTINF_ATTRS = re.compile(r'([\w-]+)="([^"]*)"')
_ACESTREAM_RE = re.compile(r'^acestream://([a-f0-9]{40})', re.IGNORECASE)
_RESOLUTION_RE = re.compile(r'\s+(480p|720p|1080p|1440p|2160p|4[Kk]|UHD|FHD)\b\s*$', re.IGNORECASE)
_QUALITY_MARKER_RE = re.compile(r'\s*(\*+)\s*$')
_EXTGRP_RE = re.compile(r'group-title="([^"]*)".*?group-logo="([^"]*)"')

# Lower-cased resolution token → canonical label.
_RESOLUTION_ALIASES = {'fhd': '1080p', 'uhd': '4K', '2160p': '4K', '4k': '4K'}


def parse_channel_name(raw_name: str) -> dict:
    """Extract base_name, resolution and quality_marker from a raw M3U name.

    Example:
        "DAZN LaLiga 1080p **" →
        {'base_name': 'DAZN LaLiga', 'resolution': '1080p', 'quality_marker': '**'}

    FHD is reported as "1080p"; UHD, 2160p and 4k are reported as "4K"; any
    other recognised token is lower-cased, so "1080P" and "1080p" give the
    same label.
    """
    name = raw_name.strip()

    # 1. Strip trailing quality marker (*** / ** / *)
    quality_marker = ''
    marker_match = _QUALITY_MARKER_RE.search(name)
    if marker_match:
        quality_marker = marker_match.group(1)
        name = name[:marker_match.start()]

    # 2. Strip trailing resolution token
    resolution = ''
    res_match = _RESOLUTION_RE.search(name)
    if res_match:
        token = res_match.group(1).lower()
        resolution = _RESOLUTION_ALIASES.get(token, token)
        name = name[:res_match.start()]

    return {
        'base_name': name.strip(),
        'resolution': resolution,
        'quality_marker': quality_marker,
    }


def _split_extinf(line: str) -> tuple:
    """Split an ``#EXTINF:`` line into ``(attrs, display_name)``.

    The display name starts after the first comma that is *not* inside a
    quoted attribute value, so ``group-title="Sports, Live"`` does not
    truncate the attributes or pollute the name.
    """
    attrs: dict = {}
    pos = 0
    for match in _EXTINF_ATTRS.finditer(line):
        if ',' in line[pos:match.start()]:
            break  # already past the header/name separator
        attrs[match.group(1)] = match.group(2)
        pos = match.end()
    comma = line.find(',', pos)
    name = line[comma + 1:].strip() if comma != -1 else ''
    return attrs, name


def parse_m3u(content: str) -> tuple:
    """Parse raw M3U text and return ``(raw_entries, group_meta)``.

    raw_entries: list of dicts with keys tvg_id, tvg_logo, group_title,
                 raw_name, acestream_hash, acestream_url.  Only ``#EXTINF``
                 entries followed by an ``acestream://<40 hex>`` line are kept.
    group_meta:  dict mapping group name → logo URL (from #EXTGRP lines).
    """
    raw_entries: list = []
    group_meta: dict = {}
    pending = None

    # A UTF-8 BOM would otherwise hide a directive on the very first line.
    for line in content.lstrip('\ufeff').splitlines():
        line = line.strip()
        if not line:
            continue

        if line.startswith('#EXTGRP:'):
            m = _EXTGRP_RE.search(line)
            if m:
                group_meta[m.group(1)] = m.group(2)
        elif line.startswith('#EXTINF:'):
            attrs, name = _split_extinf(line)
            pending = {
                'tvg_id': attrs.get('tvg-id', ''),
                'tvg_logo': attrs.get('tvg-logo', ''),
                'group_title': attrs.get('group-title', ''),
                'raw_name': name,
            }
        elif pending and (m := _ACESTREAM_RE.match(line)):
            pending['acestream_hash'] = m.group(1)
            pending['acestream_url'] = f"acestream://{m.group(1)}"
            raw_entries.append(pending)
            pending = None
        elif line.startswith('#'):
            pass  # other directives – keep pending
        elif pending:
            pending = None  # unexpected non-acestream URL line

    return raw_entries, group_meta


_COUNTRY_FOLDER_RE = re.compile(r'/countries/([^/]+)/')
_COUNTRY_FILE_SUFFIX_RE = re.compile(r'-([a-z]{2})(?:-[a-z]+)?\.(?:png|svg|jpg)$')
# Two-letter filename suffixes that are not ISO 3166-1 codes as written.
_SUFFIX_TO_ISO = {'uk': 'gb'}
_NON_COUNTRY_SUFFIXES = frozenset({'hd'})


def country_code_from_logo_url(logo_url: str, tvg_logo: str) -> str:
    """Derive a 2-letter ISO country code from the resolved or original logo URL.

    ``logo_url`` is tried first, then ``tvg_logo``.  For each URL the
    ``/countries/{folder}/`` segment is mapped through
    ``COUNTRY_FOLDER_TO_ISO``; failing that, a ``-xx`` suffix on the file
    name is used ("uk" is reported as "gb", "hd" is ignored because it is a
    quality label, not a country).  Returns '' when nothing matches.
    """
    for url in (logo_url, tvg_logo):
        if not url:
            continue
        # Pattern: /countries/{folder}/  (local path or GitHub raw URL)
        folder = _COUNTRY_FOLDER_RE.search(url)
        if folder:
            code = COUNTRY_FOLDER_TO_ISO.get(folder.group(1), '')
            if code:
                return code
        # Pattern: -{2letter}.png at end of filename
        suffix = _COUNTRY_FILE_SUFFIX_RE.search(url.lower())
        if suffix:
            code = _SUFFIX_TO_ISO.get(suffix.group(1), suffix.group(1))
            if code not in _NON_COUNTRY_SUFFIXES:
                return code
    return ''


def group_channels(raw_entries: list, group_meta: dict) -> list:
    """Merge raw entries into channel dicts grouped by (base_name, group).

    Entries with the same base name in the same group (compared
    case-insensitively) are treated as mirrors of one channel and collected
    under a single channel dict, in first-seen order.

    Channel ids are slugs of the base name (or of the group, or the literal
    "channel" when both slugs are empty).  Repeats get "-1", "-2", …; the
    suffix keeps increasing until the id is unused, so a generated id can
    never collide with another channel's own slug.

    ``group_meta`` is accepted for interface compatibility and is not used.
    """
    channel_map: dict = {}
    order: list = []
    next_suffix: dict = {}   # base_id → next numeric suffix to try
    used_ids: set = set()

    for entry in raw_entries:
        parsed = parse_channel_name(entry['raw_name'])
        base_name = parsed['base_name']
        group = entry['group_title']
        key = (base_name.casefold(), group.casefold())

        mirror = {
            'resolution': parsed['resolution'],
            'acestream_hash': entry['acestream_hash'],
            'status': 'unknown',
            'country_code': country_code_from_logo_url('', entry['tvg_logo']),
        }

        ch = channel_map.get(key)
        if ch is None:
            base_id = slugify(base_name) or slugify(group) or 'channel'
            n = next_suffix.get(base_id, 0)
            channel_id = base_id if n == 0 else f'{base_id}-{n}'
            while channel_id in used_ids:
                n += 1
                channel_id = f'{base_id}-{n}'
            next_suffix[base_id] = n + 1
            used_ids.add(channel_id)

            ch = channel_map[key] = {
                'id': channel_id,
                'name': base_name,
                'group': group,
                'subcategory': '',
                'country_code': '',
                'logo_url': '',
                'tags': [],
                '_tvg_id': entry['tvg_id'],      # internal
                '_tvg_logo': entry['tvg_logo'],  # internal
                'mirrors': [],
            }
            order.append(key)
        else:
            # Later mirrors may carry metadata the first one lacked.
            if not ch['_tvg_id'] and entry['tvg_id']:
                ch['_tvg_id'] = entry['tvg_id']
            if not ch['_tvg_logo'] and entry['tvg_logo']:
                ch['_tvg_logo'] = entry['tvg_logo']

        ch['mirrors'].append(mirror)

    channels = []
    for key in order:
        ch = channel_map[key]
        tvg_id = ch.pop('_tvg_id', '')
        tvg_logo = ch.pop('_tvg_logo', '')
        logo_url, _ = resolve_logo(tvg_id, ch['name'], tvg_logo)
        ch['logo_url'] = logo_url
        ch['country_code'] = country_code_from_logo_url(logo_url, tvg_logo)
        channels.append(ch)
    return channels


def _set_state(channels: list, group_meta: dict, source_file: str) -> None:
    """Replace the global ``state`` with a freshly loaded playlist."""
    global state
    state = {
        'channels': channels,
        'groups': sorted({ch['group'] for ch in channels}),
        'group_meta': group_meta,
        'source_file': source_file,
    }


def load_from_m3u_content(content: str, source_file: str) -> None:
    """Parse M3U text and make it the current global ``state``."""
    raw_entries, group_meta = parse_m3u(content)
    _set_state(group_channels(raw_entries, group_meta), group_meta, source_file)


def load_from_json_content(data: dict, source_file: str) -> None:
    """Make a previously exported JSON document the current global ``state``.

    Old export formats are upgraded in place (``base_name`` → ``name``;
    obsolete mirror keys dropped) and missing optional keys are defaulted.
    ``state`` is only replaced after the whole document has been processed.

    Raises:
        ValueError: if ``data`` is not an object or ``channels`` is not a list.
    """
    if not isinstance(data, dict):
        raise ValueError('JSON root must be an object')
    channels = data.get('channels', [])
    if not isinstance(channels, list):
        raise ValueError("'channels' must be a list")

    for ch in channels:
        # backwards compat: old exports used base_name instead of name
        if 'base_name' in ch and 'name' not in ch:
            ch['name'] = ch.pop('base_name')
        ch.setdefault('name', '')
        ch.setdefault('group', '')
        ch.setdefault('tags', [])
        ch.setdefault('country_code', '')
        for m in ch.setdefault('mirrors', []):
            # old exports had acestream_url / raw_name / quality_marker; drop them
            m.pop('acestream_url', None)
            m.pop('raw_name', None)
            m.pop('quality_marker', None)
            m.setdefault('status', 'unknown')
            m.setdefault('country_code', '')

    group_meta = {
        g['name']: g.get('logo', '')
        for g in data.get('groups', [])
        if g.get('name')
    }
    _set_state(channels, group_meta, source_file)


# ── Flask routes ──────────────────────────────────────────────────────────────

@app.route('/')
def index():
    """Serve the single-page UI with caching disabled."""
    resp = send_from_directory(STATIC_DIR, 'index.html')
    resp.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
    return resp


@app.route('/ping')
def ping():
    """Liveness probe."""
    return 'OK'


# The rule must declare ``filename`` for Flask to pass it to the view; the
# ``path`` converter is needed because logo paths contain slashes
# (e.g. "countries/spain/foo.png").
@app.route('/logos/<path:filename>')
def serve_logo(filename):
    """Serve a logo file from LOGOS_ROOT."""
    return send_from_directory(LOGOS_ROOT, filename)


@app.route('/flags/<path:filename>')
def serve_flag(filename):
    """Serve a flag file from FLAGS_ROOT."""
    return send_from_directory(FLAGS_ROOT, filename)


def _maybe_reload_json() -> None:
    """Reload channels.json if it has been modified since the last load.

    Only applies while the current state's source is 'channels.json'.  A
    missing, unreadable or malformed file leaves the existing state untouched;
    the failure is logged instead of being silently discarded.
    """
    global _json_mtime
    if state.get('source_file') != 'channels.json':
        return
    try:
        mtime = os.path.getmtime(DEFAULT_JSON)
    except OSError:
        return  # file missing or inaccessible
    if mtime <= _json_mtime:
        return
    # Recorded before parsing so a broken file is not re-parsed on every
    # request until it changes again.
    _json_mtime = mtime
    try:
        with open(DEFAULT_JSON, encoding='utf-8') as f:
            load_from_json_content(json.load(f), 'channels.json')
    except Exception as exc:  # best-effort: keep existing state on any error
        app.logger.warning('Could not reload channels.json: %s', exc)


def _channels_response():
    """Return the current state as the JSON payload shared by the API routes."""
    return jsonify({
        'channels': state['channels'],
        'groups': state['groups'],
        'group_meta': state['group_meta'],
        'source': state['source_file'],
        'total': len(state['channels']),
    })


def _build_export() -> dict:
    """Return the export document for the current state (JSON-serialisable)."""
    return {
        'version': '1.0',
        'exported_at': datetime.now(timezone.utc).isoformat(),
        'source': state['source_file'],
        'channels': state['channels'],
        'groups': [
            {'name': g, 'logo': state['group_meta'].get(g, ''), 'subcategories': []}
            for g in state['groups']
        ],
    }


def _m3u_attr(value) -> str:
    """Make ``value`` safe inside a double-quoted, single-line M3U attribute."""
    return str(value).replace('"', "'").replace('\r', ' ').replace('\n', ' ')


@app.route('/api/channels')
def api_channels():
    """Return all channels, picking up external edits to channels.json first."""
    _maybe_reload_json()
    return _channels_response()


@app.route('/api/import/m3u', methods=['POST'])
def api_import_m3u():
    """Replace the current state with an uploaded M3U playlist."""
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    f = request.files['file']
    try:
        # utf-8-sig transparently drops a leading BOM if present.
        content = f.read().decode('utf-8-sig', errors='replace')
        load_from_m3u_content(content, f.filename or 'playlist.m3u')
    except Exception as exc:
        return jsonify({'error': str(exc)}), 400
    return _channels_response()


@app.route('/api/import/json', methods=['POST'])
def api_import_json():
    """Replace the current state with an uploaded JSON export."""
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    f = request.files['file']
    try:
        # json.loads rejects text that starts with a BOM; utf-8-sig strips it.
        data = json.loads(f.read().decode('utf-8-sig', errors='replace'))
        load_from_json_content(data, f.filename or 'channels.json')
    except Exception as exc:
        return jsonify({'error': str(exc)}), 400
    return _channels_response()


@app.route('/api/export/json')
def api_export_json():
    """Download the current state as channels.json."""
    return Response(
        json.dumps(_build_export(), ensure_ascii=False, indent=2),
        mimetype='application/json',
        headers={'Content-Disposition': 'attachment; filename="channels.json"'},
    )


@app.route('/api/export/m3u')
def api_export_m3u():
    """Download the current state as an M3U playlist, one entry per mirror."""
    lines = ['#EXTM3U']
    for ch in state['channels']:
        name = ch.get('name', '')
        attrs = (
            f'tvg-id="{_m3u_attr(ch.get("id", ""))}" '
            f'tvg-logo="{_m3u_attr(ch.get("logo_url", ""))}" '
            f'group-title="{_m3u_attr(ch.get("group", ""))}"'
        )
        for m in ch.get('mirrors', []):
            stream_hash = m.get('acestream_hash')
            if not stream_hash:
                continue  # nothing playable to export for this mirror
            res = m.get('resolution', '')
            entry_name = f'{name} {res}'.strip() if res else name
            lines.append(f'#EXTINF:-1 {attrs},{entry_name}')
            lines.append(f'acestream://{stream_hash}')
    return Response(
        '\n'.join(lines),
        mimetype='text/plain',
        headers={'Content-Disposition': 'attachment; filename="channels.m3u"'},
    )


@app.route('/api/mirror/status', methods=['POST'])
def api_mirror_status():
    """Set the status of one mirror and persist the whole state to channels.json.

    Expects a JSON object with ``channel_id``, ``acestream_hash`` and
    ``status`` (one of ok / issues / broken / unknown).  Responds 400 for a
    missing or non-object JSON body or an invalid status, 404 when the
    channel or mirror is unknown, and 500 when channels.json cannot be
    written.
    """
    global _json_mtime

    # silent=True yields None instead of raising on a missing/invalid body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'invalid JSON body'}), 400

    channel_id = data.get('channel_id')
    acestream_hash = data.get('acestream_hash')
    status = data.get('status', 'unknown')
    if status not in ('ok', 'issues', 'broken', 'unknown'):
        return jsonify({'error': 'invalid status'}), 400

    ch = next((c for c in state['channels'] if c.get('id') == channel_id), None)
    if not ch:
        return jsonify({'error': 'not found'}), 404
    mirror = next(
        (m for m in ch.get('mirrors', []) if m.get('acestream_hash') == acestream_hash),
        None,
    )
    if not mirror:
        return jsonify({'error': 'not found'}), 404

    mirror['status'] = status
    try:
        with open(DEFAULT_JSON, 'w', encoding='utf-8') as f:
            json.dump(_build_export(), f, ensure_ascii=False, indent=2)
        # Remember our own write so the next /api/channels call does not
        # re-read the file we just produced.
        _json_mtime = os.path.getmtime(DEFAULT_JSON)
    except OSError as exc:
        return jsonify({'error': f'could not save channels.json: {exc}'}), 500
    return jsonify({'ok': True})


# ── Startup ───────────────────────────────────────────────────────────────────

def _load_default_m3u() -> bool:
    """Load example.m3u if it exists; return whether it was loaded."""
    if not os.path.exists(DEFAULT_M3U):
        return False
    with open(DEFAULT_M3U, encoding='utf-8') as f:
        load_from_m3u_content(f.read(), 'example.m3u')
    print(f'Loaded {len(state["channels"])} channels from example.m3u')
    return True


def startup() -> None:
    """Build the logo index and load the initial playlist.

    channels.json is preferred; example.m3u is used when channels.json is
    absent or cannot be loaded.
    """
    global logo_index, _json_mtime
    print('Building logo index…', end=' ', flush=True)
    logo_index = build_logo_index(LOGOS_ROOT)
    print(f'{len(logo_index)} entries indexed.')

    if os.path.exists(DEFAULT_JSON):
        try:
            _json_mtime = os.path.getmtime(DEFAULT_JSON)
            with open(DEFAULT_JSON, encoding='utf-8') as f:
                load_from_json_content(json.load(f), 'channels.json')
        except Exception as exc:
            print(f'Warning: could not load channels.json ({exc}), falling back to example.m3u')
            _json_mtime = 0.0
        else:
            print(f'Loaded {len(state["channels"])} channels from channels.json')
            return
    _load_default_m3u()


startup()

if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug interactive debugger and
    # host='0.0.0.0' exposes it on every interface — use only on a trusted
    # network, never in production.
    app.run(debug=True, host='0.0.0.0', port=5000, use_reloader=False)