530 lines
19 KiB
Python
530 lines
19 KiB
Python
import os
|
||
import re
|
||
import json
|
||
import unicodedata
|
||
from datetime import datetime, timezone
|
||
from flask import Flask, jsonify, request, send_from_directory, Response
|
||
|
||
# Filesystem layout, resolved relative to the directory that contains this module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATIC_DIR = os.path.join(BASE_DIR, 'static')
LOGOS_ROOT = os.path.join(STATIC_DIR, 'logos')
FLAGS_ROOT = os.path.join(STATIC_DIR, 'flags')
DEFAULT_JSON = os.path.join(BASE_DIR, 'channels.json')
DEFAULT_M3U = os.path.join(BASE_DIR, 'example.m3u')
|
||
|
||
# Maps a tv-logos-main/countries/{folder} name to its ISO 3166-1 alpha-2 code.
# Regional / multi-country folders map to '' (no single country applies).
COUNTRY_FOLDER_TO_ISO: dict = {
    'albania': 'al', 'argentina': 'ar', 'australia': 'au', 'austria': 'at',
    'azerbaijan': 'az', 'belgium': 'be', 'brazil': 'br', 'bulgaria': 'bg',
    'canada': 'ca', 'caribbean': '', 'chile': 'cl', 'costa-rica': 'cr',
    'croatia': 'hr', 'czech-republic': 'cz', 'france': 'fr', 'germany': 'de',
    'greece': 'gr', 'hong-kong': 'hk', 'hungary': 'hu', 'india': 'in',
    'indonesia': 'id', 'international': '', 'ireland': 'ie', 'israel': 'il',
    'italy': 'it', 'lebanon': 'lb', 'lithuania': 'lt', 'luxembourg': 'lu',
    'malaysia': 'my', 'malta': 'mt', 'mexico': 'mx', 'netherlands': 'nl',
    'new-zealand': 'nz', 'nordic': '', 'philippines': 'ph', 'poland': 'pl',
    'portugal': 'pt', 'romania': 'ro', 'russia': 'ru', 'serbia': 'rs',
    'singapore': 'sg', 'slovakia': 'sk', 'slovenia': 'si', 'spain': 'es',
    'switzerland': 'ch', 'turkey': 'tr', 'ukraine': 'ua',
    'united-arab-emirates': 'ae', 'united-kingdom': 'gb', 'united-states': 'us',
    'world-africa': '', 'world-asia': '', 'world-europe': '',
    'world-latin-america': '', 'world-middle-east': '',
}
|
||
|
||
# Flask application object; its static folder is STATIC_DIR.
app = Flask(__name__, static_folder=STATIC_DIR)
# Cap request bodies at 32 MB (32 * 1024 * 1024 bytes).
app.config.update(MAX_CONTENT_LENGTH=32 * 1024 * 1024)
|
||
|
||
# ── Global state ─────────────────────────────────────────────────────────────
|
||
# slug → logo path relative to LOGOS_ROOT; filled in by startup().
logo_index: dict = {}
# The currently loaded playlist; rebound as a whole by the load_from_* functions.
state: dict = dict(channels=[], groups=[], group_meta={}, source_file='')
# Modification time of channels.json recorded at the last load attempt.
_json_mtime: float = 0.0
|
||
|
||
|
||
# ── Logo index ────────────────────────────────────────────────────────────────
|
||
|
||
def build_logo_index(logos_root: str) -> dict:
    """Walk *logos_root* recursively and build a slug → relative-path index.

    Every file whose name ends in ``.png`` (case-insensitive) is indexed under
    its stem, i.e. the file name without the extension.  The stem is also
    indexed without a trailing 2-3 lowercase-letter suffix (e.g. "-es", "-uk",
    "-int") so that channel-name slugs can match without knowing the country
    code.

    Directories and files are visited in sorted order.  When several files
    reduce to the same suffix-less alias, the first one in that order wins,
    so the result no longer depends on the order in which the filesystem
    happens to list entries.  A file's own stem always overwrites whatever
    was stored under that key before.

    Args:
        logos_root: Directory to scan.

    Returns:
        Mapping of slug to the logo path relative to *logos_root*, written
        with forward slashes.  Empty when nothing is found.
    """
    index: dict = {}
    for root, dirs, files in os.walk(logos_root):
        # Sorting in place makes os.walk descend into sub-directories in a
        # stable order.
        dirs.sort()
        for fname in sorted(files):
            if not fname.lower().endswith('.png'):
                continue
            stem = fname[:-4]
            rel = os.path.relpath(os.path.join(root, fname), logos_root).replace(os.sep, '/')
            index[stem] = rel
            # Alias without the trailing 2-3 letter country/region suffix;
            # never replaces an existing entry.
            no_suffix = re.sub(r'-[a-z]{2,3}$', '', stem)
            if no_suffix and no_suffix != stem and no_suffix not in index:
                index[no_suffix] = rel
    return index
