Skip to content

austin-lai/Remove_MP3Tag_with_Python

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

21 Commits
 
 
 
 
 
 

Repository files navigation

Remove MP3Tag with python

> Austin.Lai |
> -----------| October 05th, 2024
> -----------| Updated on April 11th, 2026

Table of Contents


Disclaimer

DISCLAIMER:

This project/repository is provided "as is" and without warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and noninfringement. In no event shall the authors or copyright holders be liable for any claim, damages or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software.

This project/repository is for educational purposes ONLY. Do not use it without permission. The usual disclaimer applies, especially the fact that I (Austin) am not liable for any damages caused by direct or indirect use of the information or functionality provided by these programs. The author or any Internet provider bears NO responsibility for content or misuse of these programs or any derivatives thereof. By using these programs you accept the fact that any damage (data loss, system crash, system compromise, etc.) caused by the use of these programs is not Austin's responsibility.


Description

A small Python project designed to streamline audio file metadata by removing specific fields from ID3v2.4 and APEv2 tags within audio files.

Note

Features:

  • Targeted environment: Windows, *Nix
  • Support ID3 and APEv2 tags
  • Supports FLAC, MP3, WMA and M4A with AAC (Advanced Audio Coding) format files.
  • Python package: mutagen, colorama.
  • Search audio file metadata by keywords.
  • There are two lists: keywords - the search looks for anything specified in this list; not_keywords - the search excludes anything specified in this list.
  • When keywords specified in the keywords list are found, the results are displayed and the user is prompted to enter 'yes' or 'y' (case-insensitive) to remove the fields from the audio files, or to press 'Enter' to skip.
  • Saves the search results to a JSON file located on the Desktop.
  • Handle script termination when Ctrl+C is pressed and save results before exiting.
  • Skip cover art and acoustid fingerprint field.
  • Show total files and folders that have been processed.

This project is ideal for anyone looking to simplify or automate audio files metadata management.

Important

Please change the configuration to suit your environment.


remove-mp3tag-with-python-v2.0

The remove-mp3tag-with-python-v2.0.py file can be found here or below:

Click here to expand for the "remove-mp3tag-with-python-v2.0.py" !!!
#!/usr/bin/env python3

from __future__ import annotations

import argparse
import importlib
import json
import os
import signal
import subprocess
import sys
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Optional


# Application identity; the config file name below and the report file names
# are derived from it.
APP_NAME = "mp3tag-cleaner-v2.0"
CONFIG_FILE_NAME = f"{APP_NAME}-config.json"
# File suffixes picked up by the scanner (suffixes are casefolded before the
# membership test, so these must stay lowercase).
SUPPORTED_EXTENSIONS = (".mp3", ".m4a", ".flac", ".wma")

# Fallback scan settings used by default_config() when nothing is saved yet.
DEFAULT_DIRECTORY = str(Path.home() / "Desktop" / "New folder")
# A tag value containing any of these substrings (case-insensitive) is a match.
DEFAULT_KEYWORDS = [
    "https://www.",
    "http://www.",
]
# A tag value containing any of these substrings (case-insensitive) is never
# reported, even when it also contains a keyword.
DEFAULT_EXCLUDED_KEYWORDS = [
    "deezer",
    "open.spotify",
    "lame",
    "discogs",
    "GENIE",
    "pmedia_music",
    "music.apple",
    "bandcamp",
    "beatsource",
    "YOUNG-LUV.COM",
    "amazon",
    "beatport",
    "junodownload",
    "WWW.APPLE.COM",
    "musicbrainz.org",
    "melon.com",
    "www.youtube.com",
]

# Substrings of (casefolded) tag keys that should_skip_key() excludes from
# scanning: cover art, fingerprints and streaming-service identifiers.
SKIP_KEY_PATTERNS = (
    "cover art",
    "cover",
    "covr",
    "apic",
    "picture",
    "wm/picture",
    "metadata_block_picture",
    "acoustid fingerprint",
    "acoustid_fingerprint",
    "spotify_release_id",
    "spotify_track_id",
)
# Same substring rule as above, for lyrics-style keys.
TEXTUAL_SKIP_KEY_PATTERNS = (
    "lyrics",
    "lyr",
    "unsyncedlyrics",
)

# Maps importable module name -> pip package name for the dependency check.
REQUIRED_IMPORTS = {"mutagen": "mutagen"}
OPTIONAL_IMPORTS = {"colorama": "colorama"}

# Session read by handle_interrupt() so results can be saved on Ctrl+C.
# NOTE(review): the assignment happens outside the visible code — confirm.
CURRENT_SESSION: Optional["RunSession"] = None
# Terminal style codes per message kind. Empty strings mean "no color";
# setup_colors() replaces this mapping when colorama is available.
PALETTE = {
    "info": "",
    "ok": "",
    "warn": "",
    "error": "",
    "match": "",
    "dim": "",
    "reset": "",
}


@dataclass
class OpenedContainer:
    """One tag container of an audio file, either opened or flagged unreadable."""

    # Container label, e.g. "ID3", "APEv2", "MP4", "FLAC" or "ASF".
    name: str
    # Opened tag object; None for containers recorded as malformed.
    handle: Any
    # False when the container was detected but could not be parsed.
    readable: bool = True
    # Human-readable description of the problem for unreadable containers.
    issue: Optional[str] = None
    # How the whole block can be removed when unreadable:
    # "raw_apev2" or "raw_mp4_tags"; None when no raw removal applies.
    delete_mode: Optional[str] = None


@dataclass
class TagMatch:
    """A single tag field whose text contained at least one keyword."""

    # Name of the container the field was found in (OpenedContainer.name).
    container: str
    # Tag key rendered with str().
    key: str
    # Text form of the tag value that was searched.
    value: str
    # Keywords, in their original spelling, found inside the value.
    matched_keywords: list[str]


@dataclass
class FileScanResult:
    """Scan outcome for one audio file; serialized into the JSON report."""

    # Path of the scanned file as a string.
    file: str
    # Tag fields that matched a keyword.
    matches: list[TagMatch] = field(default_factory=list)
    # Removed items; presumably the "container:key" strings produced by the
    # removal helpers — verify against the caller.
    removed_targets: list[str] = field(default_factory=list)
    # NOTE(review): looks like this marks files the user chose not to clean —
    # the code that sets it is outside the visible section; confirm.
    skipped: bool = False
    # Error messages collected while opening or modifying the file.
    errors: list[str] = field(default_factory=list)


