| | |
| | |
| | import logging |
| | import os |
| | import subprocess |
| | import tempfile |
| | from typing import Optional |
| |
|
| | import librosa |
| | import numpy as np |
| | import soundfile as sf |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
def video_to_frames(
    video_path: str,
    frame_save_path: str,
    fps: int = 1,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
) -> None:
    """Extract frames from a video into a directory using ffmpeg.

    Frames are written as ``frame_%06d.jpg`` at the requested fps. When
    ``start_time``/``end_time`` are given they are passed to ffmpeg as
    ``-ss``/``-to`` trim points. Raises ``subprocess.CalledProcessError``
    if ffmpeg exits non-zero.
    """
    os.makedirs(frame_save_path, exist_ok=True)
    args = ["ffmpeg", "-y"]
    # Optional trim flags, only added when a bound was supplied.
    for flag, value in (("-ss", start_time), ("-to", end_time)):
        if value is not None:
            args.extend([flag, str(value)])
    pattern = os.path.join(frame_save_path, "frame_%06d.jpg")
    args.extend(["-i", video_path, "-vf", f"fps={fps}", pattern])
    subprocess.run(args, check=True, capture_output=True)
| |
|
| |
|
def video_to_audio(
    video_path: str,
    audio_save_path: str,
    sr: int = 16000,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
) -> None:
    """Extract the audio track of a video as mono audio at ``sr`` Hz.

    When ``start_time``/``end_time`` are given they are passed to ffmpeg as
    ``-ss``/``-to`` trim points. Raises ``subprocess.CalledProcessError``
    if ffmpeg exits non-zero.
    """
    # Fix: a bare filename has dirname "" and os.makedirs("") raises
    # FileNotFoundError — only create the parent when there is one.
    parent = os.path.dirname(audio_save_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    cmd = ["ffmpeg", "-y"]
    if start_time is not None:
        cmd += ["-ss", str(start_time)]
    if end_time is not None:
        cmd += ["-to", str(end_time)]

    # -vn drops video, -ac 1 down-mixes to mono, -ar resamples to sr.
    cmd += ["-i", video_path, "-vn", "-ac", "1", "-ar", str(sr), audio_save_path]
    subprocess.run(cmd, check=True, capture_output=True)
| |
|
| |
|
def format_srt_time(seconds: float) -> str:
    """Convert a (non-negative) number of seconds to SRT format HH:MM:SS,mmm."""
    whole = int(seconds)
    millis = int((seconds - whole) * 1000)
    total_minutes, secs = divmod(whole, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
| |
|
| |
|
def generate_srt_from_results(results_log: list, video_duration: float, output_srt_path: str) -> int:
    """
    Write an SRT subtitle file from per-unit inference results.

    Time alignment: the output of unit N is displayed from second (N+1)
    to second (N+2), clamped to the video duration. Entries that are
    empty, listen-only, or start past the end of the video are skipped.

    Returns:
        The number of subtitle entries written.
    """

    special_tokens = ["<|tts_pad|>", "<|turn_eos|>", "<|chunk_eos|>", "<|listen|>", "<|speak|>"]

    entries = []
    count = 0

    for item in results_log:
        begin = item["chunk_idx"] + 1
        raw_text = item.get("text", "")
        listening = item.get("is_listen", True)

        if listening or not raw_text:
            continue

        # Strip model control tokens before displaying the text.
        cleaned = raw_text
        for token in special_tokens:
            cleaned = cleaned.replace(token, "")
        cleaned = cleaned.strip()
        if not cleaned:
            continue

        # Drop entries that would only start after the video ends, and clamp
        # the display end to the video duration.
        if begin >= video_duration:
            continue
        finish = min(begin + 1, video_duration)

        count += 1
        entries.extend(
            [
                str(count),
                f"{format_srt_time(begin)} --> {format_srt_time(finish)}",
                cleaned,
                "",
            ]
        )

    with open(output_srt_path, "w", encoding="utf-8") as fh:
        fh.write("\n".join(entries))

    return count
| |
|
| |
|
def build_ai_audio_file(
    timed_output_audio: list,
    video_duration: float,
    output_sample_rate: int,
) -> str:
    """
    Render the AI speech segments into a single mono WAV track.

    Time alignment: the audio of unit N starts at second (N+1).
    Overlapping segments are summed, the mix is clipped to [-1, 1], and
    anything extending past the end of the track is truncated.

    Returns:
        Path to a temporary WAV file (caller is responsible for deleting it).
    """
    # Track length must cover both the video and any trailing AI audio.
    max_end_time = 0
    for chunk_idx, audio in timed_output_audio:
        start_time = chunk_idx + 1
        duration = len(audio) / output_sample_rate
        end_time = start_time + duration
        max_end_time = max(max_end_time, end_time)

    total_duration = max(video_duration, max_end_time)
    total_samples = int(total_duration * output_sample_rate)
    ai_audio_track = np.zeros(total_samples, dtype=np.float32)

    # Mix each segment in at its aligned offset.
    for chunk_idx, audio in timed_output_audio:
        start_time = chunk_idx + 1
        start_sample = int(start_time * output_sample_rate)
        end_sample = start_sample + len(audio)

        if end_sample <= len(ai_audio_track):
            ai_audio_track[start_sample:end_sample] += audio
        else:
            # Segment runs past the track end: keep only the part that fits.
            available_len = len(ai_audio_track) - start_sample
            if available_len > 0:
                ai_audio_track[start_sample:] += audio[:available_len]

    ai_audio_track = np.clip(ai_audio_track, -1.0, 1.0)

    ai_audio_path = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
    # Fix: scale by 32767, not 32768 — after clipping, a sample of exactly
    # +1.0 scaled by 32768 exceeds int16 range and wraps to -32768 in
    # astype, producing a full-scale click.
    sf.write(
        ai_audio_path,
        (ai_audio_track * 32767).astype(np.int16),
        output_sample_rate,
        subtype="PCM_16",
    )

    return ai_audio_path
| |
|
| |
|
def generate_duplex_video(
    video_path: str,
    output_video_path: str,
    results_log: list,
    timed_output_audio: list,
    output_sample_rate: int = 24000,
):
    """
    Generate a video with the duplex replies baked in, using ffmpeg
    (more reliable; handles rotation metadata correctly).

    - Mixes the AI-generated speech into the video's audio track
    - Burns the AI-generated text into the video as subtitles

    Time-alignment logic:
    - unit N (chunk_idx=N) consumes seconds N~(N+1) of the input video
    - text/speech produced by unit N is shown/played from second (N+1)
    """
    logger.info(f"=" * 60)
    logger.info(f"Generating duplex video (ffmpeg method)")
    logger.info(f" Input video: {video_path}")
    logger.info(f" Output video: {output_video_path}")
    logger.info(f" Total units: {len(results_log)}")
    logger.info(f" Audio segments: {len(timed_output_audio)}")

    # Probe the container for its duration; fall back to 60 s so the rest
    # of the pipeline can still run when ffprobe fails or is unavailable.
    try:
        probe_cmd = [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "v:0",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            video_path,
        ]
        result = subprocess.run(probe_cmd, capture_output=True, text=True)
        video_duration = float(result.stdout.strip())
    except Exception as e:
        logger.warning(f" ffprobe duration failed: {e}, using 60s default")
        video_duration = 60.0

    logger.info(f" Video duration: {video_duration:.2f}s")

    # Build the SRT subtitle file next to the output video.
    output_dir = os.path.dirname(output_video_path)
    srt_path = os.path.join(output_dir, "subtitles.srt")
    subtitle_count = generate_srt_from_results(results_log, video_duration, srt_path)
    logger.info(f" Generated {subtitle_count} subtitles -> {srt_path}")

    # Render the AI speech into a temporary WAV track, if there is any.
    ai_audio_path = None
    if timed_output_audio:
        ai_audio_path = build_ai_audio_file(timed_output_audio, video_duration, output_sample_rate)
        logger.info(f" Generated AI audio -> {ai_audio_path}")

    # Detect whether the source video already carries an audio stream.
    has_original_audio = False
    try:
        probe_audio_cmd = [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "a:0",
            "-show_entries",
            "stream=codec_type",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            video_path,
        ]
        result = subprocess.run(probe_audio_cmd, capture_output=True, text=True)
        has_original_audio = result.stdout.strip() == "audio"
    except Exception:
        # Best-effort probe: on failure, assume no original audio.
        pass
    logger.info(f" Original video has audio: {has_original_audio}")

    has_subtitles = subtitle_count > 0 and os.path.exists(srt_path)

    if has_subtitles:
        # Escape the path for use inside an ffmpeg filter string, then
        # style the burned-in subtitles (white text, black box/outline,
        # bottom-centered).
        srt_path_escaped = srt_path.replace("\\", "\\\\").replace("'", "'\\''").replace(":", "\\:")
        subtitle_filter = (
            f"subtitles='{srt_path_escaped}':"
            f"force_style='FontSize=28,"
            f"PrimaryColour=&H00FFFFFF,"
            f"OutlineColour=&H00000000,"
            f"BorderStyle=3,"
            f"Outline=2,"
            f"Shadow=1,"
            f"MarginV=30,"
            f"Alignment=2'"
        )
    else:
        logger.info(f" No subtitles to add")

    # Assemble the ffmpeg command; input 0 is the video, input 1 (when
    # present) is the AI audio track.
    cmd = ["ffmpeg", "-y", "-i", video_path]

    if ai_audio_path:
        cmd.extend(["-i", ai_audio_path])

        if has_original_audio:
            # Mix original audio (0:a) with AI audio (1:a).
            if has_subtitles:
                filter_complex = f"[0:v]{subtitle_filter}[vout];[0:a][1:a]amix=inputs=2:duration=longest[aout]"
                cmd.extend(["-filter_complex", filter_complex, "-map", "[vout]", "-map", "[aout]"])
            else:
                filter_complex = f"[0:a][1:a]amix=inputs=2:duration=longest[aout]"
                cmd.extend(["-filter_complex", filter_complex, "-map", "0:v", "-map", "[aout]"])
        else:
            # No original audio: the AI track (input 1) is the only audio.
            if has_subtitles:
                filter_complex = f"[0:v]{subtitle_filter}[vout]"
                cmd.extend(["-filter_complex", filter_complex, "-map", "[vout]", "-map", "1:a"])
            else:
                cmd.extend(["-map", "0:v", "-map", "1:a"])
    else:
        # No AI audio: optionally burn subtitles, keep the original audio.
        if has_subtitles:
            cmd.extend(["-vf", subtitle_filter])
        if has_original_audio:
            # NOTE(review): this "-c:a copy" is overridden by the
            # "-c:a aac" appended below (last option wins in ffmpeg),
            # so the original audio is re-encoded anyway — confirm intent.
            cmd.extend(["-c:a", "copy"])

    cmd.extend(["-c:v", "libx264", "-c:a", "aac", "-preset", "medium", "-crf", "23", output_video_path])

    logger.info(f" Running ffmpeg: {' '.join(cmd[:6])}...")
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        logger.info(f" ✓ Video generated successfully")
    except subprocess.CalledProcessError as e:
        logger.error(f" ✗ ffmpeg failed!")
        logger.error(f" stderr: {e.stderr}")
        raise
    finally:
        # Always remove the temporary SRT and WAV files, even on failure.
        if os.path.exists(srt_path):
            os.remove(srt_path)
            logger.info(f" Cleaned up: {srt_path}")
        if ai_audio_path and os.path.exists(ai_audio_path):
            os.remove(ai_audio_path)
            logger.info(f" Cleaned up: {ai_audio_path}")

    logger.info(f" ✓ Duplex video saved: {output_video_path}")
    logger.info(f"=" * 60)
    return output_video_path
| |
|
| |
|
def adjust_audio_length(audio_path: str, num_frames: int, output_path: str, sr: int = 16000) -> str:
    """
    Pad or truncate an audio file so its duration matches the frame count.

    - audio shorter than ``num_frames`` seconds -> padded with silence
    - audio longer -> truncated

    Args:
        audio_path: Path to the source audio file.
        num_frames: Frame count (frames are sampled at 1 fps, so frames == seconds).
        output_path: Where to write the adjusted audio.
        sr: Target sample rate in Hz.

    Returns:
        The adjusted audio path (same as ``output_path``).
    """
    # Note: the redundant local `import numpy` / `import soundfile` were
    # removed — both are already imported at module level as np / sf.
    audio, orig_sr = sf.read(audio_path)

    # Down-mix multi-channel audio to mono by averaging channels.
    if len(audio.shape) > 1:
        audio = audio.mean(axis=1)

    # Resample to the target rate if needed (scipy stays a lazy import so
    # it is only required when resampling actually happens).
    if orig_sr != sr:
        import scipy.signal as signal

        audio = signal.resample(audio, int(len(audio) * sr / orig_sr))

    target_length = num_frames * sr
    current_length = len(audio)

    if current_length < target_length:
        # Too short: append silence up to the target length.
        padding = np.zeros(target_length - current_length)
        audio = np.concatenate([audio, padding])
        logger.info(f" 音频补充静音: {current_length / sr:.2f}s -> {target_length / sr:.2f}s")
    elif current_length > target_length:
        # Too long: drop the tail.
        audio = audio[:target_length]
        logger.info(f" 音频截断: {current_length / sr:.2f}s -> {target_length / sr:.2f}s")
    else:
        logger.info(f" 音频长度匹配: {current_length / sr:.2f}s")

    sf.write(output_path, audio, sr)
    return output_path
| |
|
| |
|
def get_frames_and_audio(video_path, item_output_dir, sample_rate=16000):
    """Extract (or reuse cached) 1-fps frames and a length-matched audio track.

    Frames go to ``<item_output_dir>/input_frames`` and audio to
    ``<item_output_dir>/input_audio.wav``; both are regenerated when either
    is missing. The audio is then padded/truncated so that its duration in
    seconds equals the frame count (frames are extracted at 1 fps).

    Returns:
        Tuple of (frame_dir, num_frames, adjusted_audio_path).
    """
    frame_path = os.path.join(item_output_dir, "input_frames")
    audio_path = os.path.join(item_output_dir, "input_audio.wav")
    adjusted_audio_path = os.path.join(item_output_dir, "adjusted_audio.wav")

    # Fix: os.path.isfile() already returns False for a missing path, so the
    # extra os.path.exists(audio_path) test was redundant.
    if not os.path.exists(frame_path) or not os.path.isfile(audio_path):
        logger.info(f"No frames in {frame_path}, or audio files in {audio_path}, regenerated it")
        os.makedirs(frame_path, exist_ok=True)
        video_to_frames(video_path, frame_path, fps=1)

        video_to_audio(video_path, audio_path, sr=sample_rate)
        audio_array, _ = librosa.load(audio_path, sr=sample_rate, mono=True)
        logger.info(f"Extracted audio ({len(audio_array) / sample_rate:.2f}s) to {audio_path}")
    else:
        # Cached artifacts exist: just load the audio for logging.
        audio_array, _ = librosa.load(audio_path, sr=sample_rate, mono=True)

    frame_files = sorted(f for f in os.listdir(frame_path) if f.endswith(".jpg"))
    num_frames = len(frame_files)

    logger.info(f"get {num_frames} frames to {frame_path}")
    logger.info(f"get audio ({len(audio_array) / sample_rate:.2f}s) to {audio_path}")

    logger.info("adjust audio...")
    audio_path = adjust_audio_length(audio_path, num_frames, adjusted_audio_path)

    return frame_path, num_frames, audio_path
| |
|