|
||
|
||
|
||
def normalize_to_slug(text: str) -> str:
    """Turn a channel name or tvg-id into a kebab-case slug for logo lookup.

    The text is lower-cased, quality/resolution labels and trailing ``*``
    markers are removed, ``m+`` is spelled out as ``m-plus``, accents are
    stripped, and every run of characters outside ``a-z0-9`` becomes a
    single hyphen.  Leading and trailing hyphens are dropped.
    """
    slug = text.lower()

    # Tokens that don't appear in logo file names: quality labels, explicit
    # resolutions, and trailing asterisk markers (applied in this order).
    noise_patterns = (
        r'\s+(sd|hd|fhd|uhd|4k)\b',
        r'\s+(480p|720p|1080p|1440p|2160p)\b',
        r'\s*\*+\s*$',
    )
    for pattern in noise_patterns:
        slug = re.sub(pattern, '', slug)

    # "M+" → "m-plus" so it can match logo file stems.
    slug = slug.replace('m+', 'm-plus')

    # Decompose accented characters and drop the combining marks.
    decomposed = unicodedata.normalize('NFD', slug)
    slug = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')

    # Hyphens are themselves non-alphanumeric, so one substitution both
    # replaces separators and collapses repeated hyphens.
    return re.sub(r'[^a-z0-9]+', '-', slug).strip('-')
|
||
|
||
|
||
def resolve_logo(tvg_id: str, base_name: str, tvg_logo: str) -> tuple:
    """Return ``(url, source)`` for the best available logo.

    Priority:
      1. Local logo matched by the tvg_id slug (exact, then prefix)
      2. Local logo matched by the base_name slug (exact, then prefix)
      3. External URL from the M3U tvg-logo attribute
      4. Empty string / 'none'

    Args:
        tvg_id: tvg-id attribute of the channel (may be empty).
        base_name: Channel name without resolution / quality markers.
        tvg_logo: tvg-logo attribute of the channel (may be empty).

    Returns:
        ``('logos/<relative path>', 'local')`` for a hit in ``logo_index``,
        ``(tvg_logo, 'tvg_logo')`` when only the external URL is available,
        or ``('', 'none')``.
    """
    def lookup(slug: str):
        # An empty slug is a prefix of every key, so without this guard a
        # channel whose name normalises to '' would be given whichever logo
        # with a stem of at most 4 characters comes first in the index.
        if not slug:
            return None, None
        if slug in logo_index:
            return f'logos/{logo_index[slug]}', 'local'
        # Prefix match: allows "dazn-laliga" → "dazn-laliga-es" when the
        # suffix-less alias was not created.  Keys may be at most 4
        # characters longer than the slug.
        for key, path in logo_index.items():
            if key.startswith(slug) and len(key) <= len(slug) + 4:
                return f'logos/{path}', 'local'
        return None, None

    # tvg_id first, then the display name; empty values are rejected by lookup().
    for candidate in (tvg_id, base_name):
        url, src = lookup(normalize_to_slug(candidate))
        if url:
            return url, src

    if tvg_logo:
        return tvg_logo, 'tvg_logo'

    return '', 'none'
|
||
|
||
|
||
# ── M3U parser ────────────────────────────────────────────────────────────────
|
||
|
||
# key="value" attribute pairs on an #EXTINF line.
_EXTINF_ATTRS = re.compile(r'([\w-]+)="([^"]*)"')
# acestream:// URL followed by a 40-hex-digit hash (captured).
_ACESTREAM_RE = re.compile(r'^acestream://([a-f0-9]{40})', re.IGNORECASE)
# Resolution token at the very end of a channel name.
_RESOLUTION_RE = re.compile(r'\s+(480p|720p|1080p|1440p|2160p|4[Kk]|UHD|FHD)\b\s*$', re.IGNORECASE)
# Run of asterisks at the very end of a channel name.
_QUALITY_MARKER_RE = re.compile(r'\s*(\*+)\s*$')
# group-title followed (later on the same line) by group-logo.
_EXTGRP_RE = re.compile(r'group-title="([^"]*)".*?group-logo="([^"]*)"')


