Files
project-a13tv/app.py
2026-03-17 14:03:14 +01:00

530 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import json
import unicodedata
from datetime import datetime, timezone
from flask import Flask, jsonify, request, send_from_directory, Response
# Filesystem layout — every path is resolved relative to this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
LOGOS_ROOT = os.path.join(BASE_DIR, 'static', 'logos')  # tv-logos PNG tree, served under /logos/
FLAGS_ROOT = os.path.join(BASE_DIR, 'static', 'flags')  # flag images, served under /flags/
DEFAULT_JSON = os.path.join(BASE_DIR, 'channels.json')  # persisted channel state (read at startup, written by /api/mirror/status)
DEFAULT_M3U = os.path.join(BASE_DIR, 'example.m3u')  # bundled fallback playlist
STATIC_DIR = os.path.join(BASE_DIR, 'static')
# Maps tv-logos-main/countries/{folder} → ISO 3166-1 alpha-2 code
# Folders that are regions rather than countries ('caribbean', 'nordic',
# 'international', 'world-*') map to '' — country_code_from_logo_url treats
# an empty code as "no country known" and keeps looking.
COUNTRY_FOLDER_TO_ISO: dict[str, str] = {
    'albania': 'al', 'argentina': 'ar', 'australia': 'au', 'austria': 'at',
    'azerbaijan': 'az', 'belgium': 'be', 'brazil': 'br', 'bulgaria': 'bg',
    'canada': 'ca', 'caribbean': '', 'chile': 'cl', 'costa-rica': 'cr',
    'croatia': 'hr', 'czech-republic': 'cz', 'france': 'fr', 'germany': 'de',
    'greece': 'gr', 'hong-kong': 'hk', 'hungary': 'hu', 'india': 'in',
    'indonesia': 'id', 'international': '', 'ireland': 'ie', 'israel': 'il',
    'italy': 'it', 'lebanon': 'lb', 'lithuania': 'lt', 'luxembourg': 'lu',
    'malaysia': 'my', 'malta': 'mt', 'mexico': 'mx', 'netherlands': 'nl',
    'new-zealand': 'nz', 'nordic': '', 'philippines': 'ph', 'poland': 'pl',
    'portugal': 'pt', 'romania': 'ro', 'russia': 'ru', 'serbia': 'rs',
    'singapore': 'sg', 'slovakia': 'sk', 'slovenia': 'si', 'spain': 'es',
    'switzerland': 'ch', 'turkey': 'tr', 'ukraine': 'ua',
    'united-arab-emirates': 'ae', 'united-kingdom': 'gb', 'united-states': 'us',
    'world-africa': '', 'world-asia': '', 'world-europe': '',
    'world-latin-america': '', 'world-middle-east': '',
}
app = Flask(__name__, static_folder=STATIC_DIR)
app.config['MAX_CONTENT_LENGTH'] = 32 * 1024 * 1024  # 32 MB max upload
# ── Global state ─────────────────────────────────────────────────────────────
# Slug → logo path relative to LOGOS_ROOT; populated once by startup().
logo_index: dict = {}
# In-memory channel database; rebound wholesale by load_from_m3u_content /
# load_from_json_content. Single-process state — no locking.
state: dict = {'channels': [], 'groups': [], 'group_meta': {}, 'source_file': ''}
# mtime of channels.json at the last successful load; lets
# _maybe_reload_json pick up on-disk edits.
_json_mtime: float = 0.0
# ── Logo index ────────────────────────────────────────────────────────────────
def build_logo_index(logos_root: str) -> dict:
    """Recursively index PNG logos as slug → path relative to *logos_root*.

    Every PNG stem is indexed under its full name; when the stem carries a
    trailing 2-3 letter country/region suffix (e.g. "-es", "-uk", "-int")
    it is additionally indexed under the suffix-free name, so channel-name
    slugs can match without knowing the country code.
    """
    index: dict = {}
    for dirpath, _subdirs, filenames in os.walk(logos_root):
        for filename in filenames:
            if not filename.lower().endswith('.png'):
                continue
            stem = filename[:-4]
            rel_path = os.path.relpath(os.path.join(dirpath, filename), logos_root)
            rel_path = rel_path.replace(os.sep, '/')
            index[stem] = rel_path
            # Suffix-free alias; the first file encountered wins.
            alias = re.sub(r'-[a-z]{2,3}$', '', stem)
            if alias and alias != stem:
                index.setdefault(alias, rel_path)
    return index
