# MiniCPM-o-4.5-zhenwu-FlagOS / duplex_utils.py
# Uploaded with huggingface_hub (revision 546cc4e, verified)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import os
import subprocess
import tempfile
from typing import Optional
import librosa
import numpy as np
import soundfile as sf
logger = logging.getLogger(__name__)
def video_to_frames(
    video_path: str,
    frame_save_path: str,
    fps: int = 1,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
) -> None:
    """Extract frames from a video into a directory as JPEGs via ffmpeg.

    Args:
        video_path: input video file.
        frame_save_path: directory receiving `frame_%06d.jpg` files (created
            if missing).
        fps: extraction rate in frames per second.
        start_time / end_time: optional ffmpeg-style timestamps delimiting
            the segment to sample.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero.
    """
    os.makedirs(frame_save_path, exist_ok=True)
    cmd = ["ffmpeg", "-y"]
    # Seek options go before -i so ffmpeg performs fast input seeking.
    for flag, value in (("-ss", start_time), ("-to", end_time)):
        if value is not None:
            cmd.extend([flag, str(value)])
    output_pattern = os.path.join(frame_save_path, "frame_%06d.jpg")
    cmd.extend(["-i", video_path, "-vf", f"fps={fps}", output_pattern])
    subprocess.run(cmd, check=True, capture_output=True)