def parse_channel_name(raw_name: str) -> dict:
    """Extract base_name, resolution and quality_marker from a raw M3U channel name.

    Example: "DAZN LaLiga 1080p **"
      → {'base_name': 'DAZN LaLiga', 'resolution': '1080p', 'quality_marker': '**'}

    The resolution is normalised regardless of the case used in the input:
    ``FHD`` becomes ``'1080p'``; ``UHD``, ``2160p`` and ``4K`` become ``'4K'``;
    any other token is returned in lower case (``'1080P'`` → ``'1080p'``).

    Args:
        raw_name: Channel name as it appears after the comma of an #EXTINF line.

    Returns:
        Dict with keys ``base_name``, ``resolution`` and ``quality_marker``;
        the last two are '' when the name carries no such token.
    """
    name = raw_name.strip()

    # 1. Strip the trailing quality marker (*** / ** / *).
    quality_marker = ''
    qm = _QUALITY_MARKER_RE.search(name)
    if qm:
        quality_marker = qm.group(1)
        name = name[:qm.start()]

    # 2. Strip the trailing resolution token and normalise it.
    resolution = ''
    res_m = _RESOLUTION_RE.search(name)
    if res_m:
        token = res_m.group(1).upper()
        if token == 'FHD':
            resolution = '1080p'
        elif token in ('UHD', '2160P', '4K'):
            resolution = '4K'
        else:
            # 480p / 720p / 1080p / 1440p, always with a lower-case "p".
            resolution = token.lower()
        name = name[:res_m.start()]

    return {
        'base_name': name.strip(),
        'resolution': resolution,
        'quality_marker': quality_marker,
    }
|
||
|
||
|
||
def slugify(text: str) -> str:
    """Produce a URL-safe slug from arbitrary text.

    Lower-cases the text, strips accents, and joins the remaining runs of
    ``a-z0-9`` with single hyphens (no leading or trailing hyphen).
    """
    decomposed = unicodedata.normalize('NFD', text.lower())
    unaccented = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
    # Hyphens count as non-alphanumeric, so this also collapses repeats.
    return re.sub(r'[^a-z0-9]+', '-', unaccented).strip('-')
|
||
|
||
|
||
def _extinf_display_name(line: str) -> str:
    """Return the channel name that follows the attributes of an #EXTINF line.

    The name starts after the first comma that is not inside a quoted
    attribute value, so ``group-title="News, Sports"`` no longer cuts the
    name short.  Returns '' when the line contains no such comma.
    """
    cursor = 0
    for attr in _EXTINF_ATTRS.finditer(line):
        # Only look for the separator in the gap before this attribute.
        comma = line.find(',', cursor, attr.start())
        if comma != -1:
            return line[comma + 1:].strip()
        cursor = attr.end()
    comma = line.find(',', cursor)
    return line[comma + 1:].strip() if comma != -1 else ''


def parse_m3u(content: str) -> tuple:
    """Parse raw M3U text and return ``(raw_entries, group_meta)``.

    An entry is emitted only when an #EXTINF line is followed by an
    ``acestream://<40 hex digits>`` line; other ``#`` directives in between
    are skipped, while any other non-comment line discards the pending entry.

    Args:
        content: Full text of the playlist.

    Returns:
        raw_entries: list of dicts with keys tvg_id, tvg_logo, group_title,
            raw_name, acestream_hash, acestream_url.
        group_meta: dict mapping group name → logo URL (from #EXTGRP lines).
    """
    raw_entries = []
    group_meta: dict = {}
    pending = None

    for line in content.splitlines():
        line = line.strip()
        if not line:
            continue

        if line.startswith('#EXTGRP:'):
            m = _EXTGRP_RE.search(line)
            if m:
                group_meta[m.group(1)] = m.group(2)

        elif line.startswith('#EXTINF:'):
            attrs = dict(_EXTINF_ATTRS.findall(line))
            pending = {
                'tvg_id': attrs.get('tvg-id', ''),
                'tvg_logo': attrs.get('tvg-logo', ''),
                'group_title': attrs.get('group-title', ''),
                'raw_name': _extinf_display_name(line),
            }

        elif pending and (m := _ACESTREAM_RE.match(line)):
            pending['acestream_hash'] = m.group(1)
            pending['acestream_url'] = f"acestream://{m.group(1)}"
            raw_entries.append(pending)
            pending = None

        elif line.startswith('#'):
            pass  # other directives – keep pending

        elif pending:
            pending = None  # unexpected non-acestream URL line

    return raw_entries, group_meta