@dataclass
class RunSession:
    """Accumulated state of one scan run, written out by save_results()."""

    # Destination of the JSON report.
    result_file: Path
    # Per-file results; nothing is written while this list is empty.
    results: list[FileScanResult] = field(default_factory=list)
    # Counters reported in the "summary" section of the report.
    folders_processed: int = 0
    files_processed: int = 0
    # Emitted in the report under the key "matched_files".
    matches_found: int = 0


def colorize(kind: str, message: str) -> str:
    """Wrap *message* in the PALETTE style registered for *kind*.

    The reset code is appended only when *kind* maps to a non-empty style,
    so unknown kinds and the colorless palette yield *message* unchanged.
    """
    start = PALETTE.get(kind, "")
    if not start:
        return f"{start}{message}"
    end = PALETTE.get("reset", "")
    return f"{start}{message}{end}"


def print_info(message: str) -> None:
    """Print *message* styled with the "info" palette entry."""
    styled = colorize("info", message)
    print(styled)


def print_ok(message: str) -> None:
    """Print *message* styled with the "ok" palette entry."""
    styled = colorize("ok", message)
    print(styled)


def print_warn(message: str) -> None:
    """Print *message* styled with the "warn" palette entry."""
    styled = colorize("warn", message)
    print(styled)


def print_error(message: str) -> None:
    """Print *message* styled with the "error" palette entry."""
    styled = colorize("error", message)
    print(styled)


def print_match(message: str) -> None:
    """Print *message* styled with the "match" palette entry."""
    styled = colorize("match", message)
    print(styled)


def safe_input(prompt: str) -> Optional[str]:
    """Read one line from the user, or return None when input is unavailable.

    Both EOFError and OSError raised by ``input()`` are treated as
    "no interactive input".
    """
    try:
        reply = input(prompt)
    except (EOFError, OSError):
        return None
    return reply


