antes-de-programacion

This commit is contained in:
2026-03-05 08:53:37 +01:00
parent 31b76c8d45
commit 73ff9c77cb
11110 changed files with 20335 additions and 0 deletions

495
app.py Normal file
View File

@@ -0,0 +1,495 @@
import os
import re
import json
import unicodedata
from datetime import datetime, timezone
from flask import Flask, jsonify, request, send_from_directory, Response
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
LOGOS_ROOT = os.path.join(BASE_DIR, 'tv-logos-main')
FLAGS_ROOT = os.path.join(BASE_DIR, 'svg')
DEFAULT_JSON = os.path.join(BASE_DIR, 'channels.json')
DEFAULT_M3U = os.path.join(BASE_DIR, 'example.m3u')
STATIC_DIR = os.path.join(BASE_DIR, 'static')
# Maps tv-logos-main/countries/{folder} → ISO 3166-1 alpha-2 code
COUNTRY_FOLDER_TO_ISO: dict = {
'albania': 'al', 'argentina': 'ar', 'australia': 'au', 'austria': 'at',
'azerbaijan': 'az', 'belgium': 'be', 'brazil': 'br', 'bulgaria': 'bg',
'canada': 'ca', 'caribbean': '', 'chile': 'cl', 'costa-rica': 'cr',
'croatia': 'hr', 'czech-republic': 'cz', 'france': 'fr', 'germany': 'de',
'greece': 'gr', 'hong-kong': 'hk', 'hungary': 'hu', 'india': 'in',
'indonesia': 'id', 'international': '', 'ireland': 'ie', 'israel': 'il',
'italy': 'it', 'lebanon': 'lb', 'lithuania': 'lt', 'luxembourg': 'lu',
'malaysia': 'my', 'malta': 'mt', 'mexico': 'mx', 'netherlands': 'nl',
'new-zealand': 'nz', 'nordic': '', 'philippines': 'ph', 'poland': 'pl',
'portugal': 'pt', 'romania': 'ro', 'russia': 'ru', 'serbia': 'rs',
'singapore': 'sg', 'slovakia': 'sk', 'slovenia': 'si', 'spain': 'es',
'switzerland': 'ch', 'turkey': 'tr', 'ukraine': 'ua',
'united-arab-emirates': 'ae', 'united-kingdom': 'gb', 'united-states': 'us',
'world-africa': '', 'world-asia': '', 'world-europe': '',
'world-latin-america': '', 'world-middle-east': '',
}
app = Flask(__name__, static_folder=STATIC_DIR)
app.config['MAX_CONTENT_LENGTH'] = 32 * 1024 * 1024 # 32 MB max upload
# ── Global state ─────────────────────────────────────────────────────────────
logo_index: dict = {}
state: dict = {'channels': [], 'groups': [], 'group_meta': {}, 'source_file': ''}
_json_mtime: float = 0.0
# ── Logo index ────────────────────────────────────────────────────────────────
def build_logo_index(logos_root: str) -> dict:
"""Walk tv-logos-main/ recursively and build a slug → relative-path index.
Each PNG stem is indexed both with and without the trailing country/region
suffix (e.g. "-es", "-uk", "-int") so that channel-name slugs can match
without needing to know the country code.
"""
index: dict = {}
for root, _dirs, files in os.walk(logos_root):
for fname in files:
if not fname.lower().endswith('.png'):
continue
stem = fname[:-4]
rel = os.path.relpath(os.path.join(root, fname), logos_root).replace(os.sep, '/')
index[stem] = rel
# Index without trailing 2-3 letter country/region suffix
no_suffix = re.sub(r'-[a-z]{2,3}$', '', stem)
if no_suffix and no_suffix != stem and no_suffix not in index:
index[no_suffix] = rel
return index
def normalize_to_slug(text: str) -> str:
"""Convert a channel name or tvg-id to a kebab-case slug for logo lookup."""
s = text.lower()
# Strip common quality/HD labels that don't appear in file names
s = re.sub(r'\s+(sd|hd|fhd|uhd|4k)\b', '', s)
s = re.sub(r'\s+(480p|720p|1080p|1440p|2160p)\b', '', s)
# Strip trailing quality markers (* **)
s = re.sub(r'\s*\*+\s*$', '', s)
# Expand M+ prefix to m-plus so it can match logo file stems
s = s.replace('m+', 'm-plus')
# Normalise unicode (strip accents)
s = unicodedata.normalize('NFD', s)
s = ''.join(c for c in s if unicodedata.category(c) != 'Mn')
# Replace any non-alphanumeric runs with a single hyphen
s = re.sub(r'[^a-z0-9]+', '-', s)
s = re.sub(r'-+', '-', s).strip('-')
return s
def resolve_logo(tvg_id: str, base_name: str, tvg_logo: str) -> tuple:
"""Return (url, source) for the best available logo.
Priority:
1. Local logo matched by tvg_id slug (exact, then prefix)
2. Local logo matched by base_name slug (exact, then prefix)
3. External URL from the M3U tvg-logo attribute
4. Empty string / 'none'
"""
def lookup(slug: str):
if slug in logo_index:
return f'/logos/{logo_index[slug]}', 'local'
# Prefix match: allows "dazn-laliga" → "dazn-laliga-es" when no-suffix
# alias wasn't created (e.g. the suffix was longer than 3 chars)
for key, path in logo_index.items():
if key.startswith(slug) and len(key) <= len(slug) + 4:
return f'/logos/{path}', 'local'
return None, None
if tvg_id:
url, src = lookup(normalize_to_slug(tvg_id))
if url:
return url, src
url, src = lookup(normalize_to_slug(base_name))
if url:
return url, src
if tvg_logo:
return tvg_logo, 'tvg_logo'
return '', 'none'
# ── M3U parser ────────────────────────────────────────────────────────────────
_EXTINF_ATTRS = re.compile(r'([\w-]+)="([^"]*)"')
_ACESTREAM_RE = re.compile(r'^acestream://([a-f0-9]{40})', re.IGNORECASE)
_RESOLUTION_RE = re.compile(r'\s+(480p|720p|1080p|1440p|2160p|4[Kk]|UHD|FHD)\b\s*$', re.IGNORECASE)
_QUALITY_MARKER_RE = re.compile(r'\s*(\*+)\s*$')
_EXTGRP_RE = re.compile(r'group-title="([^"]*)".*?group-logo="([^"]*)"')
def parse_channel_name(raw_name: str) -> dict:
"""Extract base_name, resolution and quality_marker from a raw M3U channel name.
Example: "DAZN LaLiga 1080p **"
{'base_name': 'DAZN LaLiga', 'resolution': '1080p', 'quality_marker': '**'}
"""
name = raw_name.strip()
# 1. Strip trailing quality marker (*** / ** / *)
qm = _QUALITY_MARKER_RE.search(name)
quality_marker = qm.group(1) if qm else ''
if qm:
name = name[:qm.start()]
# 2. Strip trailing resolution token
res_m = _RESOLUTION_RE.search(name)
if res_m:
raw_res = res_m.group(1)
norm = raw_res.upper()
if norm == 'FHD':
resolution = '1080p'
elif norm in ('UHD', '2160P'):
resolution = '4K'
elif norm.startswith('4K') or norm.startswith('4k'):
resolution = '4K'
else:
resolution = raw_res # keeps "720p" / "1080p" as-is
name = name[:res_m.start()]
else:
resolution = ''
return {
'base_name': name.strip(),
'resolution': resolution,
'quality_marker': quality_marker,
}
def slugify(text: str) -> str:
"""Produce a URL-safe slug from arbitrary text."""
s = text.lower()
s = unicodedata.normalize('NFD', s)
s = ''.join(c for c in s if unicodedata.category(c) != 'Mn')
s = re.sub(r'[^a-z0-9]+', '-', s).strip('-')
return re.sub(r'-+', '-', s)
def parse_m3u(content: str) -> tuple:
"""Parse raw M3U text and return (raw_entries, group_meta).
raw_entries: list of dicts with keys tvg_id, tvg_logo, group_title,
raw_name, acestream_hash, acestream_url.
group_meta: dict mapping group name → logo URL (from #EXTGRP lines).
"""
raw_entries = []
group_meta: dict = {}
pending = None
for line in content.splitlines():
line = line.strip()
if not line:
continue
if line.startswith('#EXTGRP:'):
m = _EXTGRP_RE.search(line)
if m:
group_meta[m.group(1)] = m.group(2)
elif line.startswith('#EXTINF:'):
attrs = dict(_EXTINF_ATTRS.findall(line))
name = line.split(',', 1)[1].strip() if ',' in line else ''
pending = {
'tvg_id': attrs.get('tvg-id', ''),
'tvg_logo': attrs.get('tvg-logo', ''),
'group_title': attrs.get('group-title', ''),
'raw_name': name,
}
elif pending and (m := _ACESTREAM_RE.match(line)):
pending['acestream_hash'] = m.group(1)
pending['acestream_url'] = f"acestream://{m.group(1)}"
raw_entries.append(pending)
pending = None
elif line.startswith('#'):
pass # other directives keep pending
elif pending:
pending = None # unexpected non-acestream URL line
return raw_entries, group_meta
def country_code_from_logo_url(logo_url: str, tvg_logo: str) -> str:
"""Derive a 2-letter ISO country code from the resolved logo URL or original tvg-logo URL."""
for url in (logo_url, tvg_logo):
if not url:
continue
# Pattern: /countries/{folder}/ (local path or GitHub raw URL)
m = re.search(r'/countries/([^/]+)/', url)
if m:
code = COUNTRY_FOLDER_TO_ISO.get(m.group(1), '')
if code:
return code
# Pattern: -{2letter}.png at end of filename
m = re.search(r'-([a-z]{2})(?:-[a-z]+)?\.(?:png|svg|jpg)$', url.lower())
if m:
return m.group(1)
return ''
def group_channels(raw_entries: list, group_meta: dict) -> list:
"""Merge raw entries into Channel objects grouped by (base_name, group).
Channels with the same base_name in the same group are considered mirrors
of the same channel and are grouped into a single Channel entry.
"""
channel_map: dict = {}
order: list = []
id_seen: dict = {}
for entry in raw_entries:
parsed = parse_channel_name(entry['raw_name'])
base_name = parsed['base_name']
group = entry['group_title']
key = (base_name.casefold(), group.casefold())
mirror = {
'resolution': parsed['resolution'],
'acestream_hash': entry['acestream_hash'],
}
if key not in channel_map:
base_id = slugify(base_name) or slugify(group)
if base_id in id_seen:
id_seen[base_id] += 1
channel_id = f'{base_id}-{id_seen[base_id]}'
else:
id_seen[base_id] = 0
channel_id = base_id
channel_map[key] = {
'id': channel_id,
'name': base_name,
'group': group,
'subcategory': '',
'country_code': '',
'logo_url': '',
'tags': [],
'_tvg_id': entry['tvg_id'], # internal
'_tvg_logo': entry['tvg_logo'], # internal
'mirrors': [],
}
order.append(key)
else:
ch = channel_map[key]
if not ch['_tvg_id'] and entry['tvg_id']:
ch['_tvg_id'] = entry['tvg_id']
if not ch['_tvg_logo'] and entry['tvg_logo']:
ch['_tvg_logo'] = entry['tvg_logo']
channel_map[key]['mirrors'].append(mirror)
channels = []
for key in order:
ch = channel_map[key]
tvg_id = ch.pop('_tvg_id', '')
tvg_logo = ch.pop('_tvg_logo', '')
logo_url, _ = resolve_logo(tvg_id, ch['name'], tvg_logo)
ch['logo_url'] = logo_url
ch['country_code'] = country_code_from_logo_url(logo_url, tvg_logo)
channels.append(ch)
return channels
def load_from_m3u_content(content: str, source_file: str) -> None:
global state
raw_entries, group_meta = parse_m3u(content)
channels = group_channels(raw_entries, group_meta)
groups = sorted({ch['group'] for ch in channels})
state = {
'channels': channels,
'groups': groups,
'group_meta': group_meta,
'source_file': source_file,
}
def load_from_json_content(data: dict, source_file: str) -> None:
global state
channels = data.get('channels', [])
# backwards compat: old exports used base_name instead of name
for ch in channels:
if 'base_name' in ch and 'name' not in ch:
ch['name'] = ch.pop('base_name')
ch.setdefault('tags', [])
ch.setdefault('country_code', '')
for m in ch.get('mirrors', []):
# old exports had acestream_url; drop it
m.pop('acestream_url', None)
m.pop('raw_name', None)
m.pop('quality_marker', None)
group_meta = {g['name']: g.get('logo', '') for g in data.get('groups', [])}
groups = sorted({ch['group'] for ch in channels})
state = {
'channels': channels,
'groups': groups,
'group_meta': group_meta,
'source_file': source_file,
}
# ── Flask routes ──────────────────────────────────────────────────────────────
@app.route('/')
def index():
resp = send_from_directory(STATIC_DIR, 'index.html')
resp.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
return resp
@app.route('/ping')
def ping():
return 'OK'
@app.route('/logos/<path:filename>')
def serve_logo(filename):
return send_from_directory(LOGOS_ROOT, filename)
@app.route('/flags/<path:filename>')
def serve_flag(filename):
return send_from_directory(FLAGS_ROOT, filename)
def _maybe_reload_json() -> None:
"""Reload channels.json if it has been modified since last load."""
global _json_mtime
if state.get('source_file') != 'channels.json':
return
if not os.path.exists(DEFAULT_JSON):
return
mtime = os.path.getmtime(DEFAULT_JSON)
if mtime <= _json_mtime:
return
_json_mtime = mtime
try:
with open(DEFAULT_JSON, encoding='utf-8') as f:
load_from_json_content(json.loads(f.read()), 'channels.json')
except Exception:
pass # keep existing state on parse error
def _channels_response():
return jsonify({
'channels': state['channels'],
'groups': state['groups'],
'group_meta': state['group_meta'],
'source': state['source_file'],
'total': len(state['channels']),
})
@app.route('/api/channels')
def api_channels():
_maybe_reload_json()
return _channels_response()
@app.route('/api/import/m3u', methods=['POST'])
def api_import_m3u():
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
f = request.files['file']
try:
content = f.read().decode('utf-8', errors='replace')
load_from_m3u_content(content, f.filename or 'playlist.m3u')
except Exception as exc:
return jsonify({'error': str(exc)}), 400
return _channels_response()
@app.route('/api/import/json', methods=['POST'])
def api_import_json():
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
f = request.files['file']
try:
data = json.loads(f.read().decode('utf-8', errors='replace'))
load_from_json_content(data, f.filename or 'channels.json')
except Exception as exc:
return jsonify({'error': str(exc)}), 400
return _channels_response()
@app.route('/api/export/json')
def api_export_json():
export = {
'version': '1.0',
'exported_at': datetime.now(timezone.utc).isoformat(),
'source': state['source_file'],
'channels': state['channels'],
'groups': [
{'name': g, 'logo': state['group_meta'].get(g, ''), 'subcategories': []}
for g in state['groups']
],
}
return Response(
json.dumps(export, ensure_ascii=False, indent=2),
mimetype='application/json',
headers={'Content-Disposition': 'attachment; filename="channels.json"'},
)
@app.route('/api/export/m3u')
def api_export_m3u():
lines = ['#EXTM3U']
for ch in state['channels']:
for m in ch['mirrors']:
res = m.get('resolution', '')
entry_name = f'{ch["name"]} {res}'.strip() if res else ch["name"]
attrs = (
f'tvg-id="{ch.get("id", "")}" '
f'tvg-logo="{ch.get("logo_url", "")}" '
f'group-title="{ch["group"]}"'
)
lines.append(f'#EXTINF:-1 {attrs},{entry_name}')
lines.append(f'acestream://{m["acestream_hash"]}')
return Response(
'\n'.join(lines),
mimetype='text/plain',
headers={'Content-Disposition': 'attachment; filename="channels.m3u"'},
)
# ── Startup ───────────────────────────────────────────────────────────────────
def startup() -> None:
global logo_index, _json_mtime
print('Building logo index…', end=' ', flush=True)
logo_index = build_logo_index(LOGOS_ROOT)
print(f'{len(logo_index)} entries indexed.')
if os.path.exists(DEFAULT_JSON):
try:
_json_mtime = os.path.getmtime(DEFAULT_JSON)
with open(DEFAULT_JSON, encoding='utf-8') as f:
load_from_json_content(json.loads(f.read()), 'channels.json')
print(f'Loaded {len(state["channels"])} channels from channels.json')
except Exception as exc:
print(f'Warning: could not load channels.json ({exc}), falling back to example.m3u')
_json_mtime = 0.0
if os.path.exists(DEFAULT_M3U):
with open(DEFAULT_M3U, encoding='utf-8') as f:
load_from_m3u_content(f.read(), 'example.m3u')
elif os.path.exists(DEFAULT_M3U):
with open(DEFAULT_M3U, encoding='utf-8') as f:
load_from_m3u_content(f.read(), 'example.m3u')
print(f'Loaded {len(state["channels"])} channels from example.m3u')
startup()
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000, use_reloader=False)