|
||
|
||
|
||
def country_code_from_logo_url(logo_url: str, tvg_logo: str) -> str:
    """Derive a 2-letter country code from the resolved logo URL or the tvg-logo URL.

    *logo_url* is examined first, then *tvg_logo*.  For each non-empty URL
    two heuristics are tried in order:

      1. a ``/countries/{folder}/`` path segment whose folder has a non-empty
         entry in ``COUNTRY_FOLDER_TO_ISO``;
      2. a file name ending in ``-xx.png`` / ``.svg`` / ``.jpg`` (optionally
         ``-xx-word.ext``), where ``xx`` is two letters.

    Returns '' when neither URL yields a code.
    """
    for candidate in (logo_url, tvg_logo):
        if not candidate:
            continue

        # Heuristic 1: country folder in the path (local path or GitHub raw URL).
        folder = re.search(r'/countries/([^/]+)/', candidate)
        if folder:
            mapped = COUNTRY_FOLDER_TO_ISO.get(folder.group(1), '')
            if mapped:
                return mapped

        # Heuristic 2: two-letter suffix at the end of the file name.
        suffix = re.search(r'-([a-z]{2})(?:-[a-z]+)?\.(?:png|svg|jpg)$', candidate.lower())
        if suffix:
            return suffix.group(1)

    return ''
|
||
|
||
|
||
def group_channels(raw_entries: list, group_meta: dict) -> list:
    """Merge raw entries into channel dicts grouped by (base_name, group).

    Entries with the same base_name in the same group (compared
    case-insensitively) are treated as mirrors of one channel and collected
    in that channel's ``mirrors`` list.  Channels keep the order in which
    they first appear.

    Channel ids are the slug of the base name (or of the group when the
    name slug is empty), with ``-1``, ``-2``, … appended for repeats.  Ids
    already handed out are tracked so that a generated ``foo-1`` can never
    collide with the natural slug of a channel named "Foo 1", and vice versa.

    Args:
        raw_entries: Entries as produced by ``parse_m3u``.
        group_meta: Group name → logo mapping; accepted for interface
            compatibility but not used here.

    Returns:
        List of channel dicts with keys id, name, group, subcategory,
        country_code, logo_url, tags and mirrors.
    """
    channel_map: dict = {}
    order: list = []
    id_seen: dict = {}       # base slug → highest numeric suffix tried so far
    used_ids: set = set()    # every id already assigned to a channel

    for entry in raw_entries:
        parsed = parse_channel_name(entry['raw_name'])
        base_name = parsed['base_name']
        group = entry['group_title']
        key = (base_name.casefold(), group.casefold())

        ch = channel_map.get(key)
        if ch is None:
            base_id = slugify(base_name) or slugify(group)
            suffix = id_seen.get(base_id, 0)
            channel_id = base_id
            # Advance the suffix until the id is unused.
            while channel_id in used_ids:
                suffix += 1
                channel_id = f'{base_id}-{suffix}'
            id_seen[base_id] = suffix
            used_ids.add(channel_id)

            ch = channel_map[key] = {
                'id': channel_id,
                'name': base_name,
                'group': group,
                'subcategory': '',
                'country_code': '',
                'logo_url': '',
                'tags': [],
                '_tvg_id': entry['tvg_id'],      # internal, removed below
                '_tvg_logo': entry['tvg_logo'],  # internal, removed below
                'mirrors': [],
            }
            order.append(key)
        else:
            # Back-fill tvg metadata from later mirrors if the first had none.
            if not ch['_tvg_id'] and entry['tvg_id']:
                ch['_tvg_id'] = entry['tvg_id']
            if not ch['_tvg_logo'] and entry['tvg_logo']:
                ch['_tvg_logo'] = entry['tvg_logo']

        ch['mirrors'].append({
            'resolution': parsed['resolution'],
            'acestream_hash': entry['acestream_hash'],
            'status': 'unknown',
            'country_code': country_code_from_logo_url('', entry['tvg_logo']),
        })

    # Resolve logos once per channel, after all mirrors have been merged.
    channels = []
    for key in order:
        ch = channel_map[key]
        tvg_id = ch.pop('_tvg_id', '')
        tvg_logo = ch.pop('_tvg_logo', '')
        logo_url, _ = resolve_logo(tvg_id, ch['name'], tvg_logo)
        ch['logo_url'] = logo_url
        ch['country_code'] = country_code_from_logo_url(logo_url, tvg_logo)
        channels.append(ch)

    return channels