def normalize_to_slug(text: str) -> str:
    """Convert a channel name or tvg-id to a kebab-case slug for logo lookup."""
    slug = text.lower()
    # Quality/HD labels never appear in the logo file names — drop them.
    slug = re.sub(r'\s+(sd|hd|fhd|uhd|4k)\b', '', slug)
    slug = re.sub(r'\s+(480p|720p|1080p|1440p|2160p)\b', '', slug)
    # Trailing quality markers ("*", "**", ...).
    slug = re.sub(r'\s*\*+\s*$', '', slug)
    # "M+" channels are stored as "m-plus-..." in the logo repo.
    slug = slug.replace('m+', 'm-plus')
    # Strip accents: decompose, then discard combining marks.
    decomposed = unicodedata.normalize('NFD', slug)
    slug = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
    # Collapse every non-alphanumeric run into a single hyphen.
    slug = re.sub(r'[^a-z0-9]+', '-', slug)
    return re.sub(r'-+', '-', slug).strip('-')
def resolve_logo(tvg_id: str, base_name: str, tvg_logo: str) -> tuple:
    """Return (url, source) for the best available logo.

    Priority:
        1. Local logo matched by the tvg_id slug (exact, then prefix)
        2. Local logo matched by the base_name slug (exact, then prefix)
        3. External URL from the M3U tvg-logo attribute
        4. Empty string / 'none'
    """
    def _find(slug: str):
        hit = logo_index.get(slug)
        if hit is not None:
            return f'logos/{hit}', 'local'
        # Prefix match: lets "dazn-laliga" resolve to "dazn-laliga-es" when
        # no suffix-free alias exists (e.g. the suffix was > 3 chars long).
        limit = len(slug) + 4
        for stem, rel_path in logo_index.items():
            if stem.startswith(slug) and len(stem) <= limit:
                return f'logos/{rel_path}', 'local'
        return None, None

    if tvg_id:
        url, source = _find(normalize_to_slug(tvg_id))
        if url:
            return url, source
    url, source = _find(normalize_to_slug(base_name))
    if url:
        return url, source
    if tvg_logo:
        return tvg_logo, 'tvg_logo'
    return '', 'none'
# ── M3U parser ────────────────────────────────────────────────────────────────
# Parses key="value" attribute pairs on an #EXTINF line.
_EXTINF_ATTRS = re.compile(r'([\w-]+)="([^"]*)"')
# Matches an acestream:// URL and captures its 40-char hex content hash.
_ACESTREAM_RE = re.compile(r'^acestream://([a-f0-9]{40})', re.IGNORECASE)
# Trailing resolution token at the end of a channel name.
_RESOLUTION_RE = re.compile(r'\s+(480p|720p|1080p|1440p|2160p|4[Kk]|UHD|FHD)\b\s*$', re.IGNORECASE)
# Trailing "*" quality markers.
_QUALITY_MARKER_RE = re.compile(r'\s*(\*+)\s*$')
# NOTE(review): assumes group-title appears before group-logo on the same
# #EXTGRP line — confirm against the playlists this app actually ingests.
_EXTGRP_RE = re.compile(r'group-title="([^"]*)".*?group-logo="([^"]*)"')


