dvd_rip/ripper/rip_dvd.py

189 lines
5.1 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import subprocess
import os
from datetime import datetime
from playsound import playsound
import time
import re
import shutil
DEFAULT_DVD_DEVICE = "/dev/cdrom"
TMP_DIR = "tmp"
RIPPED_DIR = "ripped"
LOGFILE = "rip.log"
NOTIFICATION_SOUND = "bell.oga"
SERIES_TITLE_REGEX = r"S\d+[ _]?E(\d+)-(\d+)$"
MAX_FILENAME_LEN_IN_TMP = 30 # dvdbackup cuts name of output files at 33 chars
WAIT_FOR_DEVICE_TIME_SECONDS = 1
def main():
program_start_time = time.time()
args = parse_args()
if args.type == "series":
validate_series_title(args.title)
chdir_to_script_dir()
mkdirs()
dst = os.path.join(RIPPED_DIR, args.type, args.title)
if os.path.exists(dst):
print(
f"A {args.type} with this name has already been ripped. Are you sure you spelled the name right?"
)
exit(1)
if args.wait:
wait_for_dev_to_exist(args)
success = rip_to_tmp_dir(args)
program_execution_time_str = get_program_execution_time_str(program_start_time)
if success:
write_to_logfile(args, "Success")
notify_ripping_success(args, program_execution_time_str)
mv_ripped_from_tmp_to_ripped_dir(args)
else:
write_to_logfile(args, "FAILURE")
notify_ripping_error(args, program_execution_time_str)
print("Deleting tmp directory")
delete_tmp_dir()
def parse_args():
parser = argparse.ArgumentParser(description="Rip content of dvds")
parser.add_argument(
"type",
choices=("movie", "series"),
help="If the dvd contains one movie, or multiple episodes of a series",
)
parser.add_argument(
"title",
help='The title of the movie. Series must end with "Sx Ex-x". E.g.: "Lost S01 E1-04"',
)
parser.add_argument(
"--dev",
help=f"Dvd device to rip from. Defaults to {DEFAULT_DVD_DEVICE}",
default=DEFAULT_DVD_DEVICE,
)
parser.add_argument(
"--wait", help=f"Wait for dvd device to exist.", action="store_true"
)
return parser.parse_args()
def validate_series_title(title):
if not is_series_title_valid(title):
print("Invalid series title!")
exit(1)
def is_series_title_valid(title):
try:
episode_matches = re.search(SERIES_TITLE_REGEX, title)
if episode_matches is None:
return False
episode_from = int(episode_matches.group(1))
episode_to = int(episode_matches.group(2))
if episode_from < episode_to:
return True
except:
return False
return False
def chdir_to_script_dir():
os.chdir(os.path.dirname(__file__))
def mkdirs():
os.makedirs(TMP_DIR, exist_ok=True)
os.makedirs(RIPPED_DIR, exist_ok=True)
os.makedirs(os.path.join(RIPPED_DIR, "movie"), exist_ok=True)
os.makedirs(os.path.join(RIPPED_DIR, "series"), exist_ok=True)
def write_to_logfile(args, tag):
date = datetime.now().strftime("%d.%m.%Y %H:%M:%S")
log_line = f'{date} - {args.type} - "{args.title}" - {tag}'
with open(LOGFILE, "a") as file:
file.write(log_line + "\n")
def wait_for_dev_to_exist(args):
while not os.path.exists(args.dev):
print(
f"Device {args.dev} not found. Waiting {WAIT_FOR_DEVICE_TIME_SECONDS} sec..."
)
time.sleep(WAIT_FOR_DEVICE_TIME_SECONDS)
def rip_to_tmp_dir(args) -> bool:
"""Returns: success of command"""
command = create_rip_command(args, TMP_DIR)
proc = subprocess.run(command, shell=True, capture_output=False)
return proc.returncode == 0
def create_rip_command(args, dest):
shortened_title = args.title[:MAX_FILENAME_LEN_IN_TMP]
return f"dvdbackup -v -p -i '{args.dev}' -o '{dest}' -M -n \"{shortened_title}\""
def get_program_execution_time_str(program_start_time):
program_execution_time_minutes = (time.time() - program_start_time) / 60.0
program_execution_time_minutes = max(0.0, program_execution_time_minutes)
return f"{program_execution_time_minutes:.1f} minutes"
def notify_ripping_success(args, program_execution_time_str):
print(f"Success! Ripping took {program_execution_time_str}")
send_notification(
f'{args.type.capitalize()} "{args.title}" ripped successfully in {program_execution_time_str}!'
)
playsound(NOTIFICATION_SOUND)
def notify_ripping_error(args, program_execution_time_str):
print(f"Ripping failure after {program_execution_time_str}")
send_notification(
f'Error ripping {args.type.capitalize()} "{args.title}" after {program_execution_time_str}!'
)
playsound(NOTIFICATION_SOUND)
def send_notification(text):
subprocess.run(
["notify-send", text],
shell=False,
)
def mv_ripped_from_tmp_to_ripped_dir(args):
shortened_title = args.title[:MAX_FILENAME_LEN_IN_TMP]
src = os.path.join(TMP_DIR, shortened_title)
dst = os.path.join(RIPPED_DIR, args.type, args.title)
shutil.move(src, dst)
def delete_tmp_dir():
try:
os.rmdir(os.path.join(TMP_DIR))
except FileNotFoundError:
pass
except OSError:
print("Failed deleting due to OSError")
if __name__ == "__main__":
main()