|
||
|
||
|
||
def load_from_m3u_content(content: str, source_file: str) -> None:
    """Parse M3U text and make it the active playlist.

    Rebinds the module-level ``state`` to a new dict holding the grouped
    channels, the sorted set of their group names, the group metadata from
    the playlist, and *source_file*.
    """
    global state
    entries, meta = parse_m3u(content)
    merged = group_channels(entries, meta)
    state = {
        'channels': merged,
        'groups': sorted({channel['group'] for channel in merged}),
        'group_meta': meta,
        'source_file': source_file,
    }
|
||
|
||
|
||
def load_from_json_content(data: dict, source_file: str) -> None:
    """Make a previously exported JSON document the active playlist.

    Channel and mirror dicts inside *data* are normalised in place:

      * the legacy ``base_name`` key is renamed to ``name``;
      * ``name``, ``group``, ``tags``, ``country_code`` and ``mirrors`` are
        given empty defaults when absent, so the export routes, which index
        these keys directly, cannot fail on a sparse document;
      * the legacy mirror keys ``acestream_url``, ``raw_name`` and
        ``quality_marker`` are removed, and ``status`` / ``country_code``
        receive defaults.

    The module-level ``state`` is rebound only after the whole document has
    been processed.

    Args:
        data: Decoded JSON document with ``channels`` and ``groups`` lists.
        source_file: Name recorded as the source of the playlist.

    Raises:
        ValueError: If *data* is not a dict.
    """
    global state
    if not isinstance(data, dict):
        raise ValueError('JSON root must be an object')

    channels = data.get('channels', [])
    for ch in channels:
        # Backwards compat: old exports used base_name instead of name.
        if 'base_name' in ch and 'name' not in ch:
            ch['name'] = ch.pop('base_name')
        ch.setdefault('name', '')
        ch.setdefault('group', '')
        ch.setdefault('tags', [])
        ch.setdefault('country_code', '')
        for m in ch.setdefault('mirrors', []):
            # Keys written by old exports that are no longer kept.
            m.pop('acestream_url', None)
            m.pop('raw_name', None)
            m.pop('quality_marker', None)
            m.setdefault('status', 'unknown')
            m.setdefault('country_code', '')

    group_meta = {g['name']: g.get('logo', '') for g in data.get('groups', [])}
    groups = sorted({ch['group'] for ch in channels})
    state = {
        'channels': channels,
        'groups': groups,
        'group_meta': group_meta,
        'source_file': source_file,
    }
|
||
|
||
|
||
# ── Flask routes ──────────────────────────────────────────────────────────────
|
||
|
||
@app.route('/')
def index():
    """Serve ``index.html`` from STATIC_DIR with caching disabled."""
    page = send_from_directory(STATIC_DIR, 'index.html')
    page.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
    return page
|
||
|
||
|
||
@app.route('/ping')
def ping():
    """Liveness probe: always answers with the body ``OK``."""
    return 'OK'
|
||
|
||
|
||
@app.route('/logos/<path:filename>')
def serve_logo(filename):
    """Serve the file *filename* from LOGOS_ROOT."""
    return send_from_directory(LOGOS_ROOT, filename)
|
||
|
||
|
||
@app.route('/flags/<path:filename>')
def serve_flag(filename):
    """Serve the file *filename* from FLAGS_ROOT."""
    return send_from_directory(FLAGS_ROOT, filename)