def parse_channel_name(raw_name: str) -> dict:
    """Extract base_name, resolution and quality_marker from a raw M3U channel name.

    Example: "DAZN LaLiga 1080p **"
    -> {'base_name': 'DAZN LaLiga', 'resolution': '1080p', 'quality_marker': '**'}

    The resolution token is normalised: FHD -> '1080p', UHD/2160p/4k -> '4K';
    other tokens (480p/720p/1080p/1440p) are returned exactly as written.
    """
    name = raw_name.strip()
    # 1. Strip the trailing quality marker (*** / ** / *).
    qm = _QUALITY_MARKER_RE.search(name)
    quality_marker = qm.group(1) if qm else ''
    if qm:
        name = name[:qm.start()]
    # 2. Strip and normalise the trailing resolution token.
    resolution = ''
    res_m = _RESOLUTION_RE.search(name)
    if res_m:
        raw_res = res_m.group(1)
        norm = raw_res.upper()
        if norm == 'FHD':
            resolution = '1080p'
        elif norm in ('UHD', '2160P') or norm.startswith('4K'):
            # norm is upper-cased, so one '4K' test suffices — the original
            # also checked startswith('4k'), which was unreachable dead code.
            resolution = '4K'
        else:
            resolution = raw_res  # keeps "720p" / "1080p" as-is
        name = name[:res_m.start()]
    return {
        'base_name': name.strip(),
        'resolution': resolution,
        'quality_marker': quality_marker,
    }
def slugify(text: str) -> str:
    """Produce a URL-safe kebab-case slug from arbitrary text."""
    decomposed = unicodedata.normalize('NFD', text.lower())
    ascii_only = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
    hyphenated = re.sub(r'[^a-z0-9]+', '-', ascii_only).strip('-')
    return re.sub(r'-+', '-', hyphenated)
def parse_m3u(content: str) -> tuple:
    """Parse raw M3U text and return (raw_entries, group_meta).

    raw_entries: list of dicts with keys tvg_id, tvg_logo, group_title,
                 raw_name, acestream_hash, acestream_url.
    group_meta: dict mapping group name → logo URL (from #EXTGRP lines).
    """
    entries: list = []
    group_meta: dict = {}
    current = None  # the pending #EXTINF entry awaiting its URL line
    for raw_line in content.splitlines():
        text = raw_line.strip()
        if not text:
            continue
        if text.startswith('#EXTGRP:'):
            grp = _EXTGRP_RE.search(text)
            if grp:
                group_meta[grp.group(1)] = grp.group(2)
            continue
        if text.startswith('#EXTINF:'):
            attrs = dict(_EXTINF_ATTRS.findall(text))
            display = text.split(',', 1)[1].strip() if ',' in text else ''
            current = {
                'tvg_id': attrs.get('tvg-id', ''),
                'tvg_logo': attrs.get('tvg-logo', ''),
                'group_title': attrs.get('group-title', ''),
                'raw_name': display,
            }
            continue
        if text.startswith('#'):
            continue  # other directives leave the pending entry in place
        if current is None:
            continue  # URL line with no preceding #EXTINF — ignore
        hit = _ACESTREAM_RE.match(text)
        if hit:
            current['acestream_hash'] = hit.group(1)
            current['acestream_url'] = f"acestream://{hit.group(1)}"
            entries.append(current)
        # Any non-comment line — acestream or not — consumes the pending entry.
        current = None
    return entries, group_meta
def country_code_from_logo_url(logo_url: str, tvg_logo: str) -> str:
    """Derive a 2-letter ISO country code from the resolved logo URL or original tvg-logo URL."""
    for candidate in (logo_url, tvg_logo):
        if not candidate:
            continue
        # Pattern: /countries/{folder}/ (local path or GitHub raw URL).
        folder_match = re.search(r'/countries/([^/]+)/', candidate)
        if folder_match:
            iso = COUNTRY_FOLDER_TO_ISO.get(folder_match.group(1), '')
            if iso:
                return iso
        # Pattern: -{2letter}.png at the end of the filename.
        # NOTE(review): any two-letter suffix matches (e.g. "-hd.png" → 'hd'),
        # so this can yield non-ISO values — confirm whether that is acceptable.
        suffix_match = re.search(r'-([a-z]{2})(?:-[a-z]+)?\.(?:png|svg|jpg)$', candidate.lower())
        if suffix_match:
            return suffix_match.group(1)
    return ''
