#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Utilities for extracting frames/audio from a video and for rendering a
"duplex" reply video: AI-generated speech mixed into the audio track and
AI-generated text burned in as subtitles, all via ffmpeg/ffprobe."""
import logging
import os
import subprocess
import tempfile
from typing import Optional

import librosa
import numpy as np
import soundfile as sf

logger = logging.getLogger(__name__)


def video_to_frames(
    video_path: str,
    frame_save_path: str,
    fps: int = 1,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
) -> None:
    """Extract frames from the video into ``frame_save_path`` as frame_%06d.jpg.

    Args:
        video_path: Input video file.
        frame_save_path: Directory receiving the extracted frames (created
            if missing).
        fps: Frames extracted per second of video.
        start_time: Optional ffmpeg time spec; extraction starts here.
        end_time: Optional ffmpeg time spec; extraction stops here.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits non-zero.
    """
    os.makedirs(frame_save_path, exist_ok=True)
    cmd = ["ffmpeg", "-y"]
    # -ss/-to placed before -i enables fast input-side seeking.
    if start_time is not None:
        cmd += ["-ss", str(start_time)]
    if end_time is not None:
        cmd += ["-to", str(end_time)]
    cmd += ["-i", video_path, "-vf", f"fps={fps}",
            os.path.join(frame_save_path, "frame_%06d.jpg")]
    subprocess.run(cmd, check=True, capture_output=True)


def video_to_audio(
    video_path: str,
    audio_save_path: str,
    sr: int = 16000,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
) -> None:
    """Extract a mono audio track from the video.

    Args:
        video_path: Input video file.
        audio_save_path: Output audio path; wav is the intended format
            (most portable).
        sr: Target sample rate.
        start_time: Optional ffmpeg time spec; extraction starts here.
        end_time: Optional ffmpeg time spec; extraction stops here.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits non-zero.
    """
    # os.path.dirname("") of a bare filename is "" and os.makedirs("")
    # raises, so only create a directory when there is one to create.
    out_dir = os.path.dirname(audio_save_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    cmd = ["ffmpeg", "-y"]
    if start_time is not None:
        cmd += ["-ss", str(start_time)]
    if end_time is not None:
        cmd += ["-to", str(end_time)]
    # -vn drops video, -ac 1 downmixes to mono, -ar resamples.
    cmd += ["-i", video_path, "-vn", "-ac", "1", "-ar", str(sr), audio_save_path]
    subprocess.run(cmd, check=True, capture_output=True)


def format_srt_time(seconds: float) -> str:
    """Convert a second count to the SRT timestamp format HH:MM:SS,mmm."""
    # Work in integer milliseconds so repeated float modulo does not
    # accumulate rounding artifacts.
    total_ms = int(seconds * 1000)
    secs, millis = divmod(total_ms, 1000)
    minutes, secs = divmod(secs, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"


def generate_srt_from_results(results_log: list, video_duration: float, output_srt_path: str) -> int:
    """Write an SRT subtitle file from the inference results.

    Time alignment: unit N's output is displayed from (N+1)s to (N+2)s.

    Args:
        results_log: Dicts carrying "chunk_idx", "text" and "is_listen".
        video_duration: Video length in seconds; subtitles are clamped to it.
        output_srt_path: Destination SRT file path.

    Returns:
        The number of subtitle entries written.
    """
    # Special tokens that must be stripped from the model text.
    special_tokens = ["<|tts_pad|>", "<|turn_eos|>", "<|chunk_eos|>", "<|listen|>", "<|speak|>"]

    srt_lines = []
    subtitle_index = 1

    for result in results_log:
        chunk_idx = result["chunk_idx"]
        text = result.get("text", "")
        is_listen = result.get("is_listen", True)

        # "Listen" units produce no spoken text.
        if not text or is_listen:
            continue

        # Strip special tokens.
        clean_text = text
        for token in special_tokens:
            clean_text = clean_text.replace(token, "")
        clean_text = clean_text.strip()
        if not clean_text:
            continue

        # Time alignment: unit N shows from (N+1)s to (N+2)s.
        start_time = chunk_idx + 1
        end_time = chunk_idx + 2

        # Never start past the end of the video; clamp the end to it.
        if start_time >= video_duration:
            continue
        end_time = min(end_time, video_duration)

        srt_lines.append(f"{subtitle_index}")
        srt_lines.append(f"{format_srt_time(start_time)} --> {format_srt_time(end_time)}")
        srt_lines.append(clean_text)
        srt_lines.append("")  # blank separator line between entries
        subtitle_index += 1

    with open(output_srt_path, "w", encoding="utf-8") as f:
        f.write("\n".join(srt_lines))

    return subtitle_index - 1


def build_ai_audio_file(
    timed_output_audio: list,
    video_duration: float,
    output_sample_rate: int,
) -> str:
    """Render the AI speech segments into a single temporary WAV track.

    Time alignment: unit N's audio starts at (N+1)s.

    Args:
        timed_output_audio: (chunk_idx, float_audio_array) pairs; samples are
            assumed to be in [-1, 1] — TODO confirm against the producer.
        video_duration: Minimum track length in seconds.
        output_sample_rate: Sample rate of the segments and the output file.

    Returns:
        Path of the temporary WAV file; the caller is expected to delete it.
    """
    # Track length = max(video length, end of the latest segment).
    max_end_time = 0
    for chunk_idx, audio in timed_output_audio:
        start_time = chunk_idx + 1
        duration = len(audio) / output_sample_rate
        max_end_time = max(max_end_time, start_time + duration)

    total_duration = max(video_duration, max_end_time)
    total_samples = int(total_duration * output_sample_rate)
    ai_audio_track = np.zeros(total_samples, dtype=np.float32)

    # Mix each unit's audio at its aligned offset; overlapping units sum.
    for chunk_idx, audio in timed_output_audio:
        start_sample = int((chunk_idx + 1) * output_sample_rate)
        end_sample = start_sample + len(audio)
        if end_sample <= len(ai_audio_track):
            ai_audio_track[start_sample:end_sample] += audio
        else:
            # Segment runs past the track end: keep only the part that fits.
            available_len = len(ai_audio_track) - start_sample
            if available_len > 0:
                ai_audio_track[start_sample:] += audio[:available_len]

    ai_audio_track = np.clip(ai_audio_track, -1.0, 1.0)

    # mkstemp + close instead of NamedTemporaryFile(delete=False).name:
    # the latter leaked an open file handle (and an open handle cannot be
    # reopened for writing on some platforms).
    fd, ai_audio_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    # Scale by 32767, not 32768: a clipped sample at +1.0 times 32768
    # overflows int16 and wraps to -32768.
    sf.write(
        ai_audio_path,
        (ai_audio_track * 32767).astype(np.int16),
        output_sample_rate,
        subtype="PCM_16",
    )
    return ai_audio_path


def _probe_video_duration(video_path: str) -> float:
    """Return the container duration in seconds, falling back to 60s on error."""
    try:
        probe_cmd = [
            "ffprobe", "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            video_path,
        ]
        result = subprocess.run(probe_cmd, capture_output=True, text=True)
        return float(result.stdout.strip())
    except Exception as e:
        logger.warning(f"  ffprobe duration failed: {e}, using 60s default")
        return 60.0


def _has_audio_stream(video_path: str) -> bool:
    """Return True if the video's first audio stream exists (best effort)."""
    try:
        probe_audio_cmd = [
            "ffprobe", "-v", "error",
            "-select_streams", "a:0",
            "-show_entries", "stream=codec_type",
            "-of", "default=noprint_wrappers=1:nokey=1",
            video_path,
        ]
        result = subprocess.run(probe_audio_cmd, capture_output=True, text=True)
        return result.stdout.strip() == "audio"
    except Exception:
        # Best effort: treat probe failure as "no audio".
        return False


