#!/usr/bin/env python3
"""Rip the content of a DVD with ``dvdbackup`` and file it under ``ripped/``.

The disc is first copied into a temporary directory next to this script.
When ``dvdbackup`` reports success the result is moved to
``ripped/<type>/<title>``; in either case the outcome is appended to the log
file and announced with a desktop notification and a sound.
"""
import argparse
import os
import re
import shutil
import subprocess
import sys
import time
from datetime import datetime

try:
    from playsound import playsound
except ImportError:
    # The notification sound is a nicety; ripping must still work without it.
    playsound = None

DEFAULT_DVD_DEVICE = "/dev/cdrom"
TMP_DIR = "tmp"
RIPPED_DIR = "ripped"
LOGFILE = "rip.log"
NOTIFICATION_SOUND = "bell.oga"
SERIES_TITLE_REGEX = r"S\d+[ _]?E(\d+)(-(\d+))?$"
MAX_FILENAME_LEN_IN_TMP = 25  # dvdbackup cuts name of output files at 33 chars
WAIT_FOR_DEVICE_TIME_SECONDS = 3


def main():
    """Run one complete rip: validate, rip to tmp, move, log, notify, clean up."""
    program_start_time = time.time()
    args = parse_args()
    if args.type == "series":
        validate_series_title(args.title)
    chdir_to_script_dir()
    mkdirs()

    dst = os.path.join(RIPPED_DIR, args.type, args.title)
    if os.path.exists(dst):
        print(
            f"A {args.type} with this name has already been ripped. Are you sure you spelled the name right?"
        )
        sys.exit(1)

    if args.wait:
        wait_for_dev_to_exist(args)
        wait_for_dev_to_be_readable(args)

    success = rip_to_tmp_dir(args)
    program_execution_time_str = get_program_execution_time_str(program_start_time)
    if success:
        mv_ripped_from_tmp_to_ripped_dir(args)
        write_to_logfile(args, "Success")
        notify_ripping_success(args, program_execution_time_str)
    else:
        write_to_logfile(args, "FAILURE")
        notify_ripping_error(args, program_execution_time_str)

    print("Deleting tmp directory")
    # Pass args so a partial rip left behind by a failure is removed as well.
    delete_tmp_dir(args)


def parse_args(argv=None):
    """Parse the command line.

    Args:
        argv: Argument list to parse; ``None`` (the default) means
            ``sys.argv[1:]``.

    Returns:
        Namespace with ``type``, ``title``, ``dev`` and ``wait``.
    """
    parser = argparse.ArgumentParser(description="Rip content of dvds")
    parser.add_argument(
        "type",
        choices=("movie", "series"),
        help="If the dvd contains one movie, or multiple episodes of a series",
    )
    parser.add_argument(
        "title",
        help='The title of the movie. Series must end with "Sx Ex-x". E.g.: "Lost S01 E1-04"',
    )
    parser.add_argument(
        "--dev",
        help=f"Dvd device to rip from. Defaults to {DEFAULT_DVD_DEVICE}",
        default=DEFAULT_DVD_DEVICE,
    )
    parser.add_argument(
        "--wait", help="Wait for dvd device to exist.", action="store_true"
    )
    return parser.parse_args(argv)


def validate_series_title(title):
    """Exit with status 1 if *title* is not a valid series title."""
    if not is_series_title_valid(title):
        print("Invalid series title!")
        sys.exit(1)


def is_series_title_valid(title) -> bool:
    """Return True if *title* ends with a season/episode marker.

    The marker must match ``SERIES_TITLE_REGEX`` (e.g. ``S01 E1-04``); when an
    episode range is given, its start must not be greater than its end.
    Anything that is not a string is reported as invalid.
    """
    try:
        episode_matches = re.search(SERIES_TITLE_REGEX, title)
    except TypeError:
        # Not a string at all (e.g. None).
        return False
    if episode_matches is None:
        return False

    episode_from = int(episode_matches.group(1))
    # Group 3 is only set when a "-<end>" part is present.
    range_end = episode_matches.group(3)
    episode_to = int(range_end) if range_end else episode_from
    return episode_from <= episode_to


def chdir_to_script_dir():
    """Change the working directory to the directory containing this script."""
    # abspath guards against a relative __file__, whose dirname may be "".
    os.chdir(os.path.dirname(os.path.abspath(__file__)))


def mkdirs():
    """Create the tmp directory and the ripped/movie and ripped/series trees."""
    os.makedirs(TMP_DIR, exist_ok=True)
    os.makedirs(RIPPED_DIR, exist_ok=True)
    os.makedirs(os.path.join(RIPPED_DIR, "movie"), exist_ok=True)
    os.makedirs(os.path.join(RIPPED_DIR, "series"), exist_ok=True)


def write_to_logfile(args, tag):
    """Append one line with timestamp, type, title and *tag* to the log file."""
    date = datetime.now().strftime("%d.%m.%Y %H:%M:%S")
    log_line = f'{date} - {args.type} - "{args.title}" - {tag}'
    # Explicit encoding: titles may contain non-ASCII characters.
    with open(LOGFILE, "a", encoding="utf-8") as file:
        file.write(log_line + "\n")