def group_channels(raw_entries: list, group_meta: dict) -> list:
    """Merge raw entries into Channel objects grouped by (base_name, group).

    Channels with the same base_name in the same group are considered mirrors
    of the same channel and are grouped into a single Channel entry.
    """
    # NOTE(review): group_meta is accepted but never read here — presumably
    # kept for signature symmetry with load_from_m3u_content; confirm.
    channel_map: dict = {}  # casefolded (base_name, group) → channel dict
    order: list = []        # first-seen order of keys; output preserves it
    id_seen: dict = {}      # base slug → count of later duplicates
    for entry in raw_entries:
        parsed = parse_channel_name(entry['raw_name'])
        base_name = parsed['base_name']
        group = entry['group_title']
        # Case-insensitive grouping: same name+group → same channel.
        key = (base_name.casefold(), group.casefold())
        mirror = {
            'resolution': parsed['resolution'],
            'acestream_hash': entry['acestream_hash'],
            'status': 'unknown',
            'country_code': country_code_from_logo_url('', entry['tvg_logo']),
        }
        if key not in channel_map:
            # Unique channel id: first occurrence keeps the bare slug,
            # later clashes get a numeric suffix (-1, -2, ...).
            base_id = slugify(base_name) or slugify(group)
            if base_id in id_seen:
                id_seen[base_id] += 1
                channel_id = f'{base_id}-{id_seen[base_id]}'
            else:
                id_seen[base_id] = 0
                channel_id = base_id
            channel_map[key] = {
                'id': channel_id,
                'name': base_name,
                'group': group,
                'subcategory': '',
                'country_code': '',
                'logo_url': '',
                'tags': [],
                '_tvg_id': entry['tvg_id'],  # internal
                '_tvg_logo': entry['tvg_logo'],  # internal
                'mirrors': [],
            }
            order.append(key)
        else:
            # Later mirrors may carry tvg metadata the first entry lacked.
            ch = channel_map[key]
            if not ch['_tvg_id'] and entry['tvg_id']:
                ch['_tvg_id'] = entry['tvg_id']
            if not ch['_tvg_logo'] and entry['tvg_logo']:
                ch['_tvg_logo'] = entry['tvg_logo']
        channel_map[key]['mirrors'].append(mirror)
    channels = []
    for key in order:
        ch = channel_map[key]
        # Resolve the logo once per channel, then strip the internal keys
        # so they never leak into API responses or exports.
        tvg_id = ch.pop('_tvg_id', '')
        tvg_logo = ch.pop('_tvg_logo', '')
        logo_url, _ = resolve_logo(tvg_id, ch['name'], tvg_logo)
        ch['logo_url'] = logo_url
        ch['country_code'] = country_code_from_logo_url(logo_url, tvg_logo)
        channels.append(ch)
    return channels
def load_from_m3u_content(content: str, source_file: str) -> None:
    """Parse raw M3U text and replace the global state with the result."""
    global state
    entries, meta = parse_m3u(content)
    channel_list = group_channels(entries, meta)
    state = {
        'channels': channel_list,
        'groups': sorted({ch['group'] for ch in channel_list}),
        'group_meta': meta,
        'source_file': source_file,
    }
def load_from_json_content(data: dict, source_file: str) -> None:
    """Load channel state from an exported JSON document (rebinds global state).

    Migrates older export formats in place:
    - channels used to carry 'base_name' instead of 'name'
    - mirrors used to carry acestream_url / raw_name / quality_marker
    """
    global state
    channel_list = data.get('channels', [])
    for channel in channel_list:
        # Backwards compat: old exports used base_name instead of name.
        if 'base_name' in channel and 'name' not in channel:
            channel['name'] = channel.pop('base_name')
        channel.setdefault('tags', [])
        channel.setdefault('country_code', '')
        for mirror in channel.get('mirrors', []):
            # Fields no longer stored per-mirror.
            for legacy_key in ('acestream_url', 'raw_name', 'quality_marker'):
                mirror.pop(legacy_key, None)
            mirror.setdefault('status', 'unknown')
            mirror.setdefault('country_code', '')
    meta = {entry['name']: entry.get('logo', '') for entry in data.get('groups', [])}
    state = {
        'channels': channel_list,
        'groups': sorted({channel['group'] for channel in channel_list}),
        'group_meta': meta,
        'source_file': source_file,
    }