|
||
|
||
|
||
def _maybe_reload_json() -> None:
    """Reload channels.json if it has been modified since the last load.

    Does nothing unless the active playlist came from ``channels.json``.
    The new modification time is recorded *before* parsing, so a file that
    fails to load is not re-parsed until it changes again; in that
    case the existing state is kept and a warning is printed.
    """
    global _json_mtime
    if state.get('source_file') != 'channels.json':
        return
    try:
        # Ask for the mtime directly instead of exists() + getmtime(): the
        # file may disappear between the two calls.
        mtime = os.path.getmtime(DEFAULT_JSON)
    except OSError:
        return  # file missing or inaccessible: keep the current state
    if mtime <= _json_mtime:
        return
    _json_mtime = mtime
    try:
        with open(DEFAULT_JSON, encoding='utf-8') as f:
            load_from_json_content(json.load(f), 'channels.json')
    except Exception as exc:
        # Best effort: keep existing state, but make the failure visible.
        print(f'Warning: could not reload channels.json ({exc}); keeping current channels')
|
||
|
||
|
||
def _channels_response():
    """Build the JSON response describing the active playlist.

    The payload carries the channels, group names, group metadata, the
    source file name (under ``source``) and the channel count (``total``).
    """
    channels = state['channels']
    payload = {
        'channels': channels,
        'groups': state['groups'],
        'group_meta': state['group_meta'],
        'source': state['source_file'],
        'total': len(channels),
    }
    return jsonify(payload)
|
||
|
||
|
||
@app.route('/api/channels')
def api_channels():
    """Return the active playlist, first picking up on-disk changes to channels.json."""
    _maybe_reload_json()
    return _channels_response()
|
||
|
||
|
||
@app.route('/api/import/m3u', methods=['POST'])
def api_import_m3u():
    """Replace the active playlist with an uploaded M3U file.

    Expects a multipart upload under the field name ``file``.  Answers 400
    with an ``error`` message when the field is missing or loading fails;
    otherwise returns the same payload as ``/api/channels``.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    upload = request.files['file']
    try:
        # Undecodable bytes are replaced rather than rejected.
        text = upload.read().decode('utf-8', errors='replace')
        load_from_m3u_content(text, upload.filename or 'playlist.m3u')
    except Exception as exc:
        return jsonify({'error': str(exc)}), 400

    return _channels_response()
|
||
|
||
|
||
@app.route('/api/import/json', methods=['POST'])
def api_import_json():
    """Replace the active playlist with an uploaded JSON export.

    Expects a multipart upload under the field name ``file``.  Answers 400
    with an ``error`` message when the field is missing or the document
    cannot be parsed or loaded; otherwise returns the same payload as
    ``/api/channels``.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    upload = request.files['file']
    try:
        text = upload.read().decode('utf-8', errors='replace')
        document = json.loads(text)
        load_from_json_content(document, upload.filename or 'channels.json')
    except Exception as exc:
        return jsonify({'error': str(exc)}), 400

    return _channels_response()
|
||
|
||
|
||
@app.route('/api/export/json')
def api_export_json():
    """Download the active playlist as ``channels.json``.

    The document contains a format version, a UTC export timestamp, the
    source file name, the channels, and one record per group with its logo.
    """
    logos = state['group_meta']
    group_records = [
        {'name': name, 'logo': logos.get(name, ''), 'subcategories': []}
        for name in state['groups']
    ]
    export = {
        'version': '1.0',
        'exported_at': datetime.now(timezone.utc).isoformat(),
        'source': state['source_file'],
        'channels': state['channels'],
        'groups': group_records,
    }
    body = json.dumps(export, ensure_ascii=False, indent=2)
    return Response(
        body,
        mimetype='application/json',
        headers={'Content-Disposition': 'attachment; filename="channels.json"'},
    )
|
||
|
||
|
||
@app.route('/api/export/m3u')
def api_export_m3u():
    """Download the active playlist as ``channels.m3u``.

    Every mirror becomes one ``#EXTINF`` / ``acestream://`` pair.  The entry
    name is the channel name followed by the mirror's resolution, if any;
    tvg-id is the channel id and tvg-logo the channel's logo URL.
    """
    lines = ['#EXTM3U']
    for ch in state['channels']:
        for mirror in ch['mirrors']:
            res = mirror.get('resolution', '')
            if res:
                entry_name = f'{ch["name"]} {res}'.strip()
            else:
                entry_name = ch["name"]
            attrs = (
                f'tvg-id="{ch.get("id", "")}" '
                f'tvg-logo="{ch.get("logo_url", "")}" '
                f'group-title="{ch["group"]}"'
            )
            lines.extend((
                f'#EXTINF:-1 {attrs},{entry_name}',
                f'acestream://{mirror["acestream_hash"]}',
            ))
    return Response(
        '\n'.join(lines),
        mimetype='text/plain',
        headers={'Content-Disposition': 'attachment; filename="channels.m3u"'},
    )
