#!/usr/bin/env python3 # Copyright (C) 2023 Julian Mutter (julian.mutter@comumail.de) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os import subprocess import shutil from datetime import datetime import json import re from dotenv import load_dotenv TMP_DIR = "tmp" RAW_DIR = "raw" TRANSCODED_DIR = "transcoded" LOGFILE = "transcode.log" load_dotenv() HANDBRAKE_PROCESS_NICE_LEVEL = int(os.environ["HANDBRAKE_PROCESS_NICE_LEVEL"]) HANDBRAKE_PRESET_MOVIE = os.environ["HANDBRAKE_PRESET_MOVIE"] HANDBRAKE_PRESET_SERIES = os.environ["HANDBRAKE_PRESET_SERIES"] HANDBRAKE_AUDIO_LANG_LIST = os.environ["HANDBRAKE_AUDIO_LANG_LIST"] OUT_VIDEO_FORMAT = os.environ["OUT_VIDEO_FORMAT"] MIN_EPISODES_DURATION_SECONDS = int(os.environ["MIN_EPISODES_DURATION_MINUTES"]) * 60 MAX_EPISODES_DURATION_SECONDS = int(os.environ["MAX_EPISODES_DURATION_MINUTES"]) * 60 TRANSCODE_TIMEOUT_SECONDS = int(os.environ["TRANSCODE_TIMEOUT_MINUTES"]) * 60 SERIES_TITLE_REGEX = r"S(\d+)[ _]?E(\d+)(-(\d+))?$" def main(): print("transcode_dvd started") chdir_to_script_dir() mkdirs() for dvd_type in ("movie", "series"): for dvd in filter(is_dvd_files_not_locked, list_ripped_dvds(dvd_type)): print(f"Transcoding {dvd}") delete_old_transcoding_logfile_if_exists(dvd) output_files = transcode_ripped_dvd(dvd) if output_files: mv_videos_from_tmp_to_transcoded_dir(output_files, dvd.dvd_type) write_to_logfile(dvd, "Success") delete_original_video_files(dvd) delete_transcoding_logfile(dvd) delete_titles_file_if_exists(dvd) delete_nodvdnav_file_if_exists(dvd) print("Success") else: write_to_logfile(dvd, "FAILURE") rename_transcoding_logfile_as_error_log(dvd) print(f"Failed. Please see logs at {dvd.err_log_file}") delete_tmp_dir() print("transcode_dvd finished") class Dvd: def __init__(self, dvd_type, dvd_path): self.dvd_type = dvd_type self.dvd_path = dvd_path self.dvd_title = os.path.basename(dvd_path) self.log_file = dvd_path + ".log" self.err_log_file = dvd_path + ".err.log" self.titles_file = dvd_path + ".titles" self.nodvdnav_file = dvd_path + ".nodvdnav" def __str__(self): return f"{self.dvd_type} {self.dvd_path}" def is_movie(self): return self.dvd_type == "movie" def is_series(self): return self.dvd_type == "series" def append_line_to_logfile(self, line): with open(self.log_file, "a") as log_file: log_file.write(line + "\n") def series_create_episode_title(self, series, episode): re_matches = re.search(SERIES_TITLE_REGEX, self.dvd_title) if re_matches is None: return None return self.dvd_title[: re_matches.start()] + f"S{series:02d}_E{episode:02d}" def series_get_season_episodes(self): re_matches = re.search(SERIES_TITLE_REGEX, self.dvd_title) if re_matches is None: return None series = int(re_matches.group(1)) episode_from = int(re_matches.group(2)) episode_to = re_matches.group(4) if episode_to: episode_to = int(episode_to) else: episode_to = episode_from if episode_from > episode_to: return None episodes = list(range(episode_from, episode_to + 1)) return (series, episodes) def transcode_command_args_without_title_number(self, output_file): handbrake_preset = HANDBRAKE_PRESET_MOVIE if self.is_series(): handbrake_preset = HANDBRAKE_PRESET_SERIES args = [ "nice", "-n", str(HANDBRAKE_PROCESS_NICE_LEVEL), "HandBrakeCLI", "--preset", handbrake_preset, "--input", self.dvd_path, "--output", output_file, ] if HANDBRAKE_AUDIO_LANG_LIST.strip() == "": args += ["--all-audio"] else: args += [ "--audio-lang-list", HANDBRAKE_AUDIO_LANG_LIST, "--first-audio"] if os.path.exists(self.nodvdnav_file): args += ["--no-dvdnav"] self.append_line_to_logfile( "Found .nodvdnav file! Running handbrake without libdvdnav..." ) return args def chdir_to_script_dir(): os.chdir(os.path.dirname(__file__)) def mkdirs(): os.makedirs(TMP_DIR, exist_ok=True) os.makedirs(TRANSCODED_DIR, exist_ok=True) os.makedirs(os.path.join(TRANSCODED_DIR, "movie"), exist_ok=True) os.makedirs(os.path.join(TRANSCODED_DIR, "series"), exist_ok=True) os.makedirs(RAW_DIR, exist_ok=True) os.makedirs(os.path.join(RAW_DIR, "movie"), exist_ok=True) os.makedirs(os.path.join(RAW_DIR, "series"), exist_ok=True) def list_ripped_dvds(dvd_type): path = os.path.join(RAW_DIR, dvd_type) try: dvd_titles = os.listdir(path) dvd_titles = filter(lambda title: not title.endswith(".lock"), dvd_titles) dvd_titles = filter(lambda title: not title.endswith(".err.log"), dvd_titles) dvd_titles = filter(lambda title: not title.endswith(".log"), dvd_titles) dvd_titles = filter(lambda title: not title.endswith(".titles"), dvd_titles) dvd_titles = filter(lambda title: not title.endswith(".nodvdnav"), dvd_titles) return map( lambda dvd_title: Dvd(dvd_type, os.path.join(path, dvd_title)), dvd_titles ) except FileNotFoundError: print( f"Directory {path} not found. Running the ripper script will generate it." ) exit(1) def is_dvd_files_not_locked(dvd: Dvd): return not os.path.exists(dvd.dvd_path + ".lock") and not os.path.exists( dvd.dvd_path + ".err.log" ) def transcode_ripped_dvd(dvd: Dvd): if dvd.is_movie(): return transcode_movie(dvd) else: return transcode_series(dvd) def transcode_movie(dvd: Dvd): video_file_name = dvd.dvd_title + OUT_VIDEO_FORMAT output_file = os.path.join(TMP_DIR, video_file_name) success = run_and_log_handbrake( dvd, dvd.transcode_command_args_without_title_number(output_file) + [ "--main-feature", ], ) if not success: return False else: return [output_file] def transcode_series(dvd: Dvd): title_numbers = find_series_titles(dvd) if not title_numbers: dvd.append_line_to_logfile( "Could not find any titles which could be episodes. Maybe you need to decrease the 'MIN_EPISODES_DURATION_SECONDS'?" ) return False season_episodes = dvd.series_get_season_episodes() if season_episodes is None: dvd.append_line_to_logfile(f"Dvd has non valid series name!! ({dvd})") return False (season, episodes) = season_episodes if len(episodes) != len(title_numbers): dvd.append_line_to_logfile( f"{dvd} should have {len(episodes)} episodes, but {len(title_numbers)} were found. Maybe you need to adjust 'MIN_EPISODES_DURATION_SECONDS'?" ) return False output_files = [] for i in range(len(episodes)): episode = episodes[i] title_number = title_numbers[i] output_file = transcode_episode(dvd, season, episode, title_number) if not output_file: return False else: output_files.append(output_file) return output_files def transcode_episode(dvd: Dvd, season, episode, title_number): episode_title = dvd.series_create_episode_title(season, episode) if episode_title is None: dvd.append_line_to_logfile(f"Dvd has non valid series name!! ({dvd})") return False print(f"Transcoding episode {episode_title}...") video_file_name = episode_title + OUT_VIDEO_FORMAT output_file = os.path.join(TMP_DIR, video_file_name) success = run_and_log_handbrake( dvd, dvd.transcode_command_args_without_title_number(output_file) + [ "--title", str(title_number), # "--min-duration", # str(MIN_EPISODES_DURATION_SECONDS), ], ) if not success: return False return output_file # Returns success def run_and_log_handbrake(dvd: Dvd, command) -> bool: with open(dvd.log_file, "a") as log_file: try: proc = subprocess.run( command, stderr=subprocess.STDOUT, stdout=log_file, timeout=TRANSCODE_TIMEOUT_SECONDS, ) if proc.returncode != 0: return False except subprocess.TimeoutExpired: log_file.write( f"Transcoding timeout of {TRANSCODE_TIMEOUT_SECONDS}s reached!" ) return False if check_handbrake_log_for_error(dvd): return False return True # Handbrake often exits with returncode 0 even though it logged an error and transcoding was not successful def check_handbrake_log_for_error(dvd: Dvd): with open(dvd.log_file, "r", encoding="utf-8", errors="ignore") as log_file: for line in log_file: if line.startswith("Encode failed"): return True return False def find_series_titles(dvd: Dvd): if os.path.exists(dvd.titles_file): return read_titles_from_titles_file(dvd) with open(dvd.log_file, "w") as log_file: try: proc = subprocess.run( [ "nice", "-n", str(HANDBRAKE_PROCESS_NICE_LEVEL), "HandBrakeCLI", "--input", dvd.dvd_path, "-t", "0", "--min-duration", str(MIN_EPISODES_DURATION_SECONDS), "--json", ], stdout=subprocess.PIPE, stderr=log_file, timeout=TRANSCODE_TIMEOUT_SECONDS, ) except subprocess.TimeoutExpired: log_file.write( f"Transcoding timeout of {TRANSCODE_TIMEOUT_SECONDS}s reached!" ) return None if proc.returncode != 0: return None stdout = proc.stdout.decode("utf-8", errors="ignore") json_str = "" in_json = False for line in stdout.splitlines(): if not in_json: if line == "JSON Title Set: {": in_json = True json_str = "{\n" else: json_str += line + "\n" if line == "}": break json_obj = json.loads(json_str) titles = json_obj["TitleList"] if dvd.is_series(): titles = filter(is_json_title_duration_not_too_long, titles) titles = list(map(lambda title: title["Index"], titles)) return titles def read_titles_from_titles_file(dvd: Dvd): titles = [] dvd.append_line_to_logfile("Found .titles file! Reading titles from there...") try: with open(dvd.titles_file, "r") as titles_file: for line in titles_file.readlines(): titles.append(int(line)) except: dvd.append_line_to_logfile( "Failed reading titles file! Please make sure it has the right format" ) return [] return titles def is_json_title_duration_not_too_long(title): duration = title["Duration"] duration_seconds = duration["Hours"] * 60 * 60 duration_seconds += duration["Minutes"] * 60 duration_seconds += duration["Seconds"] return duration_seconds <= MAX_EPISODES_DURATION_SECONDS def mv_videos_from_tmp_to_transcoded_dir(video_files, video_type): for video_file in video_files: src = video_file file_name = os.path.basename(video_file) dst = os.path.join(TRANSCODED_DIR, video_type, file_name) shutil.move(src, dst) def write_to_logfile(dvd: Dvd, tag): date = datetime.now().strftime("%d.%m.%Y %H:%M:%S") log_line = f'{date} - {dvd.dvd_type} - "{dvd.dvd_title}" - {tag}' with open(LOGFILE, "a") as file: file.write(log_line + "\n") def delete_transcoding_logfile(dvd: Dvd): os.remove(dvd.log_file) def delete_titles_file_if_exists(dvd: Dvd): if os.path.exists(dvd.titles_file): os.remove(dvd.titles_file) def delete_nodvdnav_file_if_exists(dvd: Dvd): if os.path.exists(dvd.nodvdnav_file): os.remove(dvd.nodvdnav_file) def rename_transcoding_logfile_as_error_log(dvd: Dvd): shutil.move(dvd.log_file, dvd.err_log_file) def delete_old_transcoding_logfile_if_exists(dvd: Dvd): try: os.remove(dvd.log_file) except FileNotFoundError: pass def delete_original_video_files(dvd: Dvd): shutil.rmtree(dvd.dvd_path) def delete_tmp_dir(): try: shutil.rmtree(TMP_DIR) except FileNotFoundError: pass if __name__ == "__main__": main()