def video_to_audio(
    video_path: str,
    audio_save_path: str,
    sr: int = 16000,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
) -> None:
    """Extract a video's audio track to a mono audio file via ffmpeg.

    Args:
        video_path: input video file.
        audio_save_path: output audio path (wav recommended for portability).
        sr: output sample rate.
        start_time / end_time: optional ffmpeg-style timestamps delimiting
            the segment to extract.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero.
    """
    # Guard the empty-dirname case: for a bare filename, os.path.dirname
    # returns "" and os.makedirs("") raises FileNotFoundError.
    parent_dir = os.path.dirname(audio_save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    cmd = ["ffmpeg", "-y"]
    if start_time is not None:
        cmd += ["-ss", str(start_time)]
    if end_time is not None:
        cmd += ["-to", str(end_time)]
    # -vn drops video; -ac 1 down-mixes to mono; -ar sets the sample rate.
    cmd += ["-i", video_path, "-vn", "-ac", "1", "-ar", str(sr), audio_save_path]
    subprocess.run(cmd, check=True, capture_output=True)
def format_srt_time(seconds: float) -> str:
    """Convert a duration in seconds to the SRT timestamp format HH:MM:SS,mmm.

    Computes in rounded integer milliseconds to avoid float-truncation
    errors: the previous `int((seconds % 1) * 1000)` turned e.g. 3.007 into
    6 ms because 0.007 * 1000 == 6.999... in binary floating point.
    """
    total_millis = int(round(seconds * 1000))
    hours, rem = divmod(total_millis, 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    secs, millis = divmod(rem, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def generate_srt_from_results(results_log: list, video_duration: float, output_srt_path: str) -> int:
    """Write an SRT subtitle file from per-unit inference results.

    Time alignment: the text produced by unit N is displayed from second
    (N+1) to second (N+2), clamped to the video duration.

    Returns:
        The number of subtitle entries written.
    """
    # Model control tokens that must never appear in a subtitle.
    special_tokens = ("<|tts_pad|>", "<|turn_eos|>", "<|chunk_eos|>", "<|listen|>", "<|speak|>")
    entries = []
    for result in results_log:
        unit_index = result["chunk_idx"]
        raw_text = result.get("text", "")
        # Skip listen-only units and units that produced no text.
        if not raw_text or result.get("is_listen", True):
            continue
        cleaned = raw_text
        for token in special_tokens:
            cleaned = cleaned.replace(token, "")
        cleaned = cleaned.strip()
        if not cleaned:
            continue
        # Unit N's output window is [(N+1)s, (N+2)s], capped at video end.
        begin = unit_index + 1
        if begin >= video_duration:
            continue
        finish = min(unit_index + 2, video_duration)
        entries.append((begin, finish, cleaned))
    srt_chunks = []
    for index, (begin, finish, cleaned) in enumerate(entries, start=1):
        srt_chunks.append(f"{index}")
        srt_chunks.append(f"{format_srt_time(begin)} --> {format_srt_time(finish)}")
        srt_chunks.append(cleaned)
        srt_chunks.append("")  # blank-line separator between entries
    with open(output_srt_path, "w", encoding="utf-8") as f:
        f.write("\n".join(srt_chunks))
    return len(entries)
def build_ai_audio_file(
    timed_output_audio: list,
    video_duration: float,
    output_sample_rate: int,
) -> str:
    """Build a single wav file containing the AI speech track.

    Time alignment: the audio of unit N starts at second (N+1).

    Args:
        timed_output_audio: (chunk_idx, waveform) pairs; each waveform is
            assumed to be a 1-D float array in [-1, 1] at
            ``output_sample_rate`` — TODO confirm against the producer.
        video_duration: source video duration in seconds; the track is at
            least this long.
        output_sample_rate: sample rate of the written wav file.

    Returns:
        Path to the temporary wav file (the caller deletes it).
    """
    # Track length = max(video duration, end of the latest audio chunk).
    max_end_time = 0
    for chunk_idx, audio in timed_output_audio:
        start_time = chunk_idx + 1
        max_end_time = max(max_end_time, start_time + len(audio) / output_sample_rate)
    total_samples = int(max(video_duration, max_end_time) * output_sample_rate)
    ai_audio_track = np.zeros(total_samples, dtype=np.float32)
    # Mix each unit's audio at its slot; drop anything past the track end.
    for chunk_idx, audio in timed_output_audio:
        start_sample = int((chunk_idx + 1) * output_sample_rate)
        end_sample = start_sample + len(audio)
        if end_sample <= total_samples:
            ai_audio_track[start_sample:end_sample] += audio
        elif start_sample < total_samples:
            ai_audio_track[start_sample:] += audio[: total_samples - start_sample]
    ai_audio_track = np.clip(ai_audio_track, -1.0, 1.0)
    # Scale by 32767, not 32768: a clipped +1.0 sample times 32768 exceeds
    # the int16 range and wraps to -32768, producing a full-scale click.
    pcm = (ai_audio_track * 32767).astype(np.int16)
    # mkstemp + close instead of NamedTemporaryFile(...).name, which left
    # the file descriptor open (leak; also unreliable on Windows).
    fd, ai_audio_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    sf.write(
        ai_audio_path,
        pcm,
        output_sample_rate,
        subtype="PCM_16",
    )
    return ai_audio_path
def generate_duplex_video(
    video_path: str,
    output_video_path: str,
    results_log: list,
    timed_output_audio: list,
    output_sample_rate: int = 24000,
):
    """
    Compose the duplex-reply video with ffmpeg (more reliable; handles rotation correctly).

    - Mixes the AI-generated speech into the video's audio track.
    - Burns the AI-generated text into the video as subtitles.

    Time-alignment logic:
    - unit N (chunk_idx=N) consumes seconds N..(N+1) of the input video
    - text/speech produced by unit N is shown/played starting at second (N+1)

    Args:
        video_path: input video file.
        output_video_path: where the final mp4 is written.
        results_log: per-unit result dicts (consumed by generate_srt_from_results).
        timed_output_audio: (chunk_idx, waveform) pairs (consumed by build_ai_audio_file).
        output_sample_rate: sample rate of the AI waveforms.

    Returns:
        output_video_path.

    Raises:
        subprocess.CalledProcessError: if the final ffmpeg invocation fails
        (the SRT and temp audio files are still cleaned up via `finally`).
    """
    logger.info(f"=" * 60)
    logger.info(f"Generating duplex video (ffmpeg method)")
    logger.info(f" Input video: {video_path}")
    logger.info(f" Output video: {output_video_path}")
    logger.info(f" Total units: {len(results_log)}")
    logger.info(f" Audio segments: {len(timed_output_audio)}")
    # ========== 1. Probe the video duration ==========
    try:
        probe_cmd = [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "v:0",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            video_path,
        ]
        # No check=True here: a failed probe yields empty stdout, and the
        # float() conversion below raises into the fallback branch.
        result = subprocess.run(probe_cmd, capture_output=True, text=True)
        video_duration = float(result.stdout.strip())
    except Exception as e:
        # Best effort: keep going with a fixed duration instead of aborting.
        logger.warning(f" ffprobe duration failed: {e}, using 60s default")
        video_duration = 60.0
    logger.info(f" Video duration: {video_duration:.2f}s")
    # ========== 2. Write the SRT subtitle file ==========
    # Use a fixed filename inside the output directory to avoid /tmp path issues.
    output_dir = os.path.dirname(output_video_path)
    srt_path = os.path.join(output_dir, "subtitles.srt")
    subtitle_count = generate_srt_from_results(results_log, video_duration, srt_path)
    logger.info(f" Generated {subtitle_count} subtitles -> {srt_path}")
    # ========== 3. Build the AI audio track file ==========
    ai_audio_path = None
    if timed_output_audio:
        ai_audio_path = build_ai_audio_file(timed_output_audio, video_duration, output_sample_rate)
        logger.info(f" Generated AI audio -> {ai_audio_path}")
    # ========== 4. Check whether the source video has an audio stream ==========
    has_original_audio = False
    try:
        probe_audio_cmd = [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "a:0",
            "-show_entries",
            "stream=codec_type",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            video_path,
        ]
        result = subprocess.run(probe_audio_cmd, capture_output=True, text=True)
        has_original_audio = result.stdout.strip() == "audio"
    except Exception:
        # Probe failure is treated as "no original audio".
        pass
    logger.info(f" Original video has audio: {has_original_audio}")
    # ========== 5. Compose the final video with ffmpeg ==========
    # Build the subtitles filter (only when there is something to burn in).
    has_subtitles = subtitle_count > 0 and os.path.exists(srt_path)
    if has_subtitles:
        # Escape the path for the ffmpeg subtitles filter
        # (filter_complex needs an extra level of escaping).
        srt_path_escaped = srt_path.replace("\\", "\\\\").replace("'", "'\\''").replace(":", "\\:")
        subtitle_filter = (
            f"subtitles='{srt_path_escaped}':"
            f"force_style='FontSize=28,"
            f"PrimaryColour=&H00FFFFFF,"  # white text
            f"OutlineColour=&H00000000,"  # black outline
            f"BorderStyle=3,"  # boxed background
            f"Outline=2,"
            f"Shadow=1,"
            f"MarginV=30,"
            f"Alignment=2'"  # bottom-center
        )
    else:
        logger.info(f" No subtitles to add")
    # Assemble the ffmpeg command.
    cmd = ["ffmpeg", "-y", "-i", video_path]
    if ai_audio_path:
        cmd.extend(["-i", ai_audio_path])
        if has_original_audio:
            # Source has audio: mix the two audio tracks.
            if has_subtitles:
                filter_complex = f"[0:v]{subtitle_filter}[vout];[0:a][1:a]amix=inputs=2:duration=longest[aout]"
                cmd.extend(["-filter_complex", filter_complex, "-map", "[vout]", "-map", "[aout]"])
            else:
                filter_complex = f"[0:a][1:a]amix=inputs=2:duration=longest[aout]"
                cmd.extend(["-filter_complex", filter_complex, "-map", "0:v", "-map", "[aout]"])
        else:
            # Source has no audio: use the AI audio track alone.
            if has_subtitles:
                filter_complex = f"[0:v]{subtitle_filter}[vout]"
                cmd.extend(["-filter_complex", filter_complex, "-map", "[vout]", "-map", "1:a"])
            else:
                cmd.extend(["-map", "0:v", "-map", "1:a"])
    else:
        # No AI audio at all.
        if has_subtitles:
            cmd.extend(["-vf", subtitle_filter])
        if has_original_audio:
            # NOTE(review): the unconditional "-c:a aac" appended below comes
            # after this "-c:a copy", and ffmpeg honors the last occurrence —
            # confirm whether the copy codec ever takes effect.
            cmd.extend(["-c:a", "copy"])
    cmd.extend(["-c:v", "libx264", "-c:a", "aac", "-preset", "medium", "-crf", "23", output_video_path])
    logger.info(f" Running ffmpeg: {' '.join(cmd[:6])}...")
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        logger.info(f" ✓ Video generated successfully")
    except subprocess.CalledProcessError as e:
        logger.error(f" ✗ ffmpeg failed!")
        logger.error(f" stderr: {e.stderr}")
        raise
    finally:
        # Temp artifacts are removed whether ffmpeg succeeded or not.
        if os.path.exists(srt_path):
            os.remove(srt_path)
            logger.info(f" Cleaned up: {srt_path}")
        if ai_audio_path and os.path.exists(ai_audio_path):
            os.remove(ai_audio_path)
            logger.info(f" Cleaned up: {ai_audio_path}")
    logger.info(f" ✓ Duplex video saved: {output_video_path}")
    logger.info(f"=" * 60)
    return output_video_path
def adjust_audio_length(audio_path: str, num_frames: int, output_path: str, sr: int = 16000) -> str:
    """Pad or truncate an audio file so its duration matches the frame count.

    - audio shorter than the target → pad with trailing silence
    - audio longer than the target → truncate

    Args:
        audio_path: path to the source audio file.
        num_frames: frame count (at 1 fps, frame count == seconds).
        output_path: where the adjusted audio is written.
        sr: target sample rate.

    Returns:
        ``output_path`` with the adjusted audio written to it.
    """
    # np and sf come from the module-level imports; the previous local
    # re-imports only shadowed them redundantly.
    audio, orig_sr = sf.read(audio_path)
    # Down-mix multi-channel audio to mono by averaging channels.
    if audio.ndim > 1:
        audio = audio.mean(axis=1)
    # Resample when the source rate differs from the target rate.
    if orig_sr != sr:
        # scipy.signal.resample is FFT-based resampling (the old comment
        # calling it "linear" was inaccurate).
        import scipy.signal as signal
        audio = signal.resample(audio, int(len(audio) * sr / orig_sr))
    # Target length in samples: seconds (== frames at 1 fps) * sample rate.
    target_length = num_frames * sr
    current_length = len(audio)
    if current_length < target_length:
        # Audio too short: append silence.
        padding = np.zeros(target_length - current_length)
        audio = np.concatenate([audio, padding])
        logger.info(f" 音频补充静音: {current_length / sr:.2f}s -> {target_length / sr:.2f}s")
    elif current_length > target_length:
        # Audio too long: cut it off.
        audio = audio[:target_length]
        logger.info(f" 音频截断: {current_length / sr:.2f}s -> {target_length / sr:.2f}s")
    else:
        logger.info(f" 音频长度匹配: {current_length / sr:.2f}s")
    sf.write(output_path, audio, sr)
    return output_path
def get_frames_and_audio(video_path, item_output_dir, sample_rate=16000):
    """Extract (or reuse cached) 1-fps frames and a mono audio track.

    Frames land in ``<item_output_dir>/input_frames`` and audio in
    ``<item_output_dir>/input_audio.wav``; both are regenerated only when
    missing. The audio is then padded/truncated to match the frame count
    (1 fps, so frames == seconds).

    Returns:
        (frame_dir, num_frames, adjusted_audio_path)
    """
    frame_path = os.path.join(item_output_dir, "input_frames")
    audio_path = os.path.join(item_output_dir, "input_audio.wav")
    adjusted_audio_path = os.path.join(item_output_dir, "adjusted_audio.wav")
    # os.path.isfile already implies existence, so the former extra
    # os.path.exists(audio_path) clause was redundant.
    if not os.path.exists(frame_path) or not os.path.isfile(audio_path):
        logger.info(f"No frames in {frame_path}, or audio files in {audio_path}, regenerated it")
        os.makedirs(frame_path, exist_ok=True)
        video_to_frames(video_path, frame_path, fps=1)
        video_to_audio(video_path, audio_path, sr=sample_rate)
        audio_array, _ = librosa.load(audio_path, sr=sample_rate, mono=True)
        logger.info(f"Extracted audio ({len(audio_array) / sample_rate:.2f}s) to {audio_path}")
    else:
        audio_array, _ = librosa.load(audio_path, sr=sample_rate, mono=True)
    # Count only the extracted JPEG frames (ignores any stray files).
    frame_files = sorted(f for f in os.listdir(frame_path) if f.endswith(".jpg"))
    num_frames = len(frame_files)
    logger.info(f"get {num_frames} frames to {frame_path}")
    logger.info(f"get audio ({len(audio_array) / sample_rate:.2f}s) to {audio_path}")
    logger.info("adjust audio...")
    audio_path = adjust_audio_length(audio_path, num_frames, adjusted_audio_path)
    return frame_path, num_frames, audio_path