dvd_rip/transcoder/transcode_dvd.py

301 lines
8.4 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import subprocess
import shutil
from datetime import datetime
import json
import re
TMP_DIR = "tmp"
RAW_DIR = "raw"
TRANSCODED_DIR = "transcoded"
LOGFILE = "transcode.log"
HANDBRAKE_PROCESS_NICE_LEVEL = 10
HANDBRAKE_PRESET = "General/HQ 1080p30 Surround"
HANDBRAKE_AUDIO_LANG_LIST = "de,en"
OUT_VIDEO_FORMAT = ".m4v"
MIN_EPISODES_DURATION_SECONDS = 10 * 60
SERIES_TITLE_REGEX = r"S(\d+)[ _]?E(\d+)-(\d+)$"
def main():
chdir_to_script_dir()
mkdirs()
for dvd_type in ("movie", "series"):
for dvd in filter(is_dvd_files_not_locked, list_ripped_dvds(dvd_type)):
print(f"Transcoding {dvd}")
output_files = transcode_ripped_dvd(dvd)
if output_files:
mv_videos_from_tmp_to_transcoded_dir(output_files, dvd.dvd_type)
write_to_logfile(dvd, "Success")
delete_original_video_files(dvd)
delete_transcoding_logfile(dvd)
print("Success")
else:
write_to_logfile(dvd, "FAILURE")
rename_transcoding_logfile_as_error_log(dvd)
print(f"Failed. Please see logs at {dvd.err_log_file}")
delete_tmp_dir()
class Dvd:
def __init__(self, dvd_type, dvd_path):
self.dvd_type = dvd_type
self.dvd_path = dvd_path
self.dvd_title = os.path.basename(dvd_path)
self.log_file = dvd_path + ".log"
self.err_log_file = dvd_path + ".err.log"
def __str__(self):
return f"{self.dvd_type} {self.dvd_path}"
def is_movie(self):
return self.dvd_type == "movie"
def is_series(self):
return self.dvd_type == "series"
def append_line_to_logfile(self, line):
with open(self.log_file, "a") as log_file:
log_file.write(line + "\n")
def series_create_episode_title(self, series, episode):
re_matches = re.search(SERIES_TITLE_REGEX, self.dvd_title)
if re_matches is None:
return None
return self.dvd_title[: re_matches.start()] + f"S{series:02d}_E{episode:02d}"
def series_get_season_episodes(self):
re_matches = re.search(SERIES_TITLE_REGEX, self.dvd_title)
if re_matches is None:
return None
series = int(re_matches.group(1))
episode_from = int(re_matches.group(2))
episode_to = int(re_matches.group(3))
if episode_from >= episode_to:
return None
episodes = list(range(episode_from, episode_to + 1))
return (series, episodes)
def transcode_command_args_without_title_number(self, output_file):
args = [
"nice",
"-n",
str(HANDBRAKE_PROCESS_NICE_LEVEL),
"HandBrakeCLI",
"--preset",
HANDBRAKE_PRESET,
"--first-audio",
"--audio-lang-list",
HANDBRAKE_AUDIO_LANG_LIST,
"--input",
self.dvd_path,
"--output",
output_file,
]
return args
def chdir_to_script_dir():
os.chdir(os.path.dirname(__file__))
def mkdirs():
os.makedirs(TMP_DIR, exist_ok=True)
os.makedirs(TRANSCODED_DIR, exist_ok=True)
os.makedirs(RAW_DIR, exist_ok=True)
os.makedirs(os.path.join(RAW_DIR, "movie"), exist_ok=True)
os.makedirs(os.path.join(RAW_DIR, "series"), exist_ok=True)
def list_ripped_dvds(dvd_type):
path = os.path.join(RAW_DIR, dvd_type)
try:
dvd_titles = os.listdir(path)
dvd_titles = filter(lambda title: not title.endswith(".lock"), dvd_titles)
dvd_titles = filter(lambda title: not title.endswith(".err.log"), dvd_titles)
dvd_titles = filter(lambda title: not title.endswith(".log"), dvd_titles)
return map(
lambda dvd_title: Dvd(dvd_type, os.path.join(path, dvd_title)), dvd_titles
)
except FileNotFoundError:
print(
f"Directory {path} not found. Running the ripper script will generate it."
)
exit(1)
def is_dvd_files_not_locked(dvd):
return not os.path.exists(dvd + ".lock") and not os.path.exists(dvd + ".err.log")
def transcode_ripped_dvd(dvd):
if dvd.is_movie():
return transcode_movie(dvd)
else:
return transcode_series(dvd)
def transcode_movie(dvd: Dvd):
video_file_name = dvd.dvd_title + OUT_VIDEO_FORMAT
output_file = os.path.join(TMP_DIR, video_file_name)
with open(dvd.log_file, "w") as log_file:
proc = subprocess.run(
dvd.transcode_command_args_without_title_number(output_file)
+ [
"--main-feature",
],
stderr=subprocess.STDOUT,
stdout=log_file,
)
if proc.returncode != 0:
return False
else:
return [output_file]
def transcode_series(dvd: Dvd):
title_numbers = find_series_titles(dvd)
if not title_numbers:
dvd.append_line_to_logfile(
"Could not find any titles which could be episodes. Maybe you need to decrease the 'MIN_EPISODES_DURATION_SECONDS'?"
)
return False
season_episodes = dvd.series_get_season_episodes()
if season_episodes is None:
dvd.append_line_to_logfile(f"Dvd has non valid series name!! ({dvd})")
return False
(season, episodes) = season_episodes
if len(episodes) != len(title_numbers):
dvd.append_line_to_logfile(
f"{dvd} should have {len(episodes)} episodes, but handbrake found {len(title_numbers)}. Maybe you need to adjust 'MIN_EPISODES_DURATION_SECONDS'?"
)
return False
output_files = []
for i in range(len(episodes)):
episode = episodes[i]
title_number = title_numbers[i]
episode_title = dvd.series_create_episode_title(season, episode)
if episode_title is None:
dvd.append_line_to_logfile(f"Dvd has non valid series name!! ({dvd})")
return False
video_file_name = episode_title + OUT_VIDEO_FORMAT
output_file = os.path.join(TMP_DIR, video_file_name)
output_files.append(output_file)
with open(dvd.log_file, "w") as log_file:
proc = subprocess.run(
dvd.transcode_command_args_without_title_number(output_file)
+ [
"--title",
title_number,
"--min-duration",
str(MIN_EPISODES_DURATION_SECONDS),
],
stderr=subprocess.STDOUT,
stdout=log_file,
)
if proc.returncode != 0:
return False
return output_files
def find_series_titles(dvd: Dvd):
with open(dvd.log_file, "w") as log_file:
proc = subprocess.run(
[
"nice",
"-n",
str(HANDBRAKE_PROCESS_NICE_LEVEL),
"HandBrakeCLI",
"--input",
dvd.dvd_path,
"-t",
"0",
"--min-duration",
str(MIN_EPISODES_DURATION_SECONDS),
"--json",
],
stdout=subprocess.PIPE,
stderr=log_file,
)
if proc.returncode != 0:
return None
stdout = proc.stdout.decode("utf-8")
json_str = ""
in_json = False
for line in stdout.splitlines():
if not in_json:
if line == "JSON Title Set: {":
in_json = True
json_str = "{\n"
else:
json_str += line + "\n"
if line == "}":
break
json_obj = json.loads(json_str)
titles = list(map(lambda title: title["Index"], json_obj["TitleList"]))
return titles
def mv_videos_from_tmp_to_transcoded_dir(video_files, video_type):
for video_file in video_files:
src = os.path.join(TMP_DIR, video_file)
dst = os.path.join(TRANSCODED_DIR, video_type)
shutil.move(src, dst)
def write_to_logfile(dvd: Dvd, tag):
date = datetime.now().strftime("%d.%m.%Y %H:%M:%S")
log_line = f'{date} - {dvd.dvd_type} - "{dvd.dvd_title}" - {tag}'
with open(LOGFILE, "a") as file:
file.write(log_line + "\n")
def delete_transcoding_logfile(dvd):
os.remove(dvd + ".log")
def rename_transcoding_logfile_as_error_log(dvd):
shutil.move(dvd + ".log", dvd + ".err.log")
def delete_original_video_files(folder):
os.rmdir(folder)
def delete_tmp_dir():
try:
os.rmdir(os.path.join(TMP_DIR))
except FileNotFoundError:
pass
if __name__ == "__main__":
main()