def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse the process arguments."""
    cli = argparse.ArgumentParser(
        description="Scan audio tags for suspicious text and remove matched tag fields."
    )

    # Options that carry a value: (flag, default, help text).
    valued_options = (
        (
            "--directory",
            None,
            "Directory to scan. When omitted, the saved config value is used.",
        ),
        (
            "--config",
            None,
            "Path to the persistent JSON config file.",
        ),
        (
            "--results-dir",
            str(Path.home() / "Desktop"),
            "Folder where the JSON report will be written.",
        ),
    )
    for flag, fallback, description in valued_options:
        cli.add_argument(flag, default=fallback, help=description)

    # Boolean switches: (flag, help text); each defaults to False.
    switches = (
        (
            "--install-deps",
            "Attempt to install missing dependencies before running.",
        ),
        (
            "--assume-yes",
            "Skip interactive setup prompts and use defaults or CLI values.",
        ),
        (
            "--scan-only",
            "Scan and report matches but never delete anything.",
        ),
        (
            "--auto-remove",
            "Remove matched tags without asking for file-by-file confirmation.",
        ),
        (
            "--no-color",
            "Disable colored terminal output.",
        ),
    )
    for flag, description in switches:
        cli.add_argument(flag, action="store_true", help=description)

    return cli.parse_args()


def build_install_command(script_dir: Path) -> list[str]:
    """Return the pip command used to install the script's dependencies.

    A requirements file sitting next to the script takes precedence; without
    it the two packages are named explicitly.
    """
    pip_install = [sys.executable, "-m", "pip", "install"]
    pinned = script_dir / "requirements-mp3tag-cleaner-v2.txt"
    if pinned.exists():
        return pip_install + ["-r", str(pinned)]
    return pip_install + ["mutagen", "colorama"]


def default_config() -> dict[str, Any]:
    """Return a fresh settings dict populated from the module defaults."""
    settings: dict[str, Any] = {"directory": DEFAULT_DIRECTORY}
    settings["keywords"] = dedupe_casefold(DEFAULT_KEYWORDS)
    settings["excluded_keywords"] = dedupe_casefold(DEFAULT_EXCLUDED_KEYWORDS)
    return settings


def _missing_modules(import_map: dict[str, str]) -> list[tuple[str, str]]:
    """Return the (import_name, package_name) pairs that fail to import."""
    missing = []
    for import_name, package_name in import_map.items():
        try:
            importlib.import_module(import_name)
        except ImportError:
            missing.append((import_name, package_name))
    return missing


def ensure_dependencies(
    script_dir: Path,
    auto_install: bool,
    assume_yes: bool,
) -> bool:
    """Check the script's third-party modules and optionally install them.

    Args:
        script_dir: Folder searched for the requirements file.
        auto_install: Install without asking when something is missing.
        assume_yes: Never prompt; without ``auto_install`` nothing is installed.

    Returns:
        True when every required module is importable, False otherwise.
        A failed install command also returns False. Optional modules never
        cause False on their own once an install has completed.
    """
    missing_required = _missing_modules(REQUIRED_IMPORTS)
    missing_optional = _missing_modules(OPTIONAL_IMPORTS)

    if not missing_required and not missing_optional:
        return True

    install_cmd = build_install_command(script_dir)
    install_text = subprocess.list2cmdline(install_cmd)

    if missing_required:
        required_names = ", ".join(package for _, package in missing_required)
        print_warn(f"Missing required module(s): {required_names}")
    if missing_optional:
        optional_names = ", ".join(package for _, package in missing_optional)
        print_warn(f"Missing optional module(s): {optional_names}")

    print_info(f"Install with: {install_text}")

    should_install = auto_install
    if not should_install and not assume_yes:
        answer = safe_input("Install missing modules now? [y/N]: ")
        if answer is None:
            answer = ""
        should_install = answer.strip().lower() in {"y", "yes"}

    if not should_install:
        if missing_required:
            print_error("The script cannot continue without the required modules above.")
            return False
        return True

    try:
        subprocess.check_call(install_cmd)
    except subprocess.CalledProcessError as exc:
        print_error(f"Dependency installation failed with exit code {exc.returncode}.")
        return False
    except Exception as exc:
        print_error(f"Dependency installation failed: {exc}")
        return False

    # Freshly installed packages are invisible to the import system until
    # its finder caches are dropped; once is enough for all modules.
    importlib.invalidate_caches()

    for import_name, _ in missing_required:
        try:
            importlib.import_module(import_name)
        except ImportError as exc:
            print_error(f"Module still unavailable after install: {import_name} ({exc})")
            return False

    # An optional module that still cannot be imported (for example because
    # the requirements file does not list it) must not abort the run.
    optional_still_missing = False
    for import_name, _ in missing_optional:
        try:
            importlib.import_module(import_name)
        except ImportError as exc:
            optional_still_missing = True
            print_warn(
                f"Optional module still unavailable after install: {import_name} ({exc})"
            )

    if optional_still_missing:
        print_ok("Required dependencies are available. Continuing without the optional module(s) above.")
    else:
        print_ok("Dependencies installed successfully.")
    return True


def setup_colors(disable_color: bool) -> None:
    """Replace the colorless PALETTE with colorama styles when possible.

    Nothing changes when colors are disabled or colorama cannot be imported.
    """
    global PALETTE

    if disable_color:
        return

    try:
        colorama = importlib.import_module("colorama")
    except ImportError:
        return

    colorama.init(autoreset=True)
    fg, bg, style = colorama.Fore, colorama.Back, colorama.Style

    themed = {}
    themed["info"] = bg.BLACK + fg.WHITE
    themed["ok"] = bg.BLACK + fg.GREEN
    themed["warn"] = bg.BLACK + fg.LIGHTYELLOW_EX
    themed["error"] = bg.RED + fg.WHITE
    themed["match"] = bg.BLACK + fg.LIGHTRED_EX
    themed["dim"] = style.DIM + fg.WHITE
    themed["reset"] = style.RESET_ALL
    PALETTE = themed


def load_mutagen_modules() -> dict[str, Any]:
    """Import the mutagen submodules and return the names the scanner uses.

    The mapping gives the rest of the script a single lookup table so that
    mutagen is only imported after the dependency check has run.
    """
    load = importlib.import_module
    id3 = load("mutagen.id3")
    apev2 = load("mutagen.apev2")
    mp4 = load("mutagen.mp4")
    flac = load("mutagen.flac")
    asf = load("mutagen.asf")

    registry: dict[str, Any] = {}

    # ID3 (MP3 and stray ID3 blocks in other formats).
    registry["ID3"] = id3.ID3
    registry["ID3NoHeaderError"] = id3.ID3NoHeaderError

    # APEv2, including the low-level helpers used for raw block removal.
    registry["APEv2"] = apev2.APEv2
    registry["APENoHeaderError"] = apev2.APENoHeaderError
    registry["APEBadItemError"] = apev2.APEBadItemError
    registry["APEDeleteBytes"] = apev2.delete_bytes
    registry["APEv2Data"] = apev2._APEv2Data

    # MP4/M4A, FLAC and ASF (WMA).
    registry["MP4"] = mp4.MP4
    registry["MP4Tags"] = mp4.MP4Tags
    registry["FLAC"] = flac.FLAC
    registry["ASF"] = asf.ASF

    return registry


def prompt_text(label: str, default: str, assume_yes: bool) -> str:
    """Ask for a text value, falling back to *default*.

    *default* is returned without prompting when *assume_yes* is set, and
    also when input is unavailable or the reply is blank.
    """
    if not assume_yes:
        reply = safe_input(f"{label} [{default}]: ")
        if reply is not None:
            cleaned = reply.strip()
            if cleaned:
                return cleaned
    return default


def parse_csv(text: str) -> list[str]:
    """Split *text* on commas, trimming each item and dropping blank ones."""
    items: list[str] = []
    for chunk in text.split(","):
        trimmed = chunk.strip()
        if trimmed:
            items.append(trimmed)
    return items


def dedupe_casefold(values: list[str]) -> list[str]:
    """Drop case-insensitive duplicates, keeping each first spelling in order."""
    first_spelling: dict[str, str] = {}
    for value in values:
        # setdefault keeps the earliest value recorded for a folded key.
        first_spelling.setdefault(value.casefold(), value)
    return list(first_spelling.values())


def get_config_path(args: argparse.Namespace, script_dir: Path) -> Path:
    """Return the config file location: ``--config`` if given, else beside the script."""
    explicit = args.config
    return Path(explicit).expanduser() if explicit else script_dir / CONFIG_FILE_NAME


def normalize_string_list(value: Any) -> list[str]:
    """Coerce a loaded JSON value into a clean, de-duplicated list of strings.

    Anything that is not a list yields an empty list. Items are converted
    with ``str()``, trimmed, and dropped when blank.
    """
    if not isinstance(value, list):
        return []

    cleaned: list[str] = []
    for item in value:
        text = str(item).strip()
        if text:
            cleaned.append(text)
    return dedupe_casefold(cleaned)


def load_persistent_config(config_path: Path) -> dict[str, Any]:
    """Load saved settings, falling back to the defaults field by field.

    A missing, unreadable or non-object config file yields the defaults.
    Each stored field replaces its default only when it holds a usable
    value. The legacy key ``not_keywords`` is read when
    ``excluded_keywords`` is absent.
    """
    settings = default_config()
    if not config_path.exists():
        return settings

    try:
        with config_path.open("r", encoding="utf-8") as stream:
            stored = json.load(stream)
    except Exception as exc:
        print_warn(f"Unable to read config file {config_path}: {exc}")
        return settings

    if not isinstance(stored, dict):
        print_warn(f"Config file {config_path} is not a JSON object. Using defaults.")
        return settings

    saved_directory = stored.get("directory")
    if isinstance(saved_directory, str):
        trimmed = saved_directory.strip()
        if trimmed:
            settings["directory"] = trimmed

    list_fields = (
        ("keywords", stored.get("keywords")),
        ("excluded_keywords", stored.get("excluded_keywords", stored.get("not_keywords"))),
    )
    for field_name, raw_value in list_fields:
        cleaned = normalize_string_list(raw_value)
        if cleaned:
            settings[field_name] = cleaned

    return settings


def save_persistent_config(
    config_path: Path,
    directory: Path,
    keywords: list[str],
    excluded_keywords: list[str],
) -> None:
    """Write the scan settings to *config_path* as UTF-8 JSON.

    Missing parent folders are created; both keyword lists are stored
    without case-insensitive duplicates.
    """
    document = dict(
        directory=str(directory),
        keywords=dedupe_casefold(keywords),
        excluded_keywords=dedupe_casefold(excluded_keywords),
    )
    target_folder = config_path.parent
    target_folder.mkdir(parents=True, exist_ok=True)
    with config_path.open("w", encoding="utf-8") as stream:
        json.dump(document, stream, ensure_ascii=False, indent=2)


def diff_lists(old_values: list[str], new_values: list[str]) -> tuple[list[str], list[str]]:
    """Compare two string lists case-insensitively.

    Returns ``(added, removed)``, each sorted case-insensitively. When a
    list holds several spellings of one entry, the last spelling is shown.
    """
    before = {value.casefold(): value for value in old_values}
    after = {value.casefold(): value for value in new_values}

    added = sorted(
        (spelling for folded, spelling in after.items() if folded not in before),
        key=str.casefold,
    )
    removed = sorted(
        (spelling for folded, spelling in before.items() if folded not in after),
        key=str.casefold,
    )
    return added, removed


def show_config_changes(
    previous_directory: Path,
    previous_keywords: list[str],
    previous_excluded_keywords: list[str],
    new_directory: Path,
    new_keywords: list[str],
    new_excluded_keywords: list[str],
) -> bool:
    """Print what differs between the saved and the new settings.

    Returns True when the directory or either list changed, False otherwise.
    """
    directory_changed = previous_directory != new_directory
    if directory_changed:
        print_ok("Saved directory update:")
        print_ok(f"  Old: {previous_directory}")
        print_match(f"  New: {new_directory}")

    added_keywords, removed_keywords = diff_lists(previous_keywords, new_keywords)
    keywords_changed = bool(added_keywords or removed_keywords)
    if keywords_changed:
        print_ok("Saved keyword updates:")
        if added_keywords:
            print_match(f"  Added keywords: {added_keywords}")
        if removed_keywords:
            print_warn(f"  Removed keywords: {removed_keywords}")

    added_excluded, removed_excluded = diff_lists(
        previous_excluded_keywords,
        new_excluded_keywords,
    )
    excluded_changed = bool(added_excluded or removed_excluded)
    if excluded_changed:
        print_ok("Saved excluded-keyword updates:")
        if added_excluded:
            print_match(f"  Added excluded keywords: {added_excluded}")
        if removed_excluded:
            print_warn(f"  Removed excluded keywords: {removed_excluded}")

    if directory_changed or keywords_changed or excluded_changed:
        return True

    print_info("Persistent scan settings unchanged.")
    return False


def prompt_list_edits(name: str, values: list[str], assume_yes: bool) -> list[str]:
    """Let the user add entries to, then remove entries from, a keyword list.

    Returns a new de-duplicated list; *values* is not modified. With
    *assume_yes* nothing is asked. If input becomes unavailable, the edits
    made so far are returned.
    """
    working = dedupe_casefold(list(values))
    if assume_yes:
        return working

    print_info(f"Current {name}: {working}")

    additions = safe_input(f"Add {name} (comma separated, Enter to skip): ")
    if additions is None:
        return working
    additions = additions.strip()
    if additions:
        working = dedupe_casefold(working + parse_csv(additions))

    removals = safe_input(f"Remove {name} (comma separated, Enter to skip): ")
    if removals is None:
        return working
    removals = removals.strip()
    if not removals:
        return working

    # Removal is case-insensitive, like the de-duplication above.
    unwanted = {entry.casefold() for entry in parse_csv(removals)}
    return [entry for entry in working if entry.casefold() not in unwanted]


def show_saved_config_summary(
    directory: Path,
    keywords: list[str],
    excluded_keywords: list[str],
) -> None:
    """Print the current scan directory and both keyword lists."""
    print_info("Current saved scan settings:")
    summary_lines = (
        f"  Directory: {directory}",
        f"  Keywords ({len(keywords)}): {keywords}",
        f"  Excluded keywords ({len(excluded_keywords)}): {excluded_keywords}",
    )
    for line in summary_lines:
        print_match(line)


def show_list_edit_summary(label: str, old_values: list[str], new_values: list[str]) -> None:
    """Print which entries were added to or removed from the list *label*."""
    added, removed = diff_lists(old_values, new_values)

    if added or removed:
        print_ok(f"Updated {label}:")
        if added:
            print_match(f"  Added: {added}")
        if removed:
            print_warn(f"  Removed: {removed}")
    else:
        print_info(f"No changes made to {label}.")


def edit_directory_interactively(directory: Path) -> Path:
    """Prompt for a new scan directory and report whether it changed.

    Returns the original *directory* object when the answer resolves to
    the same path.
    """
    answer = prompt_text("Directory to scan", str(directory), False)
    candidate = Path(answer).expanduser()

    if candidate != directory:
        print_ok("Updated scan directory:")
        print_warn(f"  Old: {directory}")
        print_match(f"  New: {candidate}")
        return candidate

    print_info("Directory unchanged.")
    return directory


def run_interactive_setup_menu(
    directory: Path,
    keywords: list[str],
    excluded_keywords: list[str],
    assume_yes: bool,
) -> tuple[Path, list[str], list[str]]:
    """Run the setup menu until the user starts the scan.

    Returns the (possibly edited) directory, keywords and excluded
    keywords. The menu is skipped with *assume_yes*, and it ends early
    when interactive input is unavailable.
    """
    if assume_yes:
        return directory, keywords, excluded_keywords

    menu_entries = (
        "  1. View saved settings",
        "  2. Edit scan directory",
        "  3. Edit keywords",
        "  4. Edit excluded keywords",
        "  5. Start scan",
    )

    while True:
        print()
        print_info("Setup menu:")
        for entry in menu_entries:
            print_match(entry)

        raw_choice = safe_input("Choose an option [1-5]: ")
        if raw_choice is None:
            print_warn("Interactive input unavailable. Starting scan with saved settings.")
            break

        selection = raw_choice.strip()
        # A bare Enter is a shortcut for "Start scan".
        if selection in ("5", ""):
            break

        if selection == "1":
            show_saved_config_summary(directory, keywords, excluded_keywords)
        elif selection == "2":
            directory = edit_directory_interactively(directory)
        elif selection == "3":
            keywords_before = list(keywords)
            keywords = prompt_list_edits("keywords", keywords, False)
            show_list_edit_summary("keywords", keywords_before, keywords)
        elif selection == "4":
            excluded_before = list(excluded_keywords)
            excluded_keywords = prompt_list_edits("exclude keywords", excluded_keywords, False)
            show_list_edit_summary("excluded keywords", excluded_before, excluded_keywords)
        else:
            print_warn("Please choose a number from 1 to 5.")

    return directory, keywords, excluded_keywords


def collect_runtime_config(
    args: argparse.Namespace,
    config_path: Path,
) -> tuple[Path, list[str], list[str]]:
    """Combine saved settings, CLI overrides and menu edits into the run settings.

    The settings file is written when anything differs from what was saved
    or when no file existed yet. Returns the directory, keywords and
    excluded keywords to scan with.
    """
    config_existed = config_path.exists()
    saved = load_persistent_config(config_path)

    saved_directory = Path(str(saved["directory"])).expanduser()
    saved_keywords = dedupe_casefold(list(saved["keywords"]))
    saved_excluded = dedupe_casefold(list(saved["excluded_keywords"]))

    # A --directory given on the command line overrides the saved one.
    if args.directory:
        starting_directory = Path(args.directory).expanduser()
    else:
        starting_directory = Path(str(saved_directory)).expanduser()

    directory, keywords, excluded = run_interactive_setup_menu(
        starting_directory,
        list(saved_keywords),
        list(saved_excluded),
        args.assume_yes,
    )

    changed = show_config_changes(
        previous_directory=saved_directory,
        previous_keywords=saved_keywords,
        previous_excluded_keywords=saved_excluded,
        new_directory=directory,
        new_keywords=keywords,
        new_excluded_keywords=excluded,
    )

    if config_existed and not changed:
        print_info(f"Using persistent settings from: {config_path}")
        return directory, keywords, excluded

    save_persistent_config(config_path, directory, keywords, excluded)
    if config_existed:
        print_ok(f"Persistent settings saved to: {config_path}")
    else:
        print_ok(f"Persistent settings file created: {config_path}")

    return directory, keywords, excluded


def build_result_path(results_dir: Path) -> Path:
    """Return the first unused ``<APP_NAME>-results-v<N>.json`` path in *results_dir*.

    The folder is created when missing; numbering starts at 1.
    """
    results_dir.mkdir(parents=True, exist_ok=True)

    def report_path(number: int) -> Path:
        return results_dir / f"{APP_NAME}-results-v{number}.json"

    version = 1
    candidate = report_path(version)
    while candidate.exists():
        version += 1
        candidate = report_path(version)
    return candidate


def serialize_session(session: RunSession) -> dict[str, Any]:
    """Convert *session* into the JSON-ready report structure.

    The report has a "summary" of the counters and a "results" list with
    one plain dict per file result.
    """
    summary = {
        "folders_processed": session.folders_processed,
        "files_processed": session.files_processed,
        "matched_files": session.matches_found,
    }
    per_file = list(map(asdict, session.results))
    return {"summary": summary, "results": per_file}


def save_results(session: Optional[RunSession]) -> None:
    """Write the session report to its result file as UTF-8 JSON.

    Nothing is written when there is no session or it has no results.
    """
    if session is None:
        return
    if not session.results:
        return

    report_file = session.result_file
    report_file.parent.mkdir(parents=True, exist_ok=True)
    with report_file.open("w", encoding="utf-8") as stream:
        json.dump(serialize_session(session), stream, ensure_ascii=False, indent=2)


def handle_interrupt(sig: int, frame: Any) -> None:
    """Signal handler: save the current session's results, then exit with 130.

    The signal number and stack frame are required by the handler
    signature but are not used.
    """
    del sig, frame

    print()
    print_error("Script interrupted. Saving results before exit.")
    save_results(CURRENT_SESSION)

    exit_code = 130
    raise SystemExit(exit_code)


def should_skip_key(key: str) -> bool:
    """Return True for tag keys that must never be scanned.

    A key is skipped when it contains any skip pattern (case-insensitive
    substring test), equals "tcon", or contains "com.apple.itunes".
    """
    folded = key.casefold()
    return (
        any(fragment in folded for fragment in SKIP_KEY_PATTERNS)
        or any(fragment in folded for fragment in TEXTUAL_SKIP_KEY_PATTERNS)
        or folded == "tcon"
        or "com.apple.itunes" in folded
    )


def value_to_text(value: Any) -> str:
    """Render a tag value as searchable text; binary data becomes "".

    Sequences, and objects exposing a list/tuple ``text`` attribute, are
    rendered item by item and joined with " | ". Anything else goes
    through ``str()``.
    """

    def join_rendered(items: Any) -> str:
        rendered = [value_to_text(item) for item in items]
        return " | ".join(piece for piece in rendered if piece)

    if value is None or isinstance(value, (bytes, bytearray, memoryview)):
        return ""

    if isinstance(value, (list, tuple, set)):
        return join_rendered(value)

    inner_text = getattr(value, "text", None)
    if isinstance(inner_text, (list, tuple)):
        joined = join_rendered(inner_text)
        # An empty result falls through to str(value) below.
        if joined:
            return joined

    try:
        rendered_value = str(value)
    except Exception:
        return ""

    # A str() result that looks like a bytes repr is treated as binary.
    if rendered_value.startswith(("b'", 'b"')):
        return ""

    return rendered_value.strip()


def add_container(
    containers: list[OpenedContainer],
    errors: list[str],
    name: str,
    opener: Any,
    ignore_exceptions: tuple[type[BaseException], ...] = (),
) -> None:
    """Open one tag container and record either the handle or the failure.

    *opener* is called without arguments. Exceptions listed in
    *ignore_exceptions* leave both lists untouched; any other exception is
    appended to *errors* as "<name>: <message>".
    """
    try:
        opened = opener()
    except ignore_exceptions:
        pass
    except Exception as exc:
        errors.append(f"{name}: {exc}")
    else:
        containers.append(OpenedContainer(name=name, handle=opened))


def is_probably_malformed_apev2_error(exc: Exception) -> bool:
    """Guess from the error text whether an APEv2 tag is present but corrupt."""
    telltales = (
        "codec can't decode",
        "not a valid apev2 key",
        "invalid start byte",
        "ordinal not in range",
        "utf-8",
        "ascii",
    )
    description = str(exc).casefold()
    for telltale in telltales:
        if telltale in description:
            return True
    return False


def is_probably_malformed_mp4_error(exc: Exception) -> bool:
    """Guess from the error text whether MP4 metadata is present but corrupt."""
    telltales = (
        "unpack requires a buffer of 4 bytes",
        "unpack requires a buffer",
        "truncated data",
        "not enough data",
        "wrong offset inside",
        "metadata error",
        "stream info error",
        "invalid atom",
    )
    description = str(exc).casefold()
    for telltale in telltales:
        if telltale in description:
            return True
    return False


def add_apev2_container(
    containers: list[OpenedContainer],
    errors: list[str],
    file_path: Path,
    modules: dict[str, Any],
) -> None:
    """Open the file's APEv2 tag, or record why it could not be opened.

    A missing tag is not an error. A tag that looks corrupt is added as an
    unreadable container flagged for whole-block removal and is also
    reported in *errors*. Any other failure is only reported.
    """
    try:
        tag = modules["APEv2"](str(file_path))
    except modules["APENoHeaderError"]:
        # The file simply has no APEv2 tag.
        pass
    except Exception as exc:
        if not is_probably_malformed_apev2_error(exc):
            errors.append(f"APEv2: {exc}")
            return

        placeholder = OpenedContainer(
            name="APEv2",
            handle=None,
            readable=False,
            issue=(
                "Malformed or unreadable APEv2 tag detected. "
                "The whole APEv2 tag block can still be removed."
            ),
            delete_mode="raw_apev2",
        )
        containers.append(placeholder)
        errors.append(f"APEv2 malformed/unreadable: {exc}")
    else:
        containers.append(OpenedContainer(name="APEv2", handle=tag))


def add_mp4_container(
    containers: list[OpenedContainer],
    errors: list[str],
    file_path: Path,
    modules: dict[str, Any],
) -> None:
    """Open the file's MP4 metadata, or record why it could not be opened.

    Metadata that looks corrupt is added as an unreadable container
    flagged for whole-block removal and is also reported in *errors*. Any
    other failure is only reported.
    """
    try:
        tag = modules["MP4"](str(file_path))
    except Exception as exc:
        if not is_probably_malformed_mp4_error(exc):
            errors.append(f"MP4: {exc}")
            return

        placeholder = OpenedContainer(
            name="MP4",
            handle=None,
            readable=False,
            issue=(
                "Malformed or unreadable MP4/M4A metadata detected. "
                "The whole MP4 metadata block can still be removed."
            ),
            delete_mode="raw_mp4_tags",
        )
        containers.append(placeholder)
        errors.append(f"MP4 malformed/unreadable: {exc}")
    else:
        containers.append(OpenedContainer(name="MP4", handle=tag))


def open_containers(file_path: Path, modules: dict[str, Any]) -> tuple[list[OpenedContainer], list[str]]:
    """Open every tag container that may exist in *file_path*.

    The native container of the format is tried together with APEv2 and
    ID3, in a fixed order per file extension. Returns the opened
    containers and the error messages collected on the way. Unsupported
    extensions yield two empty lists.
    """
    containers: list[OpenedContainer] = []
    errors: list[str] = []

    def add_id3() -> None:
        add_container(
            containers,
            errors,
            "ID3",
            lambda: modules["ID3"](str(file_path)),
            ignore_exceptions=(modules["ID3NoHeaderError"],),
        )

    def add_apev2() -> None:
        add_apev2_container(containers, errors, file_path, modules)

    def add_mp4() -> None:
        add_mp4_container(containers, errors, file_path, modules)

    def add_flac() -> None:
        add_container(containers, errors, "FLAC", lambda: modules["FLAC"](str(file_path)))

    def add_asf() -> None:
        add_container(containers, errors, "ASF", lambda: modules["ASF"](str(file_path)))

    # Opening order per extension; the order decides the container order.
    opening_plan = {
        ".mp3": (add_id3, add_apev2),
        ".m4a": (add_mp4, add_apev2, add_id3),
        ".flac": (add_flac, add_apev2, add_id3),
        ".wma": (add_asf, add_apev2, add_id3),
    }

    suffix = file_path.suffix.casefold()
    for step in opening_plan.get(suffix, ()):
        step()

    return containers, errors


def iter_tag_items(container: OpenedContainer) -> list[tuple[str, Any]]:
    """Return the container's ``(key, value)`` pairs with keys as strings.

    Unreadable containers, and handles whose items cannot be listed, yield
    an empty list.
    """
    if not container.readable or container.handle is None:
        return []

    pairs: list[tuple[str, Any]] = []
    try:
        for key, value in container.handle.items():
            pairs.append((str(key), value))
    except Exception:
        # A partial listing is discarded as a whole.
        return []
    return pairs


def find_matches(
    containers: list[OpenedContainer],
    keywords: list[str],
    excluded_keywords: list[str],
) -> list[TagMatch]:
    """Find tag fields whose text contains a keyword and no excluded keyword.

    Comparison is case-insensitive. Skipped keys and values without text
    are ignored. Each (container name, key) pair is reported at most once.
    """
    wanted_terms = [(term, term.casefold()) for term in dedupe_casefold(keywords)]
    blocked_terms = [term.casefold() for term in dedupe_casefold(excluded_keywords)]

    found: list[TagMatch] = []
    reported: set[tuple[str, str]] = set()

    for container in containers:
        for key, raw_value in iter_tag_items(container):
            if should_skip_key(key):
                continue

            text = value_to_text(raw_value)
            if not text:
                continue

            haystack = text.casefold()
            # One excluded keyword protects the whole value.
            if any(term in haystack for term in blocked_terms):
                continue

            hits = [original for original, folded in wanted_terms if folded in haystack]
            identity = (container.name, key)
            if not hits or identity in reported:
                continue

            reported.add(identity)
            found.append(
                TagMatch(
                    container=container.name,
                    key=key,
                    value=text,
                    matched_keywords=hits,
                )
            )

    return found


def remove_keys_from_container(container: OpenedContainer, keys: set[str]) -> tuple[list[str], list[str]]:
    """Delete *keys* from the container's tag object and save the file.

    Keys are matched by their ``str()`` form first and case-insensitively
    as a fallback. Deletions are saved with a single ``save()`` call.

    Args:
        container: Container whose ``handle`` supports ``keys()``, item
            deletion and ``save()``.
        keys: Display keys to remove.

    Returns:
        ``(removed, errors)``. ``removed`` lists the requested keys that
        were deleted and saved. It is empty when saving fails, because
        nothing was changed on disk in that case. ``errors`` holds one
        message per problem.
    """
    removed: list[str] = []
    errors: list[str] = []

    try:
        existing_pairs = [(str(key), key) for key in list(container.handle.keys())]
    except Exception as exc:
        return removed, [f"{container.name}: unable to inspect keys ({exc})"]

    existing_map = {display_key: original_key for display_key, original_key in existing_pairs}

    # Sorted so that messages and results come out in a stable order.
    for key in sorted(keys):
        original_key = existing_map.get(key)
        if original_key is None:
            folded = key.casefold()
            original_key = next(
                (orig for display, orig in existing_pairs if display.casefold() == folded),
                None,
            )

        if original_key is None:
            errors.append(f"{container.name}: key not found during removal -> {key}")
            continue

        try:
            del container.handle[original_key]
        except Exception as exc:
            errors.append(f"{container.name}: failed to remove {key} ({exc})")
            continue

        removed.append(key)

    if not removed:
        return removed, errors

    try:
        container.handle.save()
    except Exception as exc:
        errors.append(f"{container.name}: failed to save changes ({exc})")
        # The deletions only happened in memory; reporting them as removed
        # would claim a cleanup that never reached the file.
        return [], errors

    return removed, errors


def remove_matches(
    containers: list[OpenedContainer],
    matches: list[TagMatch],
) -> tuple[list[str], list[str]]:
    """Remove every matched tag field, grouped by container.

    Returns ``(removed_targets, errors)`` where each removed target is
    formatted as "<container>:<key>".
    """
    keys_by_container: dict[str, set[str]] = {}
    for match in matches:
        bucket = keys_by_container.get(match.container)
        if bucket is None:
            bucket = keys_by_container[match.container] = set()
        bucket.add(match.key)

    # With duplicate names the last container in the list wins.
    container_lookup = {container.name: container for container in containers}

    removed_targets: list[str] = []
    errors: list[str] = []
    for container_name, keys in keys_by_container.items():
        target = container_lookup.get(container_name)
        if target is None:
            errors.append(f"{container_name}: container was not available during removal")
            continue

        removed_keys, container_errors = remove_keys_from_container(target, keys)
        for key in removed_keys:
            removed_targets.append(f"{container_name}:{key}")
        errors.extend(container_errors)

    return removed_targets, errors


def find_problem_containers(containers: list[OpenedContainer]) -> list[OpenedContainer]:
    """Return the containers whose ``readable`` is falsy and ``delete_mode`` is truthy.

    Input order is preserved in the returned list.
    """
    flagged = []
    for candidate in containers:
        if candidate.readable:
            continue
        if candidate.delete_mode:
            flagged.append(candidate)
    return flagged


def remove_problem_containers(
    file_path: Path,
    containers: list[OpenedContainer],
    modules: dict[str, Any],
) -> tuple[list[str], list[str]]:
    """Strip whole tag blocks for containers flagged with a raw delete mode.

    Two passes are made over *containers*, so all ``"raw_apev2"`` results
    precede all ``"raw_mp4_tags"`` results regardless of input order:

    * ``"raw_apev2"``: the file is opened ``rb+``, ``modules["APEv2Data"]``
      is called on the handle, and the span ``start..end`` it reports is
      passed to ``modules["APEDeleteBytes"]``.
    * ``"raw_mp4_tags"``: ``modules["MP4Tags"]().delete(...)`` is called with
      the file path as a string.

    Containers with any other ``delete_mode`` are ignored.

    Returns:
        ``(removed_targets, errors)``; a failure for one container is
        recorded as an error message and does not stop the others.
    """
    removed_targets: list[str] = []
    errors: list[str] = []

    # Pass 1: APEv2 blocks.
    for broken in containers:
        if broken.delete_mode == "raw_apev2":
            try:
                with file_path.open("rb+") as stream:
                    located = modules["APEv2Data"](stream)
                    if located.start is None or located.end is None:
                        raise ValueError("APEv2 block offsets could not be located")
                    modules["APEDeleteBytes"](
                        stream,
                        located.end - located.start,
                        located.start,
                    )
            except Exception as exc:
                errors.append(f"{broken.name}: failed to remove malformed tag block ({exc})")
            else:
                removed_targets.append(f"{broken.name}:<entire malformed tag block>")

    # Pass 2: MP4 metadata.
    for broken in containers:
        if broken.delete_mode == "raw_mp4_tags":
            try:
                modules["MP4Tags"]().delete(str(file_path))
            except Exception as exc:
                errors.append(f"{broken.name}: failed to remove malformed MP4 metadata ({exc})")
            else:
                removed_targets.append(f"{broken.name}:<entire malformed MP4 metadata block>")

    return removed_targets, errors


def iter_audio_files(directory: Path) -> list[Path]:
    """Walk *directory* recursively and collect files with a supported extension.

    A file is kept when its casefolded suffix is a member of
    ``SUPPORTED_EXTENSIONS``. Paths are returned in the order ``os.walk``
    yields them.
    """
    found: list[Path] = []
    for current_dir, _subdirs, names in os.walk(directory):
        base = Path(current_dir)
        candidates = (base / name for name in names)
        found.extend(
            candidate
            for candidate in candidates
            if candidate.suffix.casefold() in SUPPORTED_EXTENSIONS
        )
    return found


def should_remove_file(
    args: argparse.Namespace,
    file_result: FileScanResult,
    problem_containers: list[OpenedContainer],
) -> bool:
    """Decide whether tag data should be removed from the scanned file.

    ``args.scan_only`` always vetoes removal and is checked first;
    ``args.auto_remove`` approves without prompting. Otherwise the matched
    fields and problem containers are printed and the user is asked to
    confirm. Only ``y`` / ``yes`` (case-insensitive, surrounding whitespace
    ignored) approve. When ``safe_input`` returns ``None`` the removal is
    skipped.
    """
    if args.scan_only:
        return False
    if args.auto_remove:
        return True

    has_matches = bool(file_result.matches)
    if has_matches:
        print_warn(f"Matched fields in {file_result.file}:")
        for hit in file_result.matches:
            print_match(
                f"  [{hit.container}] {hit.key} -> {hit.value} | keywords={hit.matched_keywords}"
            )

    for broken in problem_containers:
        print_warn(f"  [{broken.name}] {broken.issue}")

    # Pick the wording that fits what is about to be removed.
    if problem_containers:
        if has_matches:
            prompt = "Remove the matched fields and malformed tag blocks from this file? [y/N]: "
        else:
            prompt = "Remove the malformed tag block(s) from this file? [y/N]: "
    else:
        prompt = "Remove the matched tag fields from this file? [y/N]: "

    reply = safe_input(prompt)
    if reply is None:
        print_warn("Removal prompt unavailable. Skipping removal for safety.")
        return False

    return reply.strip().lower() in ("y", "yes")


def process_directory(
    directory: Path,
    keywords: list[str],
    excluded_keywords: list[str],
    args: argparse.Namespace,
    modules: dict[str, Any],
    session: RunSession,
) -> None:
    """Scan the audio files under *directory* and optionally strip matching tags.

    For each file returned by ``iter_audio_files`` the containers are opened,
    matches and problem containers are collected, and, when
    ``should_remove_file`` approves, the matched keys and problem containers
    are removed. Counters on *session* are updated along the way. A file's
    result is appended to ``session.results`` (followed by ``save_results``)
    whenever it has matches or errors.

    Raises:
        FileNotFoundError: *directory* does not exist.
        NotADirectoryError: *directory* exists but is not a directory.
    """
    if not directory.exists():
        raise FileNotFoundError(f"Directory does not exist: {directory}")
    if not directory.is_dir():
        raise NotADirectoryError(f"Path is not a directory: {directory}")

    audio_files = iter_audio_files(directory)
    # Distinct parent folders that contain at least one supported file.
    session.folders_processed = len({str(item.parent) for item in audio_files})

    if not audio_files:
        print_warn("No supported audio files were found.")
        return

    for file_path in audio_files:
        session.files_processed += 1
        print_info(f"Processing: {file_path}")

        containers, open_errors = open_containers(file_path, modules)
        matches = find_matches(containers, keywords, excluded_keywords)
        problem_containers = find_problem_containers(containers)

        if not (matches or open_errors or problem_containers):
            print_info("  No matching tags found.")
            continue

        # Copy the error list so later extensions stay local to this result.
        file_result = FileScanResult(
            file=str(file_path),
            matches=matches,
            errors=open_errors[:],
        )

        if matches:
            session.matches_found += 1

        actionable = bool(matches or problem_containers)
        if actionable and should_remove_file(args, file_result, problem_containers):
            removed_targets: list[str] = []
            removal_errors: list[str] = []

            if matches:
                done, failed = remove_matches(containers, matches)
                removed_targets += done
                removal_errors += failed

            if problem_containers:
                done, failed = remove_problem_containers(
                    file_path,
                    problem_containers,
                    modules,
                )
                removed_targets += done
                removal_errors += failed

            file_result.removed_targets = removed_targets
            file_result.errors.extend(removal_errors)
            if removed_targets:
                print_ok(f"  Removed {len(removed_targets)} tag target(s).")
            else:
                print_warn("  No tag targets were removed.")
        elif actionable:
            file_result.skipped = True
            print_warn("  Removal skipped.")

        for message in file_result.errors:
            print_error(f"  {message}")

        if file_result.matches or file_result.errors:
            session.results.append(file_result)
            save_results(session)


def main() -> int:
    """Run the command-line workflow and return the process exit status.

    Returns:
        ``1`` when dependencies cannot be ensured, the mutagen modules fail
        to load, or the target directory is missing / not a directory;
        ``0`` otherwise.
    """
    global CURRENT_SESSION

    args = parse_args()
    script_dir = Path(__file__).resolve().parent
    config_path = get_config_path(args, script_dir)

    deps_ready = ensure_dependencies(
        script_dir=script_dir,
        auto_install=args.install_deps,
        assume_yes=args.assume_yes,
    )
    if not deps_ready:
        return 1

    setup_colors(disable_color=args.no_color)

    try:
        modules = load_mutagen_modules()
    except Exception as exc:
        print_error(f"Unable to load mutagen modules: {exc}")
        return 1

    directory, keywords, excluded_keywords = collect_runtime_config(args, config_path)
    results_root = Path(args.results_dir).expanduser()
    session = RunSession(result_file=build_result_path(results_root))

    # Publish the session at module level before installing the SIGINT handler.
    CURRENT_SESSION = session
    signal.signal(signal.SIGINT, handle_interrupt)

    print_info(f"Config file: {config_path}")
    print_info(f"Result log: {session.result_file}")
    print_info(f"Scanning directory: {directory}")

    try:
        process_directory(
            directory=directory,
            keywords=keywords,
            excluded_keywords=excluded_keywords,
            args=args,
            modules=modules,
            session=session,
        )
    except KeyboardInterrupt:
        handle_interrupt(0, None)
    except (FileNotFoundError, NotADirectoryError) as exc:
        print_error(str(exc))
        return 1

    save_results(session)

    print()
    print_ok(f"Folders processed: {session.folders_processed}")
    print_ok(f"Files processed: {session.files_processed}")
    print_ok(f"Files with matches: {session.matches_found}")
    if not session.results:
        print_info("No matches or errors were recorded, so no JSON file was created.")
    else:
        print_ok(f"JSON results saved to: {session.result_file}")

    return 0


# Script entry point: when executed directly (not imported), run main() and
# exit the interpreter with its return value as the process status.
if __name__ == "__main__":
    raise SystemExit(main())

About

MP3Tag Cleaner - Remove MP3Tag with Python

Topics

Resources

License

Stars

Watchers

Forks

Contributors

Languages