# ── Flask routes ──────────────────────────────────────────────────────────────
@app.route('/')
def index():
    """Serve the single-page app shell, never cached so UI updates land immediately."""
    resp = send_from_directory(STATIC_DIR, 'index.html')
    resp.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
    return resp
@app.route('/ping')
def ping():
    """Trivial liveness probe."""
    return 'OK'
@app.route('/logos/<path:filename>')
def serve_logo(filename):
    """Serve a logo PNG from the local tv-logos tree."""
    return send_from_directory(LOGOS_ROOT, filename)
@app.route('/flags/<path:filename>')
def serve_flag(filename):
    """Serve a country flag image."""
    return send_from_directory(FLAGS_ROOT, filename)
def _maybe_reload_json() -> None:
    """Reload channels.json if it has been modified since last load."""
    global _json_mtime
    if state.get('source_file') != 'channels.json':
        return
    if not os.path.exists(DEFAULT_JSON):
        return
    current_mtime = os.path.getmtime(DEFAULT_JSON)
    if current_mtime <= _json_mtime:
        return
    # Record the mtime before parsing so a broken file isn't re-read on
    # every request — only a subsequent on-disk change triggers a retry.
    _json_mtime = current_mtime
    try:
        with open(DEFAULT_JSON, encoding='utf-8') as fh:
            load_from_json_content(json.load(fh), 'channels.json')
    except Exception:
        # Best-effort reload: keep the existing in-memory state on error.
        pass
def _channels_response():
    """Build the standard JSON payload describing the in-memory channel state."""
    payload = {
        'channels': state['channels'],
        'groups': state['groups'],
        'group_meta': state['group_meta'],
        'source': state['source_file'],
        'total': len(state['channels']),
    }
    return jsonify(payload)
@app.route('/api/channels')
def api_channels():
    """Return the channel list, picking up on-disk edits to channels.json first."""
    _maybe_reload_json()
    return _channels_response()
@app.route('/api/import/m3u', methods=['POST'])
def api_import_m3u():
    """Replace the current state with channels parsed from an uploaded M3U file."""
    upload = request.files.get('file')
    if upload is None:
        return jsonify({'error': 'No file provided'}), 400
    try:
        text = upload.read().decode('utf-8', errors='replace')
        load_from_m3u_content(text, upload.filename or 'playlist.m3u')
    except Exception as exc:
        return jsonify({'error': str(exc)}), 400
    return _channels_response()
@app.route('/api/import/json', methods=['POST'])
def api_import_json():
    """Replace the current state with channels from an uploaded JSON export."""
    upload = request.files.get('file')
    if upload is None:
        return jsonify({'error': 'No file provided'}), 400
    try:
        payload = json.loads(upload.read().decode('utf-8', errors='replace'))
        load_from_json_content(payload, upload.filename or 'channels.json')
    except Exception as exc:
        return jsonify({'error': str(exc)}), 400
    return _channels_response()
@app.route('/api/export/json')
def api_export_json():
    """Download the current channel state as a JSON attachment."""
    group_entries = []
    for group_name in state['groups']:
        group_entries.append({
            'name': group_name,
            'logo': state['group_meta'].get(group_name, ''),
            'subcategories': [],
        })
    export = {
        'version': '1.0',
        'exported_at': datetime.now(timezone.utc).isoformat(),
        'source': state['source_file'],
        'channels': state['channels'],
        'groups': group_entries,
    }
    body = json.dumps(export, ensure_ascii=False, indent=2)
    return Response(
        body,
        mimetype='application/json',
        headers={'Content-Disposition': 'attachment; filename="channels.json"'},
    )
