#!/usr/bin/env python3 """ Script d'analyse des transcriptions avec Hugging Face Transformers Analyse les fichiers txt dans output/transcriptions et génère un résumé structuré """ import os from pathlib import Path import torch from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM from datetime import datetime import re # Bootstrap environnement portable try: from portable_env import setup_portable_env setup_portable_env() except Exception: pass # Configuration (via env, avec fallback local) BASE_DIR = Path(os.environ.get("BOB_BASE_DIR", Path(__file__).parent.parent)) TRANSCRIPTIONS_DIR = Path(os.environ.get("BOB_TRANSCRIPTIONS_DIR", BASE_DIR / "output" / "transcriptions")) OUTPUT_FILE = Path(os.environ.get("BOB_OUTPUT_FILE", BASE_DIR / "output" / "resume_bob.txt")) HF_MODEL = os.environ.get("HF_MODEL", "meta-llama/Llama-3.2-1B-Instruct") # par défaut def load_hf_model(): """Charge un modèle Hugging Face""" try: print(f"Chargement du modèle Hugging Face: {HF_MODEL}") # Utiliser pipeline pour plus de simplicité generator = pipeline( "text-generation", model=HF_MODEL, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, device_map="auto" if torch.cuda.is_available() else "cpu", token=os.environ.get("HF_TOKEN") # Pour les modèles privés ) print(f"Modèle {HF_MODEL} chargé avec succès") return generator except Exception as e: print(f"Erreur lors du chargement du modèle Hugging Face: {e}") print("Assurez-vous que le modèle est disponible et que vous avez les permissions nécessaires") return None def create_analysis_prompt(): """Crée le prompt d'analyse""" return """RÔLE: Expert en classification de contenu journalistique RTL. TÂCHE: Extraire 3 informations précises de cette transcription radio : 1. AUTEUR : Nom complet du journaliste/présentateur - Chercher "les précisions pour RTL de [NOM]" ou signature en fin - Si absent : "Inconnu" 2. QUALIFICATION du format (TRÈS IMPORTANT) : - P = PAPIER seul : Lecture continue par le journaliste, pas d'interviews • Phrases à la 3e personne uniquement • Aucune citation directe de témoins • Style narratif/descriptif pur - P+S = PAPIER + SON : Reportage avec interviews/témoignages • Présence de citations directes ("Je...", "Nous...") • Témoignages de personnes citées par leur prénom • Alternance narratif + paroles rapportées • Phrases comme "explique Alexandre", "témoigne Lucas" - QR = QUESTIONS-RÉPONSES : Interview/débat en direct • Format conversationnel • Questions-réponses explicites • Dialogue en temps réel 3. TITRE : Sujet principal en 4-6 mots, MAJUSCULES, style presse INDICES DE DÉTECTION P+S : - Citations à la 1ère personne : "J'ai été hospitalisé", "Nous avons commencé" - Prénoms + témoignages : "Alexandre explique", "Lucas raconte" - Discours rapporté : "Il dit que", "Ils nous ont dit" - Changement de ton narratif FORMAT OBLIGATOIRE : AUTEUR|QUALIFICATION|TITRE""" def detect_format_indicators(text): """Détecte automatiquement les indicateurs de format P/P+S/QR/MT""" indicators = { 'p_plus_s': 0, # Papier + Son 'qr': 0, # Questions-Réponses 'mt': 0, # Micro-Trottoir 'p_only': 0 # Papier seul } text_lower = text.lower() # Indicateurs Micro-Trottoir (MT) mt_patterns = [ r'\bmoi je (?:trouve|pense|crois|dis)', r'\bje trouve (?:que|ça|dommage)', r'\bje pense que', r'\bpour moi', r'\bà mon avis', r'\bfranchement', r'en arrivant', r'je viens (?:de|d\')', r'aujourd\'hui', r'c\'est dommage', r'malheureusement', r'donc je (?:voulais|pense)', r'quand même', r'un petit peu', r'vraiment dommage', ] # Indicateurs P+S (Papier + Son) p_plus_s_patterns = [ r'\bje\s+(?:suis|ai|me|pense|crois|vais|veux|dois)', r'\bnous\s+(?:avons|sommes|étions|allons|devons)', r'\bj\'(?:ai|étais|avais|irai|aurais)', r'\bmon\s+(?:père|fils|mari|frère)', r'\bma\s+(?:mère|fille|femme|sœur)', r'\b(?:explique|témoigne|raconte|confie|précise|ajoute|poursuit)\s+\w+', r'\b\w+\s+(?:explique|témoigne|raconte|confie|précise|ajoute|poursuit)', r'selon\s+\w+', r'(?:il|elle|ils|elles)\s+(?:dit|disent|explique|expliquent|affirme|assure)\s+que', r'pour\s+\w+\s*,', r'comme\s+(?:le\s+)?(?:dit|explique|précise)\s+\w+', r'voilà ce à quoi', r'c\'est qu?\'?à? partir', r'certains d\'entre (?:nous|eux)', r'parmi les\s+\d+', r'\b[A-Z][a-z]+\s+qui\s+(?:est|a|était)', r'comme\s+[A-Z][a-z]+', r'fièvre\s+et\s+\w+', r'hospitalisé', r'symptômes', r'malade', ] # Indicateurs QR (Questions-Réponses) qr_patterns = [ r'\?.*[A-Z]', # Question suivie de réponse r'question\s*:', r'réponse\s*:', r'vous\s+(?:pensez|croyez|dites)', r'que\s+pensez-vous', r'interview', r'débat', ] # Compter les patterns for pattern in mt_patterns: indicators['mt'] += len(re.findall(pattern, text_lower, re.IGNORECASE)) for pattern in p_plus_s_patterns: indicators['p_plus_s'] += len(re.findall(pattern, text_lower, re.IGNORECASE)) for pattern in qr_patterns: indicators['qr'] += len(re.findall(pattern, text_lower, re.IGNORECASE)) # Si ni MT ni P+S ni QR détecté fortement, c'est probablement P seul if indicators['mt'] < 3 and indicators['p_plus_s'] < 3 and indicators['qr'] < 2: indicators['p_only'] = 5 return indicators def analyze_transcription(generator, transcription_text, filename): """Analyse une transcription avec Hugging Face""" try: # Analyse automatique des patterns format_indicators = detect_format_indicators(transcription_text) # Créer un hint pour l'IA basé sur l'analyse automatique max_score = max(format_indicators.values()) likely_format = [k for k, v in format_indicators.items() if v == max_score][0] hint_map = { 'p_plus_s': "ATTENTION: Nombreux témoignages détectés → OBLIGATOIREMENT P+S", 'qr': "ATTENTION: Format questions-réponses détecté → OBLIGATOIREMENT QR", 'p_only': "ATTENTION: Aucun témoignage/interview → OBLIGATOIREMENT P" } # Si détection très claire (score élevé), forcer le format force_format = "" if format_indicators['p_plus_s'] >= 5: force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'P+S' pour la qualification." elif format_indicators['qr'] >= 3: force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'QR' pour la qualification." elif format_indicators['p_plus_s'] <= 1 and format_indicators['qr'] <= 1: force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'P' pour la qualification." format_hint = hint_map.get(likely_format, "") prompt = create_analysis_prompt() enhanced_prompt = f"{prompt}\n\n{format_hint}{force_format}" # Format pour Llama full_prompt = f"<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n{enhanced_prompt}\n\nTRANSCRIPTION:\n{transcription_text}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n" print(f"Analyse de: {filename}") print(f" Indices détectés: MT={format_indicators['mt']}, P+S={format_indicators['p_plus_s']}, QR={format_indicators['qr']}, P={format_indicators['p_only']}") # Génération avec le modèle response = generator( full_prompt, max_new_tokens=150, temperature=0.2, top_k=20, top_p=0.8, repetition_penalty=1.15, do_sample=True, pad_token_id=generator.tokenizer.eos_token_id ) result = response[0]['generated_text'].replace(full_prompt, '').strip() # Parser le résultat result = result.replace('\n', ' ').strip() if "|" in result: # Prendre la première ligne qui contient des | lines = result.split('\n') for line in lines: if "|" in line and line.count("|") >= 2: parts = line.split("|") if len(parts) >= 3: auteur = parts[0].strip() qualification = parts[1].strip() titre = parts[2].strip() # Nettoyer et formater if auteur.lower() in ["inconnu", "non mentionné", "auteur", ""]: auteur = "Inconnu" # Valider la qualification if qualification.upper() not in ["P", "P+S", "SON", "MT", "QR"]: qualification = "P" # Défaut return { "success": True, "auteur": auteur, "qualification": qualification.upper(), "titre": titre.upper(), "filename": filename } # Si pas de format avec |, essayer de parser différemment if " - " in result: parts = result.split(" - ") if len(parts) >= 3: auteur = parts[0].strip() qualification = parts[1].strip() titre = " - ".join(parts[2:]).strip() if auteur.lower() in ["inconnu", "non mentionné", "auteur", ""]: auteur = "Inconnu" # Valider la qualification if qualification.upper() not in ["P", "P+S", "QR"]: qualification = "P" return { "success": True, "auteur": auteur, "qualification": qualification.upper(), "titre": titre.upper(), "filename": filename } # Si le format n'est pas correct return { "success": False, "error": f"Format de réponse incorrect: {result}", "filename": filename, "raw_response": result } except Exception as e: return { "success": False, "error": str(e), "filename": filename } def read_transcription_file(file_path): """Lit le contenu d'un fichier de transcription et extrait les métadonnées""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # Extraire les métadonnées metadata = {} lines = content.split('\n') for line in lines[:10]: # Chercher dans les 10 premières lignes if line.startswith('Fichier source:'): metadata['filename'] = line.replace('Fichier source:', '').strip() elif line.startswith('Durée de traitement:'): metadata['processing_time'] = line.replace('Durée de traitement:', '').strip() # Extraire seulement le texte de transcription (après les métadonnées) if "--------------------------------------------------" in content: parts = content.split("--------------------------------------------------") if len(parts) > 1: text_content = parts[1].strip() return text_content, metadata return content.strip(), metadata except Exception as e: print(f"Erreur lors de la lecture de {file_path}: {e}") return None, {} def apply_duration_correction(result, duration_seconds, format_indicators=None): """Applique une correction probabiliste basée sur la durée et les patterns détectés""" if not duration_seconds: return result original_qualification = result.get("qualification", "") corrected = False # Priorité 1: Détection Micro-Trottoir basée sur patterns + durée if format_indicators and format_indicators.get('mt', 0) >= 8 and duration_seconds < 60: if original_qualification in ["P", "P+S", "SON"]: result["qualification"] = "MT" corrected = True print(f" → Correction MT détecté: {original_qualification} → MT (patterns={format_indicators['mt']})") # Priorité 2: Logique probabiliste selon durée (si pas MT) elif not corrected: if duration_seconds < 30: # < 30s = quasi-certainement un SON if original_qualification in ["P", "P+S"]: result["qualification"] = "SON" corrected = True print(f" → Correction durée < 30s: {original_qualification} → SON") elif 30 <= duration_seconds <= 40: # 30-40s = probablement un SON, mais peut être P if original_qualification == "P+S": result["qualification"] = "SON" corrected = True print(f" → Correction durée 30-40s: P+S → SON") return result def extract_author_from_filename(filename): """Extrait le nom du journaliste depuis le nom du fichier""" try: # Nettoyer le nom du fichier clean_name = filename.replace('_transcription.txt', '').replace('.mp3', '').replace('.MP3', '') # Patterns courants pour extraire un nom (prénom + nom) words = clean_name.split() # Chercher une séquence de 2 mots qui commencent par une majuscule for i in range(len(words) - 1): word1 = words[i].strip('()[]{}.,;:!?-_') word2 = words[i + 1].strip('()[]{}.,;:!?-_') # Vérifier si les deux mots ressemblent à un prénom + nom if (len(word1) >= 2 and len(word2) >= 2 and word1[0].isupper() and word2[0].isupper() and word1.isalpha() and word2.isalpha()): return f"{word1} {word2}" # Si pas trouvé, chercher le premier mot qui commence par une majuscule for word in words: clean_word = word.strip('()[]{}.,;:!?-_') if len(clean_word) >= 2 and clean_word[0].isupper() and clean_word.isalpha(): # Essayer de trouver le mot suivant word_index = words.index(word) if word_index + 1 < len(words): next_word = words[word_index + 1].strip('()[]{}.,;:!?-_') if len(next_word) >= 2 and next_word[0].isupper() and next_word.isalpha(): return f"{clean_word} {next_word}" return clean_word return "Inconnu" except Exception as e: print(f"Erreur extraction auteur de {filename}: {e}") return "Inconnu" def get_audio_duration(audio_filename, input_dir): """Calcule la durée d'un fichier audio en secondes totales""" try: from pydub import AudioSegment audio_path = None audio_extensions = ['.mp3', '.wav', '.m4a', '.flac', '.ogg', '.mp4', '.avi', '.mov'] # Rechercher le fichier audio correspondant for ext in audio_extensions: potential_path = input_dir / audio_filename.replace('_transcription.txt', ext) if potential_path.exists(): audio_path = potential_path break base_name = audio_filename.replace('_transcription.txt', '') potential_path = input_dir / f"{base_name}{ext}" if potential_path.exists(): audio_path = potential_path break if audio_path: audio = AudioSegment.from_file(str(audio_path)) duration_seconds = len(audio) / 1000 # pydub retourne en millisecondes minutes = int(duration_seconds // 60) seconds = int(duration_seconds % 60) return minutes * 100 + seconds return None except Exception as e: print(f"Erreur calcul durée pour {audio_filename}: {e}") return None def get_transcription_files(transcriptions_dir): """Récupère tous les fichiers de transcription""" if not transcriptions_dir.exists(): print(f"Le dossier {transcriptions_dir} n'existe pas") return [] txt_files = list(transcriptions_dir.glob("*_transcription.txt")) return sorted(txt_files) def main(): """Fonction principale""" print("=" * 60) print("ANALYSE DES BOB AVEC HUGGING FACE") print("=" * 60) # Vérification des dossiers script_dir = Path(__file__).parent.absolute() transcriptions_dir = Path(os.environ.get("BOB_TRANSCRIPTIONS_DIR", script_dir.parent / "output" / "transcriptions")) output_file = Path(os.environ.get("BOB_OUTPUT_FILE", script_dir.parent / "output" / "resume_bob.txt")) input_dir = Path(os.environ.get("BOB_INPUT_DIR", script_dir.parent / "input")) # Utiliser la fonction factorisée avec print comme log analyze_files_hf( transcriptions_dir=transcriptions_dir, input_dir=input_dir, output_file=output_file, log_fn=print, progress_fn=None, cancel_fn=None, ) if __name__ == "__main__": main() # --- API factorisée pour le GUI --- def analyze_files_hf(transcriptions_dir: Path, input_dir: Path, output_file: Path, log_fn=print, progress_fn=None, cancel_fn=None): """Analyse tous les fichiers de transcription avec Hugging Face""" log = log_fn or (lambda *a, **k: None) log("Dossier transcriptions: {}".format(transcriptions_dir)) log("Fichier de sortie: {}".format(output_file)) log("") transcription_files = get_transcription_files(transcriptions_dir) if not transcription_files: log("Aucun fichier de transcription trouvé") log("Assurez-vous d'avoir exécuté le script de transcription d'abord") return {"success": False, "count": 0} log(f"Trouvé {len(transcription_files)} fichier(s) de transcription:") for i, file in enumerate(transcription_files, 1): log(f" {i}. {file.name}") log("") # Initialisation du modèle Hugging Face generator = load_hf_model() if not generator: return {"success": False, "error": "Modèle Hugging Face indisponible"} results = [] success_count = 0 total = len(transcription_files) for i, file_path in enumerate(transcription_files, 1): if cancel_fn and cancel_fn(): log("⏹️ Analyse annulée") break log(f"[{i}/{total}] ") transcription_text, metadata = read_transcription_file(file_path) if not transcription_text: log(f"✗ Impossible de lire {file_path.name}") if progress_fn: progress_fn(i, total) continue duration = get_audio_duration(file_path.name, input_dir) author_from_filename = extract_author_from_filename(file_path.name) result = analyze_transcription(generator, transcription_text, file_path.name) if result["success"]: format_indicators = detect_format_indicators(transcription_text) result = apply_duration_correction(result, duration, format_indicators) if result["auteur"].lower() in ["inconnu", "non mentionné", "auteur", ""]: result["auteur"] = author_from_filename result["duree"] = duration if duration else "000" result["filename_source"] = metadata.get("filename", file_path.name) log(f"✓ {result['auteur']} - {result['qualification']} - {result['titre']} - {result['duree']}") results.append(result) success_count += 1 else: log(f"✗ Erreur: {result['error']}") if "raw_response" in result: log(f" Réponse brute: {result['raw_response']}") if progress_fn: progress_fn(i, total) log("") if results: output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, 'w', encoding='utf-8') as f: f.write(f"# RÉSUMÉ DES BOB - {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n") f.write(f"# Format: Auteur | Qualification | Titre | Durée\n") f.write("# Qualification: P=papier, P+S=papier+son, QR=question-réponse\n") f.write("# Durée: format MMss (ex: 1min04 = 104)\n") f.write("# " + "="*70 + "\n\n") for r in results: line = f"{r['auteur']} | {r['qualification']} | {r['titre']} | {r['duree']}" f.write(line + "\n") log("=" * 60) log("RÉSUMÉ GÉNÉRÉ") log("=" * 60) log(f"Fichiers analysés: {total}") log(f"Analyses réussies: {success_count}") log(f"Analyses échouées: {total - success_count}") log(f"Fichier de résumé: {output_file}") return {"success": True, "count": total, "ok": success_count, "results": results} else: log("Aucune analyse réussie, pas de fichier de résumé généré") return {"success": False, "count": total, "ok": 0}