import gc
import glob
import itertools
import os
import random
import shutil
from typing import List, Optional

from loguru import logger
from moviepy import (
    AudioFileClip,
    ColorClip,
    CompositeAudioClip,
    CompositeVideoClip,
    ImageClip,
    TextClip,
    VideoFileClip,
    afx,
    concatenate_videoclips,
)
from moviepy.video.tools.subtitles import SubtitlesClip
from PIL import ImageFont

from app.models import const
from app.models.schema import (
    MaterialInfo,
    VideoAspect,
    VideoConcatMode,
    VideoParams,
    VideoTransitionMode,
)
from app.services.utils import video_effects
from app.utils import utils


class SubClippedVideoClip:
    """Lightweight record describing a sub-clip of a source video file."""

    def __init__(self, file_path, start_time=None, end_time=None, width=None, height=None, duration=None):
        self.file_path = file_path
        self.start_time = start_time
        self.end_time = end_time
        self.width = width
        self.height = height
        if duration is None:
            self.duration = end_time - start_time
        else:
            self.duration = duration

    def __str__(self):
        return (
            f"SubClippedVideoClip(file_path={self.file_path}, "
            f"start_time={self.start_time}, end_time={self.end_time}, "
            f"duration={self.duration}, width={self.width}, height={self.height})"
        )


# shared encoding settings used by every write_videofile call below
audio_codec = "aac"
video_codec = "libx264"
fps = 30


def close_clip(clip):
    """Release the readers held by a MoviePy clip and force a GC pass."""
    if clip is None:
        return

    try:
        # close the main video reader
        if hasattr(clip, "reader") and clip.reader is not None:
            clip.reader.close()

        # close the audio reader
        if hasattr(clip, "audio") and clip.audio is not None:
            if hasattr(clip.audio, "reader") and clip.audio.reader is not None:
                clip.audio.reader.close()
            del clip.audio

        # close the mask reader
        if hasattr(clip, "mask") and clip.mask is not None:
            if hasattr(clip.mask, "reader") and clip.mask.reader is not None:
                clip.mask.reader.close()
            del clip.mask

        # recursively close the children of composite clips
        if hasattr(clip, "clips") and clip.clips:
            for child_clip in clip.clips:
                if child_clip is not clip:  # guard against self-reference
                    close_clip(child_clip)

        # drop references to the child clips
        if hasattr(clip, "clips"):
            clip.clips = []

    except Exception as e:
        logger.error(f"failed to close clip: {str(e)}")

    del clip
    gc.collect()


def delete_files(files: List[str] | str):
    """Delete one file or a list of files, ignoring files that cannot be removed."""
    if isinstance(files, str):
        files = [files]

    for file in files:
        try:
            os.remove(file)
        except OSError:
            pass


def get_bgm_file(bgm_type: str = "random", bgm_file: str = ""):
    """Resolve the background-music file to use, if any."""
    if not bgm_type:
        return ""

    if bgm_file and os.path.exists(bgm_file):
        return bgm_file

    if bgm_type == "random":
        suffix = "*.mp3"
        song_dir = utils.song_dir()
        files = glob.glob(os.path.join(song_dir, suffix))
        if not files:
            return ""
        return random.choice(files)

    return ""


def combine_videos(
    combined_video_path: str,
    video_paths: List[str],
    audio_file: str,
    video_aspect: VideoAspect = VideoAspect.portrait,
    video_concat_mode: VideoConcatMode = VideoConcatMode.random,
    video_transition_mode: Optional[VideoTransitionMode] = None,
    max_clip_duration: int = 5,
    threads: int = 2,
) -> str:
    """Cut the source videos into sub-clips and combine them to cover the audio duration."""
    audio_clip = AudioFileClip(audio_file)
    audio_duration = audio_clip.duration
    close_clip(audio_clip)
    logger.info(f"audio duration: {audio_duration} seconds")

    # every sub-clip is capped at max_clip_duration seconds
    req_dur = max_clip_duration
    logger.info(f"maximum clip duration: {req_dur} seconds")
    output_dir = os.path.dirname(combined_video_path)

    aspect = VideoAspect(video_aspect)
    video_width, video_height = aspect.to_resolution()

    processed_clips = []
    subclipped_items = []
    video_duration = 0
    for video_path in video_paths:
        clip = VideoFileClip(video_path)
        clip_duration = clip.duration
        clip_w, clip_h = clip.size
        close_clip(clip)

        start_time = 0
        while start_time < clip_duration:
            end_time = min(start_time + max_clip_duration, clip_duration)
            if clip_duration - start_time >= max_clip_duration:
                subclipped_items.append(
                    SubClippedVideoClip(
                        file_path=video_path,
                        start_time=start_time,
                        end_time=end_time,
                        width=clip_w,
                        height=clip_h,
                    )
                )
            start_time = end_time
            if video_concat_mode.value == VideoConcatMode.sequential.value:
                break

    if video_concat_mode.value == VideoConcatMode.random.value:
        random.shuffle(subclipped_items)

    logger.debug(f"total subclipped items: {len(subclipped_items)}")
    for i, subclipped_item in enumerate(subclipped_items):
        if video_duration > audio_duration:
            break

        logger.debug(
            f"processing clip {i + 1}: {subclipped_item.width}x{subclipped_item.height}, "
            f"current duration: {video_duration:.2f}s, remaining: {audio_duration - video_duration:.2f}s"
        )

        try:
            clip = VideoFileClip(subclipped_item.file_path).subclipped(
                subclipped_item.start_time, subclipped_item.end_time
            )
            clip_duration = clip.duration

            # resize the clip to fit the target resolution
            clip_w, clip_h = clip.size
            if clip_w != video_width or clip_h != video_height:
                clip_ratio = clip.w / clip.h
                video_ratio = video_width / video_height
                logger.debug(
                    f"resizing clip, source: {clip_w}x{clip_h}, ratio: {clip_ratio:.2f}, "
                    f"target: {video_width}x{video_height}, ratio: {video_ratio:.2f}"
                )

                if clip_ratio == video_ratio:
                    clip = clip.resized(new_size=(video_width, video_height))
                else:
                    # scale to fit inside the frame, then center on a black background
                    if clip_ratio > video_ratio:
                        scale_factor = video_width / clip_w
                    else:
                        scale_factor = video_height / clip_h

                    new_width = int(clip_w * scale_factor)
                    new_height = int(clip_h * scale_factor)

                    background = ColorClip(
                        size=(video_width, video_height), color=(0, 0, 0)
                    ).with_duration(clip_duration)
                    clip_resized = clip.resized(new_size=(new_width, new_height)).with_position("center")
                    clip = CompositeVideoClip([background, clip_resized])

            shuffle_side = random.choice(["left", "right", "top", "bottom"])
            if video_transition_mode is None or video_transition_mode.value == VideoTransitionMode.none.value:
                pass
            elif video_transition_mode.value == VideoTransitionMode.fade_in.value:
                clip = video_effects.fadein_transition(clip, 1)
            elif video_transition_mode.value == VideoTransitionMode.fade_out.value:
                clip = video_effects.fadeout_transition(clip, 1)
            elif video_transition_mode.value == VideoTransitionMode.slide_in.value:
                clip = video_effects.slidein_transition(clip, 1, shuffle_side)
            elif video_transition_mode.value == VideoTransitionMode.slide_out.value:
                clip = video_effects.slideout_transition(clip, 1, shuffle_side)
            elif video_transition_mode.value == VideoTransitionMode.shuffle.value:
                transition_funcs = [
                    lambda c: video_effects.fadein_transition(c, 1),
                    lambda c: video_effects.fadeout_transition(c, 1),
                    lambda c: video_effects.slidein_transition(c, 1, shuffle_side),
                    lambda c: video_effects.slideout_transition(c, 1, shuffle_side),
                ]
                shuffle_transition = random.choice(transition_funcs)
                clip = shuffle_transition(clip)

            if clip.duration > max_clip_duration:
                clip = clip.subclipped(0, max_clip_duration)

            # write the processed clip to a temp file, capturing its duration
            # before the clip is closed
            clip_duration = clip.duration
            clip_file = f"{output_dir}/temp-clip-{i + 1}.mp4"
            clip.write_videofile(clip_file, logger=None, fps=fps, codec=video_codec)
            close_clip(clip)

            processed_clips.append(
                SubClippedVideoClip(file_path=clip_file, duration=clip_duration, width=clip_w, height=clip_h)
            )
            video_duration += clip_duration

        except Exception as e:
            logger.error(f"failed to process clip: {str(e)}")

    # loop the processed clips if the video is still shorter than the audio
    if video_duration < audio_duration:
        logger.warning(
            f"video duration ({video_duration:.2f}s) is shorter than audio duration ({audio_duration:.2f}s), "
            f"looping clips to match audio length."
        )
        base_clips = processed_clips.copy()
        for clip in itertools.cycle(base_clips):
            if video_duration >= audio_duration:
                break
            processed_clips.append(clip)
            video_duration += clip.duration
        logger.info(
            f"video duration: {video_duration:.2f}s, audio duration: {audio_duration:.2f}s, "
            f"looped {len(processed_clips) - len(base_clips)} clips"
        )
    logger.info("starting clip merging process")
    if not processed_clips:
        logger.warning("no clips available for merging")
        return combined_video_path

    # if there is only one clip, use it directly
    if len(processed_clips) == 1:
        logger.info("using single clip directly")
        shutil.copy(processed_clips[0].file_path, combined_video_path)
        delete_files([clip.file_path for clip in processed_clips])
        logger.info("video combining completed")
        return combined_video_path

    # use the first clip as the merge base
    base_clip_path = processed_clips[0].file_path
    temp_merged_video = f"{output_dir}/temp-merged-video.mp4"
    temp_merged_next = f"{output_dir}/temp-merged-next.mp4"

    shutil.copy(base_clip_path, temp_merged_video)

    # merge the remaining clips one by one
    for i, clip in enumerate(processed_clips[1:], 1):
        logger.info(f"merging clip {i}/{len(processed_clips) - 1}, duration: {clip.duration:.2f}s")

        try:
            base_clip = VideoFileClip(temp_merged_video)
            next_clip = VideoFileClip(clip.file_path)

            merged_clip = concatenate_videoclips([base_clip, next_clip])
            merged_clip.write_videofile(
                filename=temp_merged_next,
                threads=threads,
                logger=None,
                temp_audiofile_path=output_dir,
                audio_codec=audio_codec,
                fps=fps,
            )
            close_clip(base_clip)
            close_clip(next_clip)
            close_clip(merged_clip)

            # replace the base file with the newly merged file
            delete_files(temp_merged_video)
            os.rename(temp_merged_next, temp_merged_video)

        except Exception as e:
            logger.error(f"failed to merge clip: {str(e)}")
            continue

    # move the final merged video to the target path
    os.rename(temp_merged_video, combined_video_path)

    # clean up the temp clip files
    clip_files = [clip.file_path for clip in processed_clips]
    delete_files(clip_files)

    logger.info("video combining completed")
    return combined_video_path
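

# A minimal call sketch (hypothetical paths):
#   combine_videos(
#       combined_video_path="/tmp/combined.mp4",
#       video_paths=["/tmp/a.mp4", "/tmp/b.mp4"],
#       audio_file="/tmp/voice.mp3",
#       video_concat_mode=VideoConcatMode.random,
#       max_clip_duration=5,
#   )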


def wrap_text(text, max_width, font="Arial", fontsize=60):
    """Wrap text to fit within max_width pixels; returns (wrapped_text, total_height)."""
    font = ImageFont.truetype(font, fontsize)

    def get_text_size(inner_text):
        inner_text = inner_text.strip()
        left, top, right, bottom = font.getbbox(inner_text)
        return right - left, bottom - top

    width, height = get_text_size(text)
    if width <= max_width:
        return text, height

    processed = True

    # first try to wrap on word boundaries
    _wrapped_lines_ = []
    words = text.split(" ")
    _txt_ = ""
    for word in words:
        _before = _txt_
        _txt_ += f"{word} "
        _width, _height = get_text_size(_txt_)
        if _width <= max_width:
            continue
        if _txt_.strip() == word.strip():
            # a single word is wider than max_width; fall back to character wrapping
            processed = False
            break
        _wrapped_lines_.append(_before)
        _txt_ = f"{word} "
    _wrapped_lines_.append(_txt_)
    if processed:
        _wrapped_lines_ = [line.strip() for line in _wrapped_lines_]
        result = "\n".join(_wrapped_lines_).strip()
        height = len(_wrapped_lines_) * height
        return result, height

    # fall back to wrapping on characters
    _wrapped_lines_ = []
    chars = list(text)
    _txt_ = ""
    for char in chars:
        _txt_ += char
        _width, _height = get_text_size(_txt_)
        if _width <= max_width:
            continue
        _wrapped_lines_.append(_txt_)
        _txt_ = ""
    _wrapped_lines_.append(_txt_)
    result = "\n".join(_wrapped_lines_).strip()
    height = len(_wrapped_lines_) * height
    return result, height
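

# Usage sketch (hypothetical font path):
#   wrapped, height = wrap_text(
#       "a long subtitle phrase", max_width=540,
#       font="/fonts/STHeitiMedium.ttc", fontsize=60,
#   )
#   wrapped contains "\n" line breaks; height is line_height * line_count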


def generate_video(
    video_path: str,
    audio_path: str,
    subtitle_path: str,
    output_file: str,
    params: VideoParams,
):
    aspect = VideoAspect(params.video_aspect)
    video_width, video_height = aspect.to_resolution()

    logger.info(f"generating video: {video_width} x {video_height}")
    logger.info(f" ① video: {video_path}")
    logger.info(f" ② audio: {audio_path}")
    logger.info(f" ③ subtitle: {subtitle_path}")
    logger.info(f" ④ output: {output_file}")

    output_dir = os.path.dirname(output_file)

    font_path = ""
    if params.subtitle_enabled:
        if not params.font_name:
            params.font_name = "STHeitiMedium.ttc"
        font_path = os.path.join(utils.font_dir(), params.font_name)
        if os.name == "nt":
            font_path = font_path.replace("\\", "/")

        logger.info(f" ⑤ font: {font_path}")

    def create_text_clip(subtitle_item):
        params.font_size = int(params.font_size)
        params.stroke_width = int(params.stroke_width)
        phrase = subtitle_item[1]
        max_width = video_width * 0.9
        wrapped_txt, txt_height = wrap_text(
            phrase, max_width=max_width, font=font_path, fontsize=params.font_size
        )
        interline = int(params.font_size * 0.25)
        size = (
            int(max_width),
            int(txt_height + params.font_size * 0.25 + (interline * (wrapped_txt.count("\n") + 1))),
        )

        _clip = TextClip(
            text=wrapped_txt,
            font=font_path,
            font_size=params.font_size,
            color=params.text_fore_color,
            bg_color=params.text_background_color,
            stroke_color=params.stroke_color,
            stroke_width=params.stroke_width,
        )
        duration = subtitle_item[0][1] - subtitle_item[0][0]
        _clip = _clip.with_start(subtitle_item[0][0])
        _clip = _clip.with_end(subtitle_item[0][1])
        _clip = _clip.with_duration(duration)
        if params.subtitle_position == "bottom":
            _clip = _clip.with_position(("center", video_height * 0.95 - _clip.h))
        elif params.subtitle_position == "top":
            _clip = _clip.with_position(("center", video_height * 0.05))
        elif params.subtitle_position == "custom":
            # clamp the custom position so the subtitle stays fully on screen
            margin = 10
            max_y = video_height - _clip.h - margin
            min_y = margin
            custom_y = (video_height - _clip.h) * (params.custom_position / 100)
            custom_y = max(min_y, min(custom_y, max_y))
            _clip = _clip.with_position(("center", custom_y))
        else:  # center
            _clip = _clip.with_position(("center", "center"))
        return _clip

    video_clip = VideoFileClip(video_path).without_audio()
    audio_clip = AudioFileClip(audio_path).with_effects(
        [afx.MultiplyVolume(params.voice_volume)]
    )

    def make_textclip(text):
        return TextClip(
            text=text,
            font=font_path,
            font_size=params.font_size,
        )

    if subtitle_path and os.path.exists(subtitle_path):
        sub = SubtitlesClip(
            subtitles=subtitle_path, encoding="utf-8", make_textclip=make_textclip
        )
        text_clips = []
        for item in sub.subtitles:
            clip = create_text_clip(subtitle_item=item)
            text_clips.append(clip)
        video_clip = CompositeVideoClip([video_clip, *text_clips])

    bgm_file = get_bgm_file(bgm_type=params.bgm_type, bgm_file=params.bgm_file)
    if bgm_file:
        try:
            bgm_clip = AudioFileClip(bgm_file).with_effects(
                [
                    afx.MultiplyVolume(params.bgm_volume),
                    afx.AudioFadeOut(3),
                    afx.AudioLoop(duration=video_clip.duration),
                ]
            )
            audio_clip = CompositeAudioClip([audio_clip, bgm_clip])
        except Exception as e:
            logger.error(f"failed to add bgm: {str(e)}")

    video_clip = video_clip.with_audio(audio_clip)
    video_clip.write_videofile(
        output_file,
        audio_codec=audio_codec,
        temp_audiofile_path=output_dir,
        threads=params.n_threads or 2,
        logger=None,
        fps=fps,
    )
    video_clip.close()
    del video_clip
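

# A minimal call sketch (hypothetical paths; params built per app.models.schema.VideoParams):
#   generate_video(
#       video_path="/tmp/combined.mp4",
#       audio_path="/tmp/voice.mp3",
#       subtitle_path="/tmp/subtitle.srt",
#       output_file="/tmp/final.mp4",
#       params=params,
#   )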


def preprocess_video(materials: List[MaterialInfo], clip_duration=4):
    """Convert still-image materials into short video clips; videos pass through unchanged."""
    for material in materials:
        if not material.url:
            continue

        ext = utils.parse_extension(material.url)
        try:
            clip = VideoFileClip(material.url)
        except Exception:
            clip = ImageClip(material.url)

        width = clip.size[0]
        height = clip.size[1]
        if width < 480 or height < 480:
            logger.warning(f"low resolution material: {width}x{height}, minimum 480x480 required")
            close_clip(clip)
            continue

        if ext in const.FILE_TYPE_IMAGES:
            logger.info(f"processing image: {material.url}")
            # turn the still image into a fixed-duration clip
            clip = (
                ImageClip(material.url)
                .with_duration(clip_duration)
                .with_position("center")
            )
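            # Apply a slow zoom with resized(): the scale factor grows linearly
            # from 1.0 to 1 + clip_duration * 0.03 over the clip's duration
            # (e.g. a 12% total zoom for the default 4-second clip).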
            zoom_clip = clip.resized(
                lambda t: 1 + (clip_duration * 0.03) * (t / clip.duration)
            )

            # wrap the zoomed clip so the output resolution stays constant
            final_clip = CompositeVideoClip([zoom_clip])

            # write the result next to the source image
            video_file = f"{material.url}.mp4"
            final_clip.write_videofile(video_file, fps=fps, logger=None)
            close_clip(final_clip)
            close_clip(clip)
            material.url = video_file
            logger.success(f"image processed: {video_file}")
        else:
            # videos are already playable; just release the probe clip
            close_clip(clip)
    return materials