def _build_subtitle_filter(srt_path: str) -> str:
    """Build the ffmpeg ``subtitles`` filter string with escaped path + style."""
    # Escape for use inside filter_complex quoting (the subtitles filter
    # needs backslash, quote and colon escaping).
    srt_path_escaped = srt_path.replace("\\", "\\\\").replace("'", "'\\''").replace(":", "\\:")
    return (
        f"subtitles='{srt_path_escaped}':"
        f"force_style='FontSize=28,"
        f"PrimaryColour=&H00FFFFFF,"  # white text
        f"OutlineColour=&H00000000,"  # black outline
        f"BorderStyle=3,"             # boxed background
        f"Outline=2,"
        f"Shadow=1,"
        f"MarginV=30,"
        f"Alignment=2'"               # bottom center
    )


def generate_duplex_video(
    video_path: str,
    output_video_path: str,
    results_log: list,
    timed_output_audio: list,
    output_sample_rate: int = 24000,
):
    """Render the duplex-reply video with ffmpeg (reliable; handles rotation).

    - Mixes the AI-generated speech into the video's audio track.
    - Burns the AI-generated text into the video as subtitles.

    Time alignment:
    - Unit N (chunk_idx=N) consumes input seconds N..(N+1) of the video.
    - Unit N's text/speech is shown/played starting at second (N+1).

    Args:
        video_path: Source video.
        output_video_path: Destination video path.
        results_log: Per-unit inference results (see generate_srt_from_results).
        timed_output_audio: (chunk_idx, audio_array) pairs for the AI track.
        output_sample_rate: Sample rate of the AI audio segments.

    Returns:
        ``output_video_path``.

    Raises:
        subprocess.CalledProcessError: If the final ffmpeg encode fails.
    """
    logger.info("=" * 60)
    logger.info("Generating duplex video (ffmpeg method)")
    logger.info(f"  Input video: {video_path}")
    logger.info(f"  Output video: {output_video_path}")
    logger.info(f"  Total units: {len(results_log)}")
    logger.info(f"  Audio segments: {len(timed_output_audio)}")

    # ========== 1. Video duration ==========
    video_duration = _probe_video_duration(video_path)
    logger.info(f"  Video duration: {video_duration:.2f}s")

    # ========== 2. SRT subtitle file ==========
    # Fixed name inside the output directory to avoid /tmp path issues.
    output_dir = os.path.dirname(output_video_path)
    srt_path = os.path.join(output_dir, "subtitles.srt")
    subtitle_count = generate_srt_from_results(results_log, video_duration, srt_path)
    logger.info(f"  Generated {subtitle_count} subtitles -> {srt_path}")

    # ========== 3. AI audio track ==========
    ai_audio_path = None
    if timed_output_audio:
        ai_audio_path = build_ai_audio_file(timed_output_audio, video_duration, output_sample_rate)
        logger.info(f"  Generated AI audio -> {ai_audio_path}")

    # ========== 4. Does the source video carry audio? ==========
    has_original_audio = _has_audio_stream(video_path)
    logger.info(f"  Original video has audio: {has_original_audio}")

    # ========== 5. Compose the ffmpeg command ==========
    has_subtitles = subtitle_count > 0 and os.path.exists(srt_path)
    if has_subtitles:
        subtitle_filter = _build_subtitle_filter(srt_path)
    else:
        logger.info("  No subtitles to add")

    cmd = ["ffmpeg", "-y", "-i", video_path]
    audio_codec = "aac"
    if ai_audio_path:
        cmd.extend(["-i", ai_audio_path])
        if has_original_audio:
            # Original video has audio: mix the two tracks.
            if has_subtitles:
                filter_complex = f"[0:v]{subtitle_filter}[vout];[0:a][1:a]amix=inputs=2:duration=longest[aout]"
                cmd.extend(["-filter_complex", filter_complex, "-map", "[vout]", "-map", "[aout]"])
            else:
                filter_complex = "[0:a][1:a]amix=inputs=2:duration=longest[aout]"
                cmd.extend(["-filter_complex", filter_complex, "-map", "0:v", "-map", "[aout]"])
        else:
            # No original audio: use the AI track alone.
            if has_subtitles:
                filter_complex = f"[0:v]{subtitle_filter}[vout]"
                cmd.extend(["-filter_complex", filter_complex, "-map", "[vout]", "-map", "1:a"])
            else:
                cmd.extend(["-map", "0:v", "-map", "1:a"])
    else:
        # No AI audio at all.
        if has_subtitles:
            cmd.extend(["-vf", subtitle_filter])
        if has_original_audio:
            # Nothing is mixed into the original track, so pass it through
            # untouched. (The previous version appended "-c:a copy" here and
            # then unconditionally overrode it with "-c:a aac" below.)
            audio_codec = "copy"

    cmd.extend(["-c:v", "libx264", "-c:a", audio_codec,
                "-preset", "medium", "-crf", "23", output_video_path])

    logger.info(f"  Running ffmpeg: {' '.join(cmd[:6])}...")
    try:
        subprocess.run(cmd, capture_output=True, text=True, check=True)
        logger.info("  ✓ Video generated successfully")
    except subprocess.CalledProcessError as e:
        logger.error("  ✗ ffmpeg failed!")
        logger.error(f"  stderr: {e.stderr}")
        raise
    finally:
        # Remove intermediates even when the encode fails.
        if os.path.exists(srt_path):
            os.remove(srt_path)
            logger.info(f"  Cleaned up: {srt_path}")
        if ai_audio_path and os.path.exists(ai_audio_path):
            os.remove(ai_audio_path)
            logger.info(f"  Cleaned up: {ai_audio_path}")

    logger.info(f"  ✓ Duplex video saved: {output_video_path}")
    logger.info("=" * 60)
    return output_video_path