|
||
|
||
|
||
@app.route('/api/mirror/status', methods=['POST'])
def api_mirror_status():
    """Set the status of one mirror and persist the playlist to channels.json.

    Expects a JSON object with ``channel_id``, ``acestream_hash`` and
    ``status`` (one of ``ok``, ``issues``, ``broken``, ``unknown``; defaults
    to ``unknown``).

    Responses:
        200 ``{'ok': True}`` after the mirror was updated and the file written.
        400 when the body is not a JSON object or the status is invalid.
        404 when the channel or the mirror cannot be found.
    """
    # silent=True returns None instead of raising on a missing or malformed
    # body; anything that is not a JSON object is rejected explicitly rather
    # than failing on data.get() below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'JSON object body required'}), 400

    channel_id = data.get('channel_id')
    acestream_hash = data.get('acestream_hash')
    status = data.get('status', 'unknown')
    if status not in ('ok', 'issues', 'broken', 'unknown'):
        return jsonify({'error': 'invalid status'}), 400

    ch = next((c for c in state['channels'] if c['id'] == channel_id), None)
    if not ch:
        return jsonify({'error': 'not found'}), 404
    m = next((m for m in ch['mirrors'] if m['acestream_hash'] == acestream_hash), None)
    if not m:
        return jsonify({'error': 'not found'}), 404
    m['status'] = status

    # Persist the whole playlist in the same format as /api/export/json.
    export = {
        'version': '1.0',
        'exported_at': datetime.now(timezone.utc).isoformat(),
        'source': state['source_file'],
        'channels': state['channels'],
        'groups': [
            {'name': g, 'logo': state['group_meta'].get(g, ''), 'subcategories': []}
            for g in state['groups']
        ],
    }
    with open(DEFAULT_JSON, 'w', encoding='utf-8') as f:
        json.dump(export, f, ensure_ascii=False, indent=2)
    return jsonify({'ok': True})
|
||
|
||
|
||
# ── Startup ───────────────────────────────────────────────────────────────────
|
||
|
||
def startup() -> None:
    """Build the logo index and load the initial playlist.

    ``channels.json`` is preferred.  If it exists but cannot be loaded, a
    warning is printed and ``example.m3u`` is loaded instead when present.
    If ``channels.json`` does not exist, ``example.m3u`` is loaded when
    present.  When neither file exists the state stays empty.
    """
    global logo_index, _json_mtime

    def load_example_playlist() -> None:
        # Shared by the fallback path and the no-JSON path.
        with open(DEFAULT_M3U, encoding='utf-8') as fh:
            load_from_m3u_content(fh.read(), 'example.m3u')

    print('Building logo index…', end=' ', flush=True)
    logo_index = build_logo_index(LOGOS_ROOT)
    print(f'{len(logo_index)} entries indexed.')

    if os.path.exists(DEFAULT_JSON):
        try:
            _json_mtime = os.path.getmtime(DEFAULT_JSON)
            with open(DEFAULT_JSON, encoding='utf-8') as fh:
                load_from_json_content(json.loads(fh.read()), 'channels.json')
            print(f'Loaded {len(state["channels"])} channels from channels.json')
        except Exception as exc:
            print(f'Warning: could not load channels.json ({exc}), falling back to example.m3u')
            _json_mtime = 0.0
            if os.path.exists(DEFAULT_M3U):
                load_example_playlist()
        return

    if os.path.exists(DEFAULT_M3U):
        load_example_playlist()
        print(f'Loaded {len(state["channels"])} channels from example.m3u')
|
||
|
||
|
||
# Runs at import time (outside the __main__ guard), so the logo index and
# playlist are populated however this module is loaded.
startup()

if __name__ == '__main__':
    # The server binds to all interfaces, and the Werkzeug interactive
    # debugger lets anyone who can reach it execute arbitrary code.  Debug
    # mode is therefore off unless explicitly requested with FLASK_DEBUG=1
    # (or "true" / "yes") for local development.
    debug = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug, host='0.0.0.0', port=5000, use_reloader=False)
|