def wait_for_dev_to_exist(args):
    """Block until the path ``args.dev`` exists, polling at a fixed interval."""
    while not os.path.exists(args.dev):
        print(
            f"Device {args.dev} not found. Waiting {WAIT_FOR_DEVICE_TIME_SECONDS} sec..."
        )
        time.sleep(WAIT_FOR_DEVICE_TIME_SECONDS)


def wait_for_dev_to_be_readable(args):
    """Block until ``is_dvd_readable`` succeeds, polling at a fixed interval."""
    while not is_dvd_readable(args):
        print(
            f"Device {args.dev} found, but not readable. Waiting {WAIT_FOR_DEVICE_TIME_SECONDS} sec..."
        )
        time.sleep(WAIT_FOR_DEVICE_TIME_SECONDS)


def is_dvd_readable(args) -> bool:
    """Return True unless the dvdbackup info command exits with status 255."""
    command = create_info_command(args)
    # The command's output is irrelevant here; only the exit status is used.
    proc = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
    return proc.returncode != 255


def rip_to_tmp_dir(args) -> bool:
    """Run the rip command with the tmp directory as destination.

    Returns:
        True if the command exited with status 0.
    """
    command = create_rip_command(args, TMP_DIR)
    proc = subprocess.run(command)
    return proc.returncode == 0


def create_info_command(args):
    """Build the argv list for the dvdbackup info call on ``args.dev``."""
    return ["dvdbackup", "-i", args.dev, "-I"]


def create_rip_command(args, dest):
    """Build the argv list that rips ``args.dev`` into *dest*.

    The output name passed to dvdbackup is the shortened title, see
    ``shorten_title``.
    """
    shortened_title = shorten_title(args)
    return [
        "dvdbackup",
        "-v",
        "-p",
        "-i",
        args.dev,
        "-o",
        dest,
        "-M",
        "-n",
        shortened_title,
    ]


def get_program_execution_time_str(program_start_time) -> str:
    """Return the time elapsed since *program_start_time* as ``"<x.y> minutes"``.

    Args:
        program_start_time: Start timestamp as returned by ``time.time()``.
    """
    program_execution_time_minutes = (time.time() - program_start_time) / 60.0
    # Never report a negative duration (e.g. if the system clock was set back).
    program_execution_time_minutes = max(0.0, program_execution_time_minutes)
    return f"{program_execution_time_minutes:.1f} minutes"


def notify_ripping_success(args, program_execution_time_str):
    """Announce a successful rip on stdout, as notification and with a sound."""
    print(f"Success! Ripping took {program_execution_time_str}")
    send_notification(
        f'{args.type.capitalize()} "{args.title}" ripped successfully in {program_execution_time_str}!'
    )
    _play_notification_sound()


def notify_ripping_error(args, program_execution_time_str):
    """Announce a failed rip on stdout, as notification and with a sound."""
    print(f"Ripping failure after {program_execution_time_str}")
    send_notification(
        f'Error ripping {args.type.capitalize()} "{args.title}" after {program_execution_time_str}!'
    )
    _play_notification_sound()


def send_notification(text):
    """Show *text* via ``notify-send``; a failure to launch it is only reported."""
    try:
        subprocess.run(
            ["notify-send", text],
            shell=False,
        )
    except OSError as err:
        # E.g. notify-send is not installed. Must not abort a finished rip.
        print(f"Could not send desktop notification: {err}")


def _play_notification_sound():
    """Play the notification sound, best-effort."""
    if playsound is None:
        return
    try:
        playsound(NOTIFICATION_SOUND)
    except Exception as err:  # playsound's error types depend on its backend
        print(f"Could not play notification sound: {err}")


def mv_ripped_from_tmp_to_ripped_dir(args):
    """Move the finished rip from the tmp directory to ``ripped/<type>/<title>``."""
    shortened_title = shorten_title(args)
    src = os.path.join(TMP_DIR, shortened_title)
    dst = os.path.join(RIPPED_DIR, args.type, args.title)
    shutil.move(src, dst)


def shorten_title(args) -> str:
    """Return the last ``MAX_FILENAME_LEN_IN_TMP`` characters of ``args.title``."""
    return args.title[-MAX_FILENAME_LEN_IN_TMP:]


def _remove_own_rip_from_tmp(args):
    """Delete this rip's output directory inside the tmp directory, if any."""
    tmp_root = os.path.abspath(TMP_DIR)
    target = os.path.abspath(os.path.join(tmp_root, shorten_title(args)))
    # Safety net: only ever delete a direct child of the tmp directory, never
    # the tmp directory itself or anything a title like "../x" would resolve to.
    if os.path.dirname(target) != tmp_root:
        return
    shutil.rmtree(target, ignore_errors=True)


def delete_tmp_dir(args=None):
    """Remove the tmp directory if it is empty.

    Args:
        args: Optional parsed arguments. When given, the output directory of
            this rip inside the tmp directory is deleted first, so leftovers
            of a failed rip do not keep the tmp directory alive. Other entries
            in the tmp directory are never touched.
    """
    if args is not None:
        _remove_own_rip_from_tmp(args)
    try:
        os.rmdir(TMP_DIR)
    except FileNotFoundError:
        pass
    except OSError:
        print("Failed deleting due to OSError")


if __name__ == "__main__":
    main()