def adjust_audio_length(audio_path: str, num_frames: int, output_path: str, sr: int = 16000) -> str:
    """Adjust the audio length to match the frame count.

    - If the audio is too short -> pad with silence.
    - If the audio is too long  -> truncate.

    Args:
        audio_path: Source audio file.
        num_frames: Frame count (extracted at 1 fps, so frames == seconds).
        output_path: Destination audio file.
        sr: Target sample rate.

    Returns:
        ``output_path``.
    """
    # Read the source audio (module-level sf/np are reused; the previous
    # version redundantly re-imported them locally).
    audio, orig_sr = sf.read(audio_path)

    # Downmix stereo to mono.
    if len(audio.shape) > 1:
        audio = audio.mean(axis=1)

    # Resample if needed; scipy is imported locally since only this path
    # requires it.
    if orig_sr != sr:
        import scipy.signal as signal
        audio = signal.resample(audio, int(len(audio) * sr / orig_sr))

    # Target length in samples (frames * sample rate).
    target_length = num_frames * sr
    current_length = len(audio)

    if current_length < target_length:
        # Too short: pad with silence.
        padding = np.zeros(target_length - current_length)
        audio = np.concatenate([audio, padding])
        logger.info(f"  音频补充静音: {current_length / sr:.2f}s -> {target_length / sr:.2f}s")
    elif current_length > target_length:
        # Too long: truncate.
        audio = audio[:target_length]
        logger.info(f"  音频截断: {current_length / sr:.2f}s -> {target_length / sr:.2f}s")
    else:
        logger.info(f"  音频长度匹配: {current_length / sr:.2f}s")

    sf.write(output_path, audio, sr)
    return output_path


