import os import logging import struct import time import collections import shutil import lmdb from pathlib import Path import PIL.Image from PySide6.QtCore import ( QObject, QThread, Signal, QMutex, QSemaphore, QReadWriteLock, QMutexLocker, QReadLocker, QWriteLocker, QRunnable ) import imagehash # For perceptual hashing from constants import ( DUPLICATE_CACHE_PATH, DUPLICATE_HASH_DB_NAME, DUPLICATE_EXCEPTIONS_DB_NAME, DUPLICATE_PENDING_DB_NAME, MAX_DHASH_DISTANCE, UITexts ) logger = logging.getLogger(__name__) # Result structure for duplicate detection DuplicateResult = collections.namedtuple('DuplicateResult', ['path1', 'path2', 'hash_value', 'is_exception', 'similarity', 'timestamp']) class BKTree: """A Burkhard-Keller tree for efficient similarity searching using Hamming distance.""" def __init__(self, distance_func): self.distance_func = distance_func self.tree = None def add(self, item): if self.tree is None: self.tree = (item, {}) return node = self.tree while True: val, children = node dist = self.distance_func(item, val) if dist == 0: return if dist in children: node = children[dist] else: children[dist] = (item, {}) break def query(self, item, max_dist): if self.tree is None: return [] results = [] candidates = [self.tree] while candidates: val, children = candidates.pop() dist = self.distance_func(item, val) if dist <= max_dist: results.append((val, dist)) for d in range(max(0, dist - max_dist), dist + max_dist + 1): if d in children: candidates.append(children[d]) return results class HashWorker(QRunnable): """Worker to calculate image hash in a thread pool.""" def __init__(self, path, detector, result_dict, mutex, semaphore): super().__init__() self.path = path self.detector = detector self.result_dict = result_dict self.mutex = mutex self.semaphore = semaphore def run(self): if self.detector._is_running: try: # imagehash requires a PIL/Pillow image object. with PIL.Image.open(self.path) as pil_img: # Using dHash from imagehash library as default h = str(imagehash.dhash(pil_img)) with QMutexLocker(self.mutex): self.result_dict[self.path] = h except Exception as e: logger.warning(f"HashWorker failed for {self.path}: {e}") self.semaphore.release() class DuplicateCache(QObject): """ Manages a persistent LMDB cache for perceptual hashes and duplicate relationships. Uses (device_id, inode) as primary keys for robustness against file renames/moves. """ def __init__(self): super().__init__() self._lmdb_env = None self._hash_db = None self._exceptions_db = None self._pending_db = None self._db_lock = QMutex() # Protects LMDB transactions # In-memory cache for hashes: (dev, inode) -> (hash_value, path) self._hash_cache = {} self._hash_cache_lock = QReadWriteLock() self.lmdb_open() def lmdb_open(self): cache_dir = Path(DUPLICATE_CACHE_PATH) cache_dir.mkdir(parents=True, exist_ok=True) try: self._lmdb_env = lmdb.open( DUPLICATE_CACHE_PATH, map_size=10 * 1024 * 1024 * 1024, # 10GB default max_dbs=3, # For hashes, exceptions and pending readonly=False, create=True ) self._hash_db = self._lmdb_env.open_db(DUPLICATE_HASH_DB_NAME) self._exceptions_db = self._lmdb_env.open_db(DUPLICATE_EXCEPTIONS_DB_NAME) self._pending_db = self._lmdb_env.open_db(DUPLICATE_PENDING_DB_NAME) logger.info(f"Duplicate LMDB cache opened: {DUPLICATE_CACHE_PATH}") except Exception as e: logger.error(f"Failed to open duplicate LMDB cache: {e}") self._lmdb_env = None def lmdb_close(self): if self._lmdb_env: self._lmdb_env.close() self._lmdb_env = None self._hash_db = None self._exceptions_db = None self._pending_db = None def get_hash_stats(self): """Returns (count, size_bytes) for the hash database.""" count = 0 if not self._lmdb_env: return 0, 0 with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=False) as txn: count = txn.stat(db=self._hash_db)['entries'] size = 0 disk_path = os.path.join(DUPLICATE_CACHE_PATH, "data.mdb") if os.path.exists(disk_path): size = os.path.getsize(disk_path) return count, size def clear_hashes(self): """Clears all hashes from the database by recreating the environment.""" with QWriteLocker(self._hash_cache_lock): self._hash_cache.clear() self.lmdb_close() try: if os.path.exists(DUPLICATE_CACHE_PATH): shutil.rmtree(DUPLICATE_CACHE_PATH) self.lmdb_open() logger.info("Duplicate hash cache cleared.") except Exception as e: logger.error(f"Error clearing duplicate LMDB: {e}") def __del__(self): self.lmdb_close() @staticmethod def _get_inode_info(path): try: stat_info = os.stat(path) return stat_info.st_dev, struct.pack('Q', stat_info.st_ino) except OSError: return 0, None def _get_lmdb_key(self, dev_id, inode_key_bytes): return f"{dev_id}-{inode_key_bytes.hex()}".encode('utf-8') def get_hash_and_path(self, dev_id, inode_key_bytes): """Retrieves hash, mtime and path for a given (dev_id, inode_key_bytes).""" # Check in-memory cache first with QReadLocker(self._hash_cache_lock): cached_data = self._hash_cache.get((dev_id, inode_key_bytes)) if cached_data: return cached_data # (hash_value, mtime, path) # Check LMDB if not self._lmdb_env: return None, 0, None with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=False) as txn: lmdb_key = self._get_lmdb_key(dev_id, inode_key_bytes) value_bytes = txn.get(lmdb_key, db=self._hash_db) if value_bytes: # Handle format "hash_value_str|mtime|path_str" or old "hash|path" parts = value_bytes.decode('utf-8').split('|', 2) if len(parts) == 3: hash_str, mtime_str, path_str = parts mtime = float(mtime_str) elif len(parts) == 2: hash_str, path_str = parts mtime = 0.0 # Force re-hash else: return None, 0, None with QWriteLocker(self._hash_cache_lock): self._hash_cache[(dev_id, inode_key_bytes)] = (hash_str, mtime, path_str) return hash_str, mtime, path_str return None, 0, None def get_hash_for_path(self, path, current_mtime, dev_id=None, inode_key_bytes=None): if dev_id is None or inode_key_bytes is None: dev_id, inode_key_bytes = self._get_inode_info(path) if not inode_key_bytes: return None hash_value, cached_mtime, _ = self.get_hash_and_path(dev_id, inode_key_bytes) # Return hash only if mtime matches (with small float tolerance) if hash_value and abs(cached_mtime - current_mtime) < 0.001: return hash_value return None def add_hash_for_path(self, path, hash_value, mtime, dev_id=None, inode_key_bytes=None): if dev_id is None or inode_key_bytes is None: dev_id, inode_key_bytes = self._get_inode_info(path) if not inode_key_bytes or not self._lmdb_env: return False value_str = f"{hash_value}|{mtime}|{path}" with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=True) as txn: lmdb_key = self._get_lmdb_key(dev_id, inode_key_bytes) txn.put(lmdb_key, value_str.encode('utf-8'), db=self._hash_db) with QWriteLocker(self._hash_cache_lock): self._hash_cache[(dev_id, inode_key_bytes)] = (hash_value, mtime, path) return True def remove_hash_for_path(self, path, clear_relationships=True): """ Removes the hash entry for a path. Args: path: File path. clear_relationships: If True, also wipes all entries in pending and exceptions DBs involving this file. """ dev_id, inode_key_bytes = self._get_inode_info(path) if not inode_key_bytes or not self._lmdb_env: return False with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=True) as txn: lmdb_key = self._get_lmdb_key(dev_id, inode_key_bytes) txn.delete(lmdb_key, db=self._hash_db) with QWriteLocker(self._hash_cache_lock): self._hash_cache.pop((dev_id, inode_key_bytes), None) # Also remove any exceptions involving this path if clear_relationships: self._remove_pair_entries_for_path(dev_id, inode_key_bytes, self._exceptions_db) self._remove_pair_entries_for_path(dev_id, inode_key_bytes, self._pending_db) return True def _get_pair_lmdb_key_from_ids(self, dev1, inode1, dev2, inode2): # Ensure canonical order for exception keys key_parts = sorted([f"{dev1}-{inode1.hex()}", f"{dev2}-{inode2.hex()}"]) return f"{key_parts[0]}-{key_parts[1]}".encode('utf-8') def _get_pair_lmdb_key(self, path1, path2): dev1, inode1 = self._get_inode_info(path1) dev2, inode2 = self._get_inode_info(path2) if not inode1 or not inode2: return None return self._get_pair_lmdb_key_from_ids(dev1, inode1, dev2, inode2) def mark_as_exception(self, path1, path2, is_exception=True, similarity=None, timestamp=None): if not self._lmdb_env: return False dev1, inode1 = self._get_inode_info(path1) dev2, inode2 = self._get_inode_info(path2) if not inode1 or not inode2: return False exception_key = self._get_pair_lmdb_key_from_ids(dev1, inode1, dev2, inode2) if not exception_key: return False # Store paths in value to make exception recovery independent of hash DB ts = timestamp if timestamp is not None else int(time.time()) val_str = f"{path1}|{path2}|{similarity if similarity is not None else ''}|{ts}" value = val_str.encode('utf-8') with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=True) as txn: if is_exception: txn.put(exception_key, value, db=self._exceptions_db) else: txn.delete(exception_key, db=self._exceptions_db) return True def is_exception(self, path1, path2): if not self._lmdb_env: return False dev1, inode1 = self._get_inode_info(path1) dev2, inode2 = self._get_inode_info(path2) if not inode1 or not inode2: return False exception_key = self._get_pair_lmdb_key_from_ids(dev1, inode1, dev2, inode2) if not exception_key: return False with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=False) as txn: return txn.get(exception_key, db=self._exceptions_db) is not None def _remove_pair_entries_for_path(self, target_dev, target_inode, db_handle, txn=None): """Removes all entries involving a specific (dev, inode) pair from a pair-based DB.""" if not self._lmdb_env: return target_inode_hex = target_inode.hex() def do_remove(t): cursor = t.cursor(db=db_handle) keys_to_delete = [] for key_bytes, _ in cursor: key_str = key_bytes.decode('utf-8') parts = key_str.split('-') if len(parts) < 4: continue dev1, inode1_hex, dev2, inode2_hex = int(parts[0]), parts[1], int(parts[2]), parts[3] if (dev1 == target_dev and inode1_hex == target_inode_hex) or \ (dev2 == target_dev and inode2_hex == target_inode_hex): keys_to_delete.append(key_bytes) for key in keys_to_delete: t.delete(key, db=db_handle) if txn: do_remove(txn) else: with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=True) as t: do_remove(t) def mark_as_pending(self, path1, path2, is_pending=True, similarity=None, timestamp=None): """Marks a pair as pending review.""" if not self._lmdb_env or self._pending_db is None: return False key = self._get_pair_lmdb_key(path1, path2) if not key: return False # Store paths in value to allow reconstruction without scanning ts = timestamp if timestamp is not None else int(time.time()) val_str = f"{path1}|{path2}|{similarity if similarity is not None else ''}|{ts}" value = val_str.encode('utf-8') with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=True) as txn: if is_pending: txn.put(key, value, db=self._pending_db) else: # Check if it exists before deleting to avoid errors if txn.get(key, db=self._pending_db): txn.delete(key, db=self._pending_db) return True def get_all_pending_duplicates(self): """Retrieves all pending duplicate pairs from the database.""" results = [] if not self._lmdb_env or self._pending_db is None: return results keys_to_delete = [] with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=False) as txn: cursor = txn.cursor(db=self._pending_db) for key, value_bytes in cursor: try: parts = value_bytes.decode('utf-8').split('|') p1, p2 = parts[0], parts[1] sim = int(parts[2]) if len(parts) > 2 and parts[2] else None ts = int(parts[3]) if len(parts) > 3 else 0 if os.path.exists(p1) and os.path.exists(p2): results.append(DuplicateResult(p1, p2, None, False, sim, ts)) else: keys_to_delete.append(key) except Exception: keys_to_delete.append(key) continue if keys_to_delete: try: with self._lmdb_env.begin(write=True) as txn: for k in keys_to_delete: txn.delete(k, db=self._pending_db) logger.info(f"Cleaned up {len(keys_to_delete)} invalid pending duplicates (files deleted externally)") except Exception as e: logger.error(f"Error cleaning up pending duplicates from DB: {e}") return results def get_all_exceptions(self): """Retrieves all duplicate pairs marked as exceptions from the database.""" results = [] if not self._lmdb_env or self._exceptions_db is None: return results with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=False) as txn: cursor = txn.cursor(db=self._exceptions_db) for key_bytes, value_bytes in cursor: try: p1, p2 = None, None sim = None ts = 0 val_str = value_bytes.decode('utf-8') if '|' in val_str: # New format: paths are stored in the value parts = val_str.split('|') if len(parts) >= 2: p1, p2 = parts[0], parts[1] if len(parts) > 2 and parts[2]: sim = int(parts[2]) if len(parts) > 3: ts = int(parts[3]) else: ts = int(os.path.getmtime(p1)) if os.path.exists(p1) else 0 if not p1 or not p2: # Legacy format fallback: lookup paths in hash db key_str = key_bytes.decode('utf-8') kp = key_str.split('-') if len(kp) == 4: k1, k2 = f"{kp[0]}-{kp[1]}".encode(), f"{kp[2]}-{kp[3]}".encode() v1, v2 = txn.get(k1, db=self._hash_db), txn.get(k2, db=self._hash_db) if v1 and v2: # Format is hash|mtime|path|dist... path is always index 2 p1 = v1.decode('utf-8').split('|')[2] p2 = v2.decode('utf-8').split('|')[2] if p1 and p2: if os.path.exists(p1) and os.path.exists(p2): results.append(DuplicateResult(p1, p2, None, True, sim, ts)) except Exception: continue return results def clean_stale_hashes(self): """ Removes hash entries from the database for files that no longer exist on disk. """ if not self._lmdb_env or self._hash_db is None: return 0 keys_to_delete = [] with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=False) as txn: cursor = txn.cursor(db=self._hash_db) for key, value_bytes in cursor: try: # value_bytes is "hash|mtime|path|last_dist" parts = value_bytes.decode('utf-8').split('|') if len(parts) >= 3: path = parts[2] if not os.path.exists(path): keys_to_delete.append(key) except Exception: keys_to_delete.append(key) # Corrupted entry continue if keys_to_delete: with self._lmdb_env.begin(write=True) as txn: for k in keys_to_delete: txn.delete(k, db=self._hash_db) logger.info(f"Cleaned up {len(keys_to_delete)} stale hash entries (files deleted externally)") return len(keys_to_delete) def get_all_hashes_with_paths(self): """Retrieves all hashes from the database along with their associated paths and inode info.""" # hash_value -> [(path, dev_id, inode_key_bytes)] all_hashes = collections.defaultdict(list) if not self._lmdb_env: return all_hashes with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=False) as txn: cursor = txn.cursor(db=self._hash_db) for key_bytes, value_bytes in cursor: # key_bytes is like "dev_id-inode_hex" key_str = key_bytes.decode('utf-8') parts = key_str.split('-') dev_id = int(parts[0]) inode_key_bytes = bytes.fromhex(parts[1]) # value_bytes is "hash|mtime|path|last_dist" parts_val = value_bytes.decode('utf-8').split('|') if len(parts_val) >= 3: hash_value = parts_val[0] path = parts_val[2] else: continue all_hashes[hash_value].append((path, dev_id, inode_key_bytes)) return all_hashes def rename_entry(self, old_path, new_path): """ Updates the cache entry for a file that has been renamed or moved. This involves deleting the old (dev, inode) entry and adding a new one with the new (dev, inode) and path, preserving the hash value. """ old_dev, old_inode_key_bytes = self._get_inode_info(old_path) new_dev, new_inode_key_bytes = self._get_inode_info(new_path) if not old_inode_key_bytes or not new_inode_key_bytes or not self._lmdb_env: return False # If the (dev, inode) pair is the same, only the path in the value needs updating. # This happens if the file is renamed within the same filesystem. if (old_dev, old_inode_key_bytes) == (new_dev, new_inode_key_bytes): hash_value, mtime, _ = self.get_hash_and_path(old_dev, old_inode_key_bytes) if hash_value: self.add_hash_for_path(new_path, hash_value, mtime) self._update_pair_paths(old_path, new_path, self._pending_db) return True return False # If (dev, inode) changed (cross-filesystem move), we need to: # 1. Get the hash from the old entry. # 2. Remove the old entry. # 3. Add a new entry with the new (dev, inode) and path, using the old hash. hash_value, mtime, _ = self.get_hash_and_path(old_dev, old_inode_key_bytes) if hash_value: self.remove_hash_for_path(old_path) # This removes the old (dev, inode) entry self.add_hash_for_path(new_path, hash_value, mtime) # Adds new (dev, inode) entry self._update_pair_paths(old_path, new_path, self._pending_db) return True return False def _update_pair_paths(self, old_path, new_path, db_handle): """Updates stored paths in a pair-based DB value when a file is renamed.""" if not self._lmdb_env or db_handle is None: return with QMutexLocker(self._db_lock): with self._lmdb_env.begin(write=True) as txn: cursor = txn.cursor(db=db_handle) for key, value_bytes in cursor: val_str = value_bytes.decode('utf-8') if old_path in val_str: p1, p2 = val_str.split('|') np1 = new_path if p1 == old_path else p1 np2 = new_path if p2 == old_path else p2 txn.put(key, f"{np1}|{np2}".encode('utf-8'), db=db_handle) class DuplicateDetector(QThread): """ Worker thread for detecting duplicate images using perceptual hashing. """ progress_update = Signal(int, int, str) # current, total, message duplicates_found = Signal(list) # List of DuplicateResult detection_finished = Signal() def __init__(self, paths_to_scan, duplicate_cache, pool_manager, method="histogram_hashing", threshold=90, force_full=False): super().__init__() self.paths_to_scan = paths_to_scan self.duplicate_cache = duplicate_cache self.pool_manager = pool_manager self.method = method self.threshold = threshold # Similarity percentage (50-100) self.force_full = force_full self._is_running = True def stop(self): self._is_running = False self.wait() # Add this line def run(self): total_files = len(self.paths_to_scan) found_duplicates = [] unique_duplicate_pairs = set() # To store frozenset((path1, path2)) for uniqueness last_update_time = 0 pool = self.pool_manager.get_pool() # 1. Load existing pending duplicates from cache to avoid recalculation (unless force_full) if not self.force_full: pending = self.duplicate_cache.get_all_pending_duplicates() for p in pending: if p.path1 in self.paths_to_scan and p.path2 in self.paths_to_scan: if p.similarity is None or p.similarity >= self.threshold: found_duplicates.append(p) unique_duplicate_pairs.add(frozenset((p.path1, p.path2))) # Convert similarity threshold (percentage) to Hamming distance distance_threshold = int(MAX_DHASH_DISTANCE * (100 - self.threshold) / 100) logger.info(f"Duplicate detection: Method={self.method}, Similarity Threshold={self.threshold}%, Hamming Distance Threshold={distance_threshold}") # 2. Phase 1: Hash Collection (Parallelized) path_to_hash = {} dirty_hashes_objs = set() dirty_paths = set() paths_to_hash_parallel = [] for path in self.paths_to_scan: try: stat_info = os.stat(path) mtime = stat_info.st_mtime dev, inode = stat_info.st_dev, struct.pack('Q', stat_info.st_ino) cached_h = \ self.duplicate_cache.get_hash_for_path(path, mtime, dev, inode) if cached_h: path_to_hash[path] = (cached_h, dev, inode) else: dirty_paths.add(path) paths_to_hash_parallel.append((path, mtime, dev, inode)) except OSError: continue # Phase 1 starts with files already found in cache or skipped processed_hashing = total_files - len(paths_to_hash_parallel) if paths_to_hash_parallel and self._is_running: batch_size = pool.maxThreadCount() * 2 results_mutex = QMutex() new_hashes = {} sem = QSemaphore(0) for i in range(0, len(paths_to_hash_parallel), batch_size): if not self._is_running: break current_batch = paths_to_hash_parallel[i : i + batch_size] for p_data in current_batch: pool.start(HashWorker(p_data[0], self, new_hashes, results_mutex, sem)) for _ in range(len(current_batch)): while not sem.tryAcquire(1, 100): if not self._is_running: break if not self._is_running: break processed_hashing += 1 if time.perf_counter() - last_update_time > 0.05: self.progress_update.emit(processed_hashing, total_files * 2, UITexts.DUPLICATE_MSG_HASHING.format(filename="...")) last_update_time = time.perf_counter() for p, mtime, dev, inode in paths_to_hash_parallel: h = new_hashes.get(p) if h: path_to_hash[p] = (h, dev, inode) dirty_hashes_objs.add(imagehash.hex_to_hash(h)) self.duplicate_cache.add_hash_for_path(p, h, mtime, dev, inode) if not self._is_running: self.detection_finished.emit() return # Signal phase transition to exactly 50% self.progress_update.emit(total_files, total_files * 2, UITexts.DUPLICATE_MSG_ANALYZING.format(filename="...")) # 3. Phase 2: Comparison (Optimized with BK-Tree) hash_map = collections.defaultdict(list) bk_tree = BKTree(lambda a, b: a - b) for p, (h_str, dev, inode) in path_to_hash.items(): h_obj = imagehash.hex_to_hash(h_str) if h_obj not in hash_map: bk_tree.add(h_obj) hash_map[h_obj].append((p, dev, inode)) if self.force_full or p in dirty_paths: dirty_hashes_objs.add(h_obj) # Optimization: Only query the tree for hashes associated with new or modified files. # This finds pairs (Dirty, Clean) and (Dirty, Dirty). (Clean, Clean) were handled in previous runs. hashes_to_query = list(dirty_hashes_objs) if not self.force_full else list(hash_map.keys()) total_queries = len(hashes_to_query) for i, h1 in enumerate(hashes_to_query): if not self._is_running: break items1 = hash_map[h1] if time.perf_counter() - last_update_time > 0.1: # Scale Phase 2 progress to the 50%-100% range phase2_progress = int(((i + 1) / total_queries) * total_files) if total_queries > 0 else total_files self.progress_update.emit(total_files + phase2_progress, total_files * 2, UITexts.DUPLICATE_MSG_ANALYZING.format(filename="...")) last_update_time = time.perf_counter() # Query tree for similar hashes for h2, distance in bk_tree.query(h1, distance_threshold): items2 = hash_map[h2] for p1, dev1, ino1 in items1: for p2, dev2, ino2 in items2: if not self._is_running: break if (dev1, ino1) == (dev2, ino2): continue # Optimization: Skip pair if BOTH were already verified if not self.force_full and p1 not in dirty_paths and p2 not in dirty_paths: continue canonical = frozenset((p1, p2)) if not self._is_running: break if canonical not in unique_duplicate_pairs: if not self.duplicate_cache.is_exception(p1, p2): sim = int((1.0 - (distance / MAX_DHASH_DISTANCE)) * 100) ts = int(time.time()) res = DuplicateResult(p1, p2, str(h1), False, sim, ts) found_duplicates.append(res) unique_duplicate_pairs.add(canonical) self.duplicate_cache.mark_as_pending(p1, p2, True, similarity=sim, timestamp=ts) self.duplicates_found.emit(found_duplicates) self.detection_finished.emit()