@app.route('/api/export/m3u')
def api_export_m3u():
    """Download the current channel state re-serialized as an M3U playlist."""
    lines = ['#EXTM3U']
    for channel in state['channels']:
        name = channel['name']
        for mirror in channel['mirrors']:
            resolution = mirror.get('resolution', '')
            # Re-attach the resolution to the display name when present.
            display = f'{name} {resolution}'.strip() if resolution else name
            attr_str = ' '.join([
                f'tvg-id="{channel.get("id", "")}"',
                f'tvg-logo="{channel.get("logo_url", "")}"',
                f'group-title="{channel["group"]}"',
            ])
            lines.append(f'#EXTINF:-1 {attr_str},{display}')
            lines.append(f'acestream://{mirror["acestream_hash"]}')
    return Response(
        '\n'.join(lines),
        mimetype='text/plain',
        headers={'Content-Disposition': 'attachment; filename="channels.m3u"'},
    )
@app.route('/api/mirror/status', methods=['POST'])
def api_mirror_status():
    """Update one mirror's status and persist the whole state to channels.json.

    Expects a JSON body {channel_id, acestream_hash, status}; status must be
    one of ok / issues / broken / unknown. Returns 400 on an invalid status,
    404 when the channel or mirror is not found.
    """
    # get_json() returns None for a missing or non-JSON body, which made the
    # original .get() calls raise AttributeError (HTTP 500). silent=True plus
    # the {} fallback turns that into a clean 404/400 path instead.
    data = request.get_json(silent=True) or {}
    channel_id = data.get('channel_id')
    acestream_hash = data.get('acestream_hash')
    status = data.get('status', 'unknown')
    if status not in ('ok', 'issues', 'broken', 'unknown'):
        return jsonify({'error': 'invalid status'}), 400
    ch = next((c for c in state['channels'] if c['id'] == channel_id), None)
    if not ch:
        return jsonify({'error': 'not found'}), 404
    mirror = next((m for m in ch['mirrors'] if m['acestream_hash'] == acestream_hash), None)
    if not mirror:
        return jsonify({'error': 'not found'}), 404
    mirror['status'] = status
    # Persist immediately (same document shape as /api/export/json) so the
    # status change survives restarts.
    export = {
        'version': '1.0',
        'exported_at': datetime.now(timezone.utc).isoformat(),
        'source': state['source_file'],
        'channels': state['channels'],
        'groups': [
            {'name': g, 'logo': state['group_meta'].get(g, ''), 'subcategories': []}
            for g in state['groups']
        ],
    }
    with open(DEFAULT_JSON, 'w', encoding='utf-8') as f:
        json.dump(export, f, ensure_ascii=False, indent=2)
    return jsonify({'ok': True})
# ── Startup ───────────────────────────────────────────────────────────────────
def startup() -> None:
    """Build the logo index and load the initial channel data.

    Prefers channels.json (curated, persisted state); falls back to the
    bundled example.m3u when the JSON file is absent or unreadable.
    """
    global logo_index, _json_mtime
    print('Building logo index…', end=' ', flush=True)
    logo_index = build_logo_index(LOGOS_ROOT)
    print(f'{len(logo_index)} entries indexed.')
    if os.path.exists(DEFAULT_JSON):
        try:
            _json_mtime = os.path.getmtime(DEFAULT_JSON)
            with open(DEFAULT_JSON, encoding='utf-8') as f:
                load_from_json_content(json.loads(f.read()), 'channels.json')
            print(f'Loaded {len(state["channels"])} channels from channels.json')
            return
        except Exception as exc:
            print(f'Warning: could not load channels.json ({exc}), falling back to example.m3u')
            _json_mtime = 0.0
    # Fallback path: no channels.json, or it failed to load. The original
    # duplicated this load in two branches and only logged in one of them.
    if os.path.exists(DEFAULT_M3U):
        with open(DEFAULT_M3U, encoding='utf-8') as f:
            load_from_m3u_content(f.read(), 'example.m3u')
        print(f'Loaded {len(state["channels"])} channels from example.m3u')
# Run at import time so the app is fully initialised whether started via
# `python app.py` or imported by a WSGI server.
startup()
if __name__ == '__main__':
    # use_reloader=False: the reloader would re-run startup() in a child process.
    app.run(debug=True, host='0.0.0.0', port=5000, use_reloader=False)