def get_frames_and_audio(video_path, item_output_dir, sample_rate=16000):
    """Extract (or reuse cached) 1-fps frames and a length-adjusted mono
    audio track for a video.

    Args:
        video_path: Source video.
        item_output_dir: Directory holding the cached frames/audio artifacts.
        sample_rate: Audio sample rate.

    Returns:
        (frame_dir, num_frames, adjusted_audio_path).
    """
    frame_path = os.path.join(item_output_dir, "input_frames")
    audio_path = os.path.join(item_output_dir, "input_audio.wav")
    adjusted_audio_path = os.path.join(item_output_dir, "adjusted_audio.wav")

    # isfile already implies existence, so a separate exists() check on the
    # audio path is unnecessary.
    if not os.path.exists(frame_path) or not os.path.isfile(audio_path):
        logger.info(f"No frames in {frame_path}, or audio files in {audio_path}, regenerated it")
        os.makedirs(frame_path, exist_ok=True)
        video_to_frames(video_path, frame_path, fps=1)
        video_to_audio(video_path, audio_path, sr=sample_rate)
        audio_array, _ = librosa.load(audio_path, sr=sample_rate, mono=True)
        logger.info(f"Extracted audio ({len(audio_array) / sample_rate:.2f}s) to {audio_path}")
    else:
        audio_array, _ = librosa.load(audio_path, sr=sample_rate, mono=True)

    frame_files = sorted(f for f in os.listdir(frame_path) if f.endswith(".jpg"))
    num_frames = len(frame_files)
    logger.info(f"get {num_frames} frames to {frame_path}")
    logger.info(f"get audio ({len(audio_array) / sample_rate:.2f}s) to {audio_path}")

    logger.info(f"adjust audio...")
    audio_path = adjust_audio_length(audio_path, num_frames, adjusted_audio_path)
    return frame_path, num_frames, audio_path