""" Image Scanner Module for Bagheera Image Viewer. This module handles asynchronous image discovery, thumbnail generation, and persistent caching. It ensures the main application UI remains responsive by offloading file I/O and image processing to background threads. Key Features: - Asynchronous Scanning: Directory traversal and image loading in separate threads. - Two-Tier Caching: - In-Memory: LRU-based cache with safe concurrent access. - Disk: Persistent storage using LMDB (Lightning Memory-Mapped Database). - Incremental Loading: Batched image processing for memory efficiency. - Cache Maintenance: Automatic cleanup of invalid entries and size enforcement. - Improved Synchronization: Context managers, proper mutex handling, no deadlocks. - Better Path Handling: Uses pathlib for robust path operations. - Enhanced Error Handling: Detailed logging and recovery mechanisms. """ import os import logging import shlex import shutil import struct import subprocess import time import argparse import collections from pathlib import Path from contextlib import contextmanager import lmdb from PySide6.QtCore import ( QObject, QThread, Signal, QMutex, QReadWriteLock, QSize, QSemaphore, QWaitCondition, QByteArray, QBuffer, QIODevice, Qt, QTimer, QRunnable, QThreadPool, QFile ) from PySide6.QtGui import QImage, QImageReader, QImageIOHandler from constants import ( APP_CONFIG, CACHE_PATH, CACHE_MAX_SIZE, CONFIG_DIR, DISK_CACHE_MAX_BYTES, IMAGE_EXTENSIONS, SEARCH_CMD, THUMBNAIL_SIZES, RATING_XATTR_NAME, XATTR_NAME, UITexts, SCANNER_SETTINGS_DEFAULTS, HAVE_BAGHEERASEARCH_LIB ) from imageviewer import ImageViewer from metadatamanager import XattrManager if HAVE_BAGHEERASEARCH_LIB: try: from bagheera_search_lib import BagheeraSearcher except ImportError: HAVE_BAGHEERASEARCH_LIB = False pass # Set up logging for better debugging logger = logging.getLogger(__name__) class ThreadPoolManager: """Manages a global QThreadPool to dynamically adjust thread count.""" def __init__(self): 
self.pool = QThreadPool() self.default_thread_count = APP_CONFIG.get( "generation_threads", SCANNER_SETTINGS_DEFAULTS.get("generation_threads", 4) ) self.pool.setMaxThreadCount(self.default_thread_count) self.is_user_active = False logger.info(f"ThreadPoolManager initialized with " f"{self.default_thread_count} threads.") def get_pool(self): """Returns the managed QThreadPool instance.""" return self.pool def set_user_active(self, active): """ Adjusts thread count based on user activity. Args: active (bool): True if the user is interacting with the UI. """ if active == self.is_user_active: return self.is_user_active = active if active: # User is active, reduce threads to 1 to prioritize UI responsiveness. self.pool.setMaxThreadCount(1) logger.debug("User is active, reducing thread pool to 1.") else: # User is idle, restore to default thread count. self.pool.setMaxThreadCount(self.default_thread_count) logger.debug(f"User is idle, restoring thread pool to " f"{self.default_thread_count}.") def update_default_thread_count(self): """Updates the default thread count from application settings.""" self.default_thread_count = APP_CONFIG.get( "generation_threads", SCANNER_SETTINGS_DEFAULTS.get("generation_threads", 4) ) # Only apply if not in a user-active (low-thread) state. if not self.is_user_active: self.pool.setMaxThreadCount(self.default_thread_count) logger.info(f"Default thread count updated to {self.default_thread_count}.") class ScannerWorker(QRunnable): """ Worker to process a single image in a thread pool. Handles thumbnail retrieval/generation and metadata loading. 
""" def __init__(self, cache, path, target_sizes=None, load_metadata=True, signal_emitter=None, semaphore=None): super().__init__() self.cache = cache self.path = path self.target_sizes = target_sizes self.load_metadata_flag = load_metadata self.emitter = signal_emitter self.semaphore = semaphore self._is_cancelled = False # Result will be (path, thumb, mtime, tags, rating, inode, dev) or None self.result = None def shutdown(self): """Marks the worker as cancelled.""" self._is_cancelled = True def run(self): from constants import SCANNER_GENERATE_SIZES sizes_to_check = self.target_sizes if self.target_sizes is not None \ else SCANNER_GENERATE_SIZES if self._is_cancelled: if self.semaphore: self.semaphore.release() return fd = None try: # Optimize: Open file once to reuse FD for stat and xattrs fd = os.open(self.path, os.O_RDONLY) stat_res = os.fstat(fd) curr_mtime = stat_res.st_mtime curr_inode = stat_res.st_ino curr_dev = stat_res.st_dev smallest_thumb_for_signal = None min_size = min(sizes_to_check) if sizes_to_check else 0 # Ensure required thumbnails exist for size in sizes_to_check: if self._is_cancelled: return # Check if a valid thumbnail for this size exists thumb, mtime = self.cache.get_thumbnail(self.path, size, curr_mtime=curr_mtime, inode=curr_inode, device_id=curr_dev) if not thumb or mtime != curr_mtime: # Use generation lock to prevent multiple threads generating with self.cache.generation_lock( self.path, size, curr_mtime, curr_inode, curr_dev) as should_gen: if self._is_cancelled: return if should_gen: # I am the owner, I generate the thumbnail new_thumb = generate_thumbnail(self.path, size, fd=fd) if self._is_cancelled: return if new_thumb and not new_thumb.isNull(): self.cache.set_thumbnail( self.path, new_thumb, curr_mtime, size, inode=curr_inode, device_id=curr_dev, block=True) if size == min_size: smallest_thumb_for_signal = new_thumb else: # Another thread generated it, re-fetch if size == min_size: re_thumb, _ = self.cache.get_thumbnail( 
self.path, size, curr_mtime=curr_mtime, inode=curr_inode, device_id=curr_dev, async_load=False) smallest_thumb_for_signal = re_thumb elif size == min_size: # valid thumb exists, use it for signal smallest_thumb_for_signal = thumb tags = [] rating = 0 if self.load_metadata_flag: tags, rating = self._load_metadata(fd) self.result = (self.path, smallest_thumb_for_signal, curr_mtime, tags, rating, curr_inode, curr_dev) except Exception as e: logger.error(f"Error processing image {self.path}: {e}") self.result = None finally: if fd is not None: try: os.close(fd) except OSError: pass if self.emitter: self.emitter.emit_progress() if self.semaphore: self.semaphore.release() def _load_metadata(self, path_or_fd): """Loads tag and rating data for a path or file descriptor.""" tags = [] raw_tags = XattrManager.get_attribute(path_or_fd, XATTR_NAME) if raw_tags: tags = sorted(list(set(t.strip() for t in raw_tags.split(',') if t.strip()))) raw_rating = XattrManager.get_attribute(path_or_fd, RATING_XATTR_NAME, "0") try: rating = int(raw_rating) except ValueError: rating = 0 return tags, rating def generate_thumbnail(path, size, fd=None): """Generates a QImage thumbnail for a given path and size.""" try: qfile = None if fd is not None: try: # Ensure we are at the beginning of the file os.lseek(fd, 0, os.SEEK_SET) qfile = QFile() if qfile.open(fd, QIODevice.ReadOnly, QFile.DontCloseHandle): reader = QImageReader(qfile) else: qfile = None reader = QImageReader(path) except OSError: reader = QImageReader(path) else: reader = QImageReader(path) # Optimization: Instruct the image decoder to scale while reading. # This drastically reduces memory usage and CPU time for large images # (e.g. JPEG). 
if reader.supportsOption(QImageIOHandler.ImageOption.ScaledSize): orig_size = reader.size() if orig_size.isValid() \ and (orig_size.width() > size or orig_size.height() > size): target_size = QSize(orig_size) target_size.scale(size, size, Qt.KeepAspectRatio) reader.setScaledSize(target_size) reader.setAutoTransform(True) img = reader.read() # Fallback: If optimization failed (and it wasn't just a missing file), # try standard read if img.isNull(): error = reader.error() # Optimize: Don't retry for AppleDouble files (._*) if UnknownError if os.path.basename(path).startswith("._") and \ error == QImageReader.ImageReaderError.UnknownError: return None if error != QImageReader.ImageReaderError.FileNotFoundError: reader = QImageReader(path) reader.setAutoTransform(True) img = reader.read() if img.isNull(): return None # Always scale to the target size. Qt.KeepAspectRatio will handle # both upscaling and downscaling correctly. SmoothTransformation gives # better quality for upscaling. return img.scaled(size, size, Qt.KeepAspectRatio, Qt.SmoothTransformation) except Exception as e: logger.error(f"Error generating thumbnail for {path}: {e}") return None class CacheWriter(QThread): """ Dedicated background thread for writing thumbnails to LMDB asynchronously. Batches writes to improve disk throughput and avoid blocking the scanner. """ def __init__(self, cache): super().__init__() self.cache = cache # Use deque for flexible buffer management and deduplication self._queue = collections.deque() self._mutex = QMutex() self._condition_new_data = QWaitCondition() self._condition_space_available = QWaitCondition() # Soft limit for blocking producers (background threads) self._max_size = 50 self._running = True def enqueue(self, item, block=False): """Queue an item for writing. 
Item: (device_id, inode_key, img, mtime, size, path)""" if not self._running: return self._mutex.lock() try: if block: # Backpressure for background threads (Scanner/Generator) while len(self._queue) >= self._max_size and self._running: self._condition_space_available.wait(self._mutex) if not self._running: return # Ensure we don't accept new items if stopping, especially when block=False if not self._running: return # --- Soft Cleaning: Deduplication --- # Remove redundant pending updates for the same image/size (e.g. # rapid rotations) if len(item) >= 5: new_dev, new_inode, _, _, new_size = item[:5] if self._queue: # Rebuild deque excluding any item that matches the new one's key self._queue = collections.deque( q for q in self._queue if not (len(q) >= 5 and q[0] == new_dev and q[1] == new_inode and q[4] == new_size) ) # Always append the new item (it becomes the authoritative version) self._queue.append(item) self._condition_new_data.wakeOne() finally: self._mutex.unlock() def stop(self): self._mutex.lock() self._running = False # Do not clear the queue here; let the run loop drain it to prevent data loss. self._condition_new_data.wakeAll() self._condition_space_available.wakeAll() self._mutex.unlock() self.wait() def run(self): self.setPriority(QThread.IdlePriority) while self._running or self._queue: self._mutex.lock() if not self._queue: if not self._running: self._mutex.unlock() break # Wait for new items self._condition_new_data.wait(self._mutex) # Auto-flush: if queue has data but not enough for a full batch, # wait up to 200ms for more data. If timeout, flush what we have. 
# Only wait if running (flush immediately on stop) if self._running and self._queue and len(self._queue) < 50: signaled = self._condition_new_data.wait(self._mutex, 200) if signaled and self._running and len(self._queue) < 50: self._mutex.unlock() continue if not self._queue: self._mutex.unlock() continue # Gather a batch of items # Adaptive batch size: if queue is backing up, increase transaction size # to improve throughput. # Respect max size even during shutdown to avoid OOM or huge transactions batch_limit = self._max_size batch = [] while self._queue and len(batch) < batch_limit: batch.append(self._queue.popleft()) # Notify producers that space might be available self._condition_space_available.wakeAll() self._mutex.unlock() if batch: try: self.cache._batch_write_to_lmdb(batch) except Exception as e: logger.error(f"CacheWriter batch write error: {e}") class CacheLoader(QThread): """ Background thread to load and decode thumbnails from LMDB without blocking UI. Uses LIFO queue (deque) to prioritize most recently requested images. Implements a "drop-oldest" policy when full to ensure new requests (visible images) are prioritized over old pending ones (scrolled away). """ def __init__(self, cache): super().__init__() self.cache = cache # Use deque for LIFO behavior with drop-oldest capability self._queue = collections.deque() self._max_size = 50 self._mutex = QMutex() self._condition = QWaitCondition() self._running = True def enqueue(self, item): if not self._running: return False, None dropped_item = None self._mutex.lock() try: # If queue is full, drop the OLDEST item (left) to make room for NEWEST # (right). # This ensures that during fast scrolling, we process what is currently # on screen (just added) rather than what was on screen moments ago. 
while len(self._queue) >= self._max_size: # Drop oldest and return it so caller can cleanup state dropped = self._queue.popleft() dropped_item = (dropped[0], dropped[1]) # path, size self._queue.append(item) self._condition.wakeOne() return True, dropped_item finally: self._mutex.unlock() def promote(self, path, size): """Moves an item to the front of the line (end of deque) if exists.""" if not self._running: return self._mutex.lock() try: # Find item by path and size for item in self._queue: if item[0] == path and item[1] == size: # Move to right end (LIFO pop side - highest priority) self._queue.remove(item) self._queue.append(item) break finally: self._mutex.unlock() def stop(self): self._running = False self._mutex.lock() self._condition.wakeAll() self._mutex.unlock() self.wait() def run(self): self.setPriority(QThread.IdlePriority) while self._running: self._mutex.lock() if not self._queue: # Wait up to 100ms for new items self._condition.wait(self._mutex, 100) if not self._queue: self._mutex.unlock() continue # LIFO: Pop from right (newest) item = self._queue.pop() self._mutex.unlock() path, size, mtime, inode, dev = item # Call synchronous get_thumbnail to fetch and decode # This puts the result into the RAM cache img, _ = self.cache.get_thumbnail( path, size, curr_mtime=mtime, inode=inode, device_id=dev, async_load=False ) if img: self.cache.thumbnail_loaded.emit(path, size) self.cache._mark_load_complete(path, size) class GenerationFuture: """Helper class to synchronize threads waiting for a thumbnail.""" def __init__(self): self._mutex = QMutex() self._condition = QWaitCondition() self._finished = False def wait(self): self._mutex.lock() while not self._finished: self._condition.wait(self._mutex) self._mutex.unlock() def complete(self): self._mutex.lock() self._finished = True self._condition.wakeAll() self._mutex.unlock() class ThumbnailCache(QObject): """ Thread-safe in-memory thumbnail cache with LMDB disk persistence. 
Optimization: Uses (device, inode) as LMDB key instead of file paths. This gives: - Smaller keys (16 bytes vs variable path length) - Faster lookups and LMDB operations - Automatic handling of file moves/renames - Better cache efficiency """ thumbnail_loaded = Signal(str, int) # path, size def __init__(self): super().__init__() # In-memory cache: {path: {size: (QImage, mtime)}} self._thumbnail_cache = {} # Path -> (device, inode) mapping for fast lookup self._path_to_inode = {} self._loading_set = set() # Track pending async loads (path, size) self._futures = {} # Map (dev, inode, size) -> GenerationFuture self._futures_lock = QMutex() self._cache_lock = QReadWriteLock() self._db_lock = QMutex() # Lock specifically for _db_handles access self._db_handles = {} # Cache for LMDB database handles (dbi) self._cancel_loading = False self._cache_bytes_size = 0 self._cache_writer = None self._cache_loader = None self.lmdb_open() def lmdb_open(self): # Initialize LMDB environment cache_dir = Path(CONFIG_DIR) cache_dir.mkdir(parents=True, exist_ok=True) try: # map_size: 1024MB max database size # max_dbs: 512 named databases (one per device + main DB) self._lmdb_env = lmdb.open( CACHE_PATH, map_size=DISK_CACHE_MAX_BYTES, max_dbs=512, readonly=False, create=True ) logger.info(f"LMDB cache opened: {CACHE_PATH}") # Start the async writer thread self._cache_writer = CacheWriter(self) self._cache_writer.start() # Start the async loader thread self._cache_loader = CacheLoader(self) self._cache_loader.start() except Exception as e: logger.error(f"Failed to open LMDB cache: {e}") self._lmdb_env = None def lmdb_close(self): if hasattr(self, '_cache_writer') and self._cache_writer: self._cache_writer.stop() self._cache_writer = None if hasattr(self, '_cache_loader') and self._cache_loader: self._cache_loader.stop() self._cache_loader = None self._loading_set.clear() self._futures.clear() self._db_handles.clear() if hasattr(self, '_lmdb_env') and self._lmdb_env: self._lmdb_env.close() 
self._lmdb_env = None def __del__(self): self.lmdb_close() @staticmethod def _get_inode_key(path): """Get inode (8 bytes) for a file path.""" try: stat_info = os.stat(path) # Pack inode as uint64 (8 bytes) return struct.pack('Q', stat_info.st_ino) except (OSError, AttributeError): return None @staticmethod def _get_device_id(path): """Get device ID for a file path.""" try: stat_info = os.stat(path) return stat_info.st_dev except OSError: return 0 def _get_device_db(self, device_id, size, write=False, txn=None): """Get or create a named database for the given device.""" env = self._lmdb_env if not env: return None db_name = f"dev_{device_id}_{size}".encode('utf-8') # Protect access to _db_handles which is not thread-safe by default self._db_lock.lock() try: # Return cached handle if available if db_name in self._db_handles: return self._db_handles[db_name] # Not in cache, try to open/create db = env.open_db(db_name, create=write, integerkey=False, txn=txn) self._db_handles[db_name] = db return db except lmdb.NotFoundError: # Expected when reading from non-existent DB (cache miss) return None except Exception as e: logger.error(f"Error opening device DB for dev {device_id} " f"size {size}: {e}") return None finally: self._db_lock.unlock() @contextmanager def _write_lock(self): """Context manager for write-safe access to cache.""" self._cache_lock.lockForWrite() try: yield finally: self._cache_lock.unlock() @contextmanager def _read_lock(self): """Context manager for read-safe access to cache.""" self._cache_lock.lockForRead() try: yield finally: self._cache_lock.unlock() def _ensure_cache_limit(self): """Enforces cache size limit by evicting oldest entries. 
Must be called with a write lock held.""" # Safety limit: 512MB for thumbnails in RAM to prevent system freeze MAX_RAM_BYTES = 512 * 1024 * 1024 while len(self._thumbnail_cache) > 0 and ( len(self._thumbnail_cache) >= CACHE_MAX_SIZE or self._cache_bytes_size > MAX_RAM_BYTES): oldest_path = next(iter(self._thumbnail_cache)) cached_sizes = self._thumbnail_cache.pop(oldest_path) for img, _ in cached_sizes.values(): if img: self._cache_bytes_size -= img.sizeInBytes() self._path_to_inode.pop(oldest_path, None) def _get_tier_for_size(self, requested_size): """Determines the ideal thumbnail tier based on the requested size.""" if requested_size < 192: return 128 if requested_size < 320: return 256 return 512 def _resolve_file_identity(self, path, curr_mtime, inode, device_id): """Helper to resolve file mtime, device, and inode.""" mtime = curr_mtime dev_id = device_id if device_id is not None else 0 inode_key = struct.pack('Q', inode) if inode is not None else None if mtime is None or dev_id == 0 or not inode_key: try: stat_res = os.stat(path) mtime = stat_res.st_mtime dev_id = stat_res.st_dev inode_key = struct.pack('Q', stat_res.st_ino) except OSError: return None, 0, None return mtime, dev_id, inode_key def _queue_async_load(self, path, size, mtime, dev_id, inode_key): """Helper to queue async load.""" with self._write_lock(): if (path, size) in self._loading_set: # If already queued, promote to process sooner (LIFO) if self._cache_loader: self._cache_loader.promote(path, size) return if not self._cache_loader: return inode_int = struct.unpack('Q', inode_key)[0] if inode_key else 0 # Optimistically add to set self._loading_set.add((path, size)) success, dropped = self._cache_loader.enqueue( (path, size, mtime, inode_int, dev_id)) if dropped: self._loading_set.discard(dropped) elif not success: self._loading_set.discard((path, size)) def _check_disk_cache(self, path, search_order, mtime, dev_id, inode_key): """Helper to check LMDB synchronously.""" if not self._lmdb_env 
or not inode_key or dev_id == 0: return None, 0 txn = None try: txn = self._lmdb_env.begin(write=False) for size in search_order: db = self._get_device_db(dev_id, size, write=False, txn=txn) if not db: continue value_bytes = txn.get(inode_key, db=db) if value_bytes and len(value_bytes) > 8: stored_mtime = struct.unpack('d', value_bytes[:8])[0] if stored_mtime != mtime: continue payload = value_bytes[8:] if len(payload) > 4 and payload.startswith(b'PTH\0'): path_len = struct.unpack('I', payload[4:8])[0] img_data = payload[8 + path_len:] else: img_data = payload img = QImage() if img.loadFromData(img_data, "PNG"): with self._write_lock(): if path not in self._thumbnail_cache: self._ensure_cache_limit() self._thumbnail_cache[path] = {} self._thumbnail_cache[path][size] = (img, mtime) self._cache_bytes_size += img.sizeInBytes() self._path_to_inode[path] = (dev_id, inode_key) return img, mtime except Exception as e: logger.debug(f"Cache lookup error for {path}: {e}") finally: if txn: txn.abort() return None, 0 def get_thumbnail(self, path, requested_size, curr_mtime=None, inode=None, device_id=None, async_load=False): """ Safely retrieve a thumbnail from cache, finding the best available size. Returns: tuple (QImage or None, mtime) or (None, 0) if not found. """ # 1. Determine the ideal tier and create a prioritized search order. target_tier = self._get_tier_for_size(requested_size) search_order = [target_tier] + \ sorted([s for s in THUMBNAIL_SIZES if s > target_tier]) + \ sorted([s for s in THUMBNAIL_SIZES if s < target_tier], reverse=True) # 2. Resolve file identity (mtime, dev, inode) mtime, dev_id, inode_key = self._resolve_file_identity( path, curr_mtime, inode, device_id) if mtime is None: return None, 0 # 3. 
Check memory cache (fastest) with self._read_lock(): cached_sizes = self._thumbnail_cache.get(path) if cached_sizes: for size in search_order: if size in cached_sizes: img, cached_mtime = cached_sizes[size] if img and not img.isNull() and cached_mtime == mtime: return img, mtime # 4. Handle Async Request if async_load: self._queue_async_load(path, target_tier, mtime, dev_id, inode_key) return None, 0 # 5. Check Disk Cache (Sync fallback) return self._check_disk_cache(path, search_order, mtime, dev_id, inode_key) def _mark_load_complete(self, path, size): """Remove item from pending loading set.""" with self._write_lock(): # Uncomment to trace individual completions: # logger.debug(f"Load complete: {path}") self._loading_set.discard((path, size)) def set_thumbnail(self, path, img, mtime, size, inode=None, device_id=None, block=False): """Safely store a thumbnail of a specific size in cache.""" if not img or img.isNull() or size not in THUMBNAIL_SIZES: return False img_size = img.sizeInBytes() if inode is not None and device_id is not None: dev_id = device_id inode_key = struct.pack('Q', inode) else: dev_id = self._get_device_id(path) inode_key = self._get_inode_key(path) if not inode_key or dev_id == 0: return False with self._write_lock(): if path not in self._thumbnail_cache: self._ensure_cache_limit() self._thumbnail_cache[path] = {} else: # Move to end to mark as recently used (LRU behavior) # We pop and re-insert the existing entry entry = self._thumbnail_cache.pop(path) self._thumbnail_cache[path] = entry # If replacing, subtract old size if size in self._thumbnail_cache.get(path, {}): old_img, _ = self._thumbnail_cache[path][size] if old_img: self._cache_bytes_size -= old_img.sizeInBytes() self._thumbnail_cache[path][size] = (img, mtime) self._path_to_inode[path] = (dev_id, inode_key) self._cache_bytes_size += img_size # Enqueue asynchronous write to LMDB if self._cache_writer: self._cache_writer.enqueue( (dev_id, inode_key, img, mtime, size, path), 
block=block) return True def invalidate_path(self, path): """Removes all cached data for a specific path from memory and disk.""" inode_info = None with self._write_lock(): if path in self._thumbnail_cache: cached_sizes = self._thumbnail_cache.pop(path) for img, _ in cached_sizes.values(): if img: self._cache_bytes_size -= img.sizeInBytes() inode_info = self._path_to_inode.pop(path, None) device_id, inode_key = inode_info if inode_info else (None, None) self._delete_from_lmdb(path, device_id=device_id, inode_key=inode_key) def remove_if_missing(self, paths_to_check): """Remove cache entries for files that no longer exist on disk.""" to_remove = [] with self._read_lock(): cached_paths = list(self._thumbnail_cache.keys()) for path in cached_paths: if not os.path.exists(path): to_remove.append(path) if to_remove: entries_to_delete = [] with self._write_lock(): for path in to_remove: if path in self._thumbnail_cache: cached_sizes = self._thumbnail_cache.pop(path) for img, _ in cached_sizes.values(): if img: self._cache_bytes_size -= img.sizeInBytes() inode_info = self._path_to_inode.pop(path, None) entries_to_delete.append((path, inode_info)) # Delete from LMDB outside the lock for path, inode_info in entries_to_delete: device_id, inode_key = inode_info if inode_info else (None, None) self._delete_from_lmdb(path, device_id=device_id, inode_key=inode_key) logger.info(f"Removed {len(to_remove)} missing entries from cache") return len(to_remove) def clean_orphans(self, stop_check=None): """ Scans the LMDB database for entries where the original file no longer exists. This is a heavy operation and should be run in a background thread. """ if not self._lmdb_env: return 0 orphans_removed = 0 # 1. Get all named databases (one per device/size) db_names = [] try: with self._lmdb_env.begin(write=False) as txn: cursor = txn.cursor() for key, _ in cursor: if key.startswith(b'dev_'): db_names.append(key) except Exception as e: logger.error(f"Error listing DBs: {e}") return 0 # 2. 
Iterate each DB and check paths for db_name in db_names: if stop_check and stop_check(): return orphans_removed try: # Parse device ID from name "dev_{id}_{size}" parts = db_name.decode('utf-8').split('_') if len(parts) >= 3: db_dev = int(parts[1]) else: db_dev = 0 # Parse device ID from name "dev_{id}_{size}" # We open the DB to scan its keys with self._lmdb_env.begin(write=True) as txn: db = self._lmdb_env.open_db(db_name, txn=txn, create=False) cursor = txn.cursor(db=db) for key, value in cursor: if stop_check and stop_check(): return orphans_removed if len(value) > 12 and value[8:12] == b'PTH\0': path_len = struct.unpack('I', value[12:16])[0] path_bytes = value[16:16+path_len] path = path_bytes.decode('utf-8', errors='ignore') should_delete = False try: st = os.stat(path) key_inode = struct.unpack('Q', key)[0] if st.st_dev != db_dev or st.st_ino != key_inode: should_delete = True except OSError: should_delete = True if should_delete: cursor.delete() orphans_removed += 1 except Exception as e: logger.error(f"Error cleaning DB {db_name}: {e}") return orphans_removed def clear_cache(self): """Clear both in-memory and LMDB cache.""" with self._write_lock(): self._thumbnail_cache.clear() self._path_to_inode.clear() self._cache_bytes_size = 0 self.lmdb_close() try: if os.path.exists(CACHE_PATH): shutil.rmtree(CACHE_PATH) self.lmdb_open() logger.info("LMDB cache cleared by removing directory.") except Exception as e: logger.error(f"Error clearing LMDB by removing directory: {e}") def _batch_write_to_lmdb(self, batch): """Write a batch of thumbnails to LMDB in a single transaction.""" env = self._lmdb_env if not env or not batch: return data_to_write = [] # 1. Prepare data (image encoding) outside the transaction lock # This is CPU intensive, better done without holding the DB lock if possible, # though LMDB write lock mostly blocks other writers. 
for item in batch: # Small sleep to yield GIL/CPU to UI thread during heavy batch encoding QThread.msleep(1) if len(item) == 6: device_id, inode_key, img, mtime, size, path = item else: device_id, inode_key, img, mtime, size = item path = None if not img or img.isNull(): continue try: img_data = self._image_to_bytes(img) if not img_data: continue # Pack mtime as a double (8 bytes) and prepend to image data mtime_bytes = struct.pack('d', mtime) # New format: mtime(8) + 'PTH\0'(4) + len(4) + path + img if path: path_encoded = path.encode('utf-8') header = mtime_bytes + b'PTH\0' + \ struct.pack('I', len(path_encoded)) + path_encoded value_bytes = header + img_data else: value_bytes = mtime_bytes + img_data data_to_write.append((device_id, size, inode_key, value_bytes)) except Exception as e: logger.error(f"Error converting image for LMDB: {e}") if not data_to_write: return # 2. Commit to DB in one transaction try: with env.begin(write=True) as txn: for device_id, size, inode_key, value_bytes in data_to_write: # Ensure DB exists (creates if needed) using the current transaction db = self._get_device_db(device_id, size, write=True, txn=txn) if db: try: txn.put(inode_key, value_bytes, db=db) except lmdb.Error as e: # Handle potential stale DB handles (EINVAL) # This happens if a previous transaction created the handle # but aborted. if "Invalid argument" in str(e): db_name = f"dev_{device_id}_{size}".encode('utf-8') self._db_lock.lock() if db_name in self._db_handles: del self._db_handles[db_name] self._db_lock.unlock() # Retry open and put with fresh handle db = self._get_device_db( device_id, size, write=True, txn=txn) if db: txn.put(inode_key, value_bytes, db=db) else: raise except Exception as e: logger.error(f"Error committing batch to LMDB: {e}") # If transaction failed, handles created within it are now invalid. # Clear cache to be safe. 
self._db_lock.lock() self._db_handles.clear() self._db_lock.unlock() def _delete_from_lmdb(self, path, device_id=None, inode_key=None): """Delete all thumbnail sizes for a path from LMDB.""" env = self._lmdb_env if not env: return if device_id is None or inode_key is None: device_id = self._get_device_id(path) inode_key = self._get_inode_key(path) if not inode_key or device_id == 0: return for size in THUMBNAIL_SIZES: try: db = self._get_device_db(device_id, size, write=True) if not db: continue with env.begin(write=True) as txn: txn.delete(inode_key, db=db) except Exception as e: logger.error(f"Error deleting from LMDB for size {size}: {e}") @staticmethod def _image_to_bytes(img): """Convert QImage to PNG bytes.""" buf = None try: ba = QByteArray() buf = QBuffer(ba) if not buf.open(QIODevice.WriteOnly): logger.error("Failed to open buffer for image conversion") return None if not img.save(buf, "PNG"): logger.error("Failed to save image to buffer") return None return ba.data() except Exception as e: logger.error(f"Error converting image to bytes: {e}") return None finally: if buf: buf.close() def get_cache_stats(self): """Get current cache statistics.""" with self._read_lock(): # Count all thumbnails, including different sizes of the same image. 
count = sum(len(sizes) for sizes in self._thumbnail_cache.values()) size = self._cache_bytes_size return count, size def rename_entry(self, old_path, new_path): """Move a cache entry from one path to another.""" # Get new identity to check for cross-filesystem moves try: stat_info = os.stat(new_path) new_dev = stat_info.st_dev new_inode_key = struct.pack('Q', stat_info.st_ino) except OSError: return False entries_to_rewrite = [] old_inode_info = None with self._write_lock(): if old_path not in self._thumbnail_cache: return False self._thumbnail_cache[new_path] = self._thumbnail_cache.pop(old_path) old_inode_info = self._path_to_inode.pop(old_path, None) if old_inode_info: self._path_to_inode[new_path] = (new_dev, new_inode_key) # Always prepare data to persist to update the embedded path in LMDB for size, (img, mtime) in self._thumbnail_cache[new_path].items(): entries_to_rewrite.append((size, img, mtime)) # Perform LMDB operations outside the lock if not old_inode_info: return False old_dev, old_inode = old_inode_info # Only delete old key if physical identity changed (cross-filesystem moves) if old_inode_info != (new_dev, new_inode_key): self._delete_from_lmdb(old_path, device_id=old_dev, inode_key=old_inode) if self._cache_writer: for size, img, mtime in entries_to_rewrite: self._cache_writer.enqueue( (new_dev, new_inode_key, img, mtime, size, new_path), block=False) return True @contextmanager def generation_lock(self, path, size, curr_mtime=None, inode=None, device_id=None): """ Context manager to coordinate thumbnail generation between threads. Prevents double work: if one thread is generating, others wait. Yields: bool: True if the caller should generate the thumbnail. False if the caller waited and should check cache again. 
""" # Resolve identity for locking key mtime, dev_id, inode_key = self._resolve_file_identity( path, curr_mtime, inode, device_id) if not inode_key or dev_id == 0: # Cannot lock reliably without stable ID, allow generation yield True return key = (dev_id, inode_key, size) future = None owner = False self._futures_lock.lock() if key in self._futures: future = self._futures[key] else: future = GenerationFuture() self._futures[key] = future owner = True self._futures_lock.unlock() if owner: try: yield True finally: future.complete() self._futures_lock.lock() if key in self._futures and self._futures[key] is future: del self._futures[key] self._futures_lock.unlock() else: # Another thread is generating, wait for it future.wait() yield False class CacheCleaner(QThread): """Background thread to remove cache entries for deleted files.""" finished_clean = Signal(int) def __init__(self, cache): super().__init__() self.cache = cache self._is_running = True def stop(self): """Signals the thread to stop.""" self._is_running = False def run(self): self.setPriority(QThread.IdlePriority) if not self._is_running: return # Perform deep cleaning of LMDB removed_count = self.cache.remove_if_missing([]) if self._is_running: removed_count += self.cache.clean_orphans( stop_check=lambda: not self._is_running) self.finished_clean.emit(removed_count) class ThumbnailGenerator(QThread): """ Background thread to generate thumbnails for a specific size for a list of already discovered files. 
""" generation_complete = Signal() progress = Signal(int, int) # processed, total class SignalEmitter(QObject): """Helper to emit signals from runnables to the main thread.""" progress_tick = Signal() def emit_progress(self): self.progress_tick.emit() def __init__(self, cache, paths, size, thread_pool_manager): super().__init__() self.cache = cache self.paths = paths self.size = size self._abort = False self.thread_pool_manager = thread_pool_manager self._workers = [] self._workers_mutex = QMutex() def stop(self): """Stops the worker thread gracefully.""" self._abort = True self._workers_mutex.lock() for worker in self._workers: worker.shutdown() self._workers_mutex.unlock() self.wait() def run(self): """ Main execution loop. Uses a thread pool to process paths in parallel. """ pool = self.thread_pool_manager.get_pool() emitter = self.SignalEmitter() processed_count = 0 total = len(self.paths) sem = QSemaphore(0) def on_tick(): nonlocal processed_count processed_count += 1 if processed_count % 5 == 0 or processed_count == total: self.progress.emit(processed_count, total) # Use a direct connection or queued connection depending on context, # but since we are in QThread.run, we can connect local slot. # However, runnables run in pool threads. We need thread-safe update. # The signal/slot mechanism handles thread safety automatically. 
        emitter.progress_tick.connect(on_tick, Qt.QueuedConnection)
        started_count = 0
        for path in self.paths:
            if self._abort:
                break
            runnable = ScannerWorker(self.cache, path,
                                     target_sizes=[self.size],
                                     load_metadata=False,
                                     signal_emitter=emitter,
                                     semaphore=sem)
            # autoDelete off: we keep references in self._workers for shutdown.
            runnable.setAutoDelete(False)
            self._workers_mutex.lock()
            if self._abort:
                self._workers_mutex.unlock()
                break
            self._workers.append(runnable)
            self._workers_mutex.unlock()
            pool.start(runnable)
            started_count += 1
        if started_count > 0:
            # Block until every started worker has released the semaphore.
            sem.acquire(started_count)
        self._workers_mutex.lock()
        self._workers.clear()
        self._workers_mutex.unlock()
        if not self._abort:
            self.generation_complete.emit()


class ImageScanner(QThread):
    """
    Background thread for scanning directories and loading images.
    """
    # List of tuples (path, QImage_thumb, mtime, tags, rating)
    # Updated tuple: (path, QImage_thumb, mtime, tags, rating, inode, device_id)
    images_found = Signal(list)
    progress_msg = Signal(str)
    progress_percent = Signal(int)
    finished_scan = Signal(int)  # Total images found
    more_files_available = Signal(int, int)  # Last loaded index, remainder

    def __init__(self, cache, paths, is_file_list=False, viewers=None,
                 thread_pool_manager=None):
        """
        Args:
            cache: Thumbnail cache shared with the workers.
            paths: Directories, files, or search queries to scan.
            is_file_list: When True, `paths` is an explicit file list; files
                are pre-seeded and their parent directories recorded.
            viewers: Optional ImageViewer instances to keep updated.
            thread_pool_manager: Optional shared pool; a private pool is
                created when None.
        """
        if not paths or not isinstance(paths, (list, tuple)):
            logger.warning("ImageScanner initialized with empty or invalid paths")
            paths = []
        super().__init__()
        self.cache = cache
        self.all_files = []
        self.thread_pool_manager = thread_pool_manager
        self._viewers = viewers
        # Deduplication set mirroring all_files for O(1) membership tests.
        self._seen_files = set()
        self._is_file_list = is_file_list
        if self._is_file_list:
            # Pre-seed the discovered-file list and collect unique parent
            # directories (non-file entries are passed through as-is).
            self.paths = []
            for p in paths:
                if os.path.isfile(p):
                    p_str = str(p)
                    if p_str not in self._seen_files:
                        self.all_files.append(p_str)
                        self._seen_files.add(p_str)
                    path = os.path.dirname(p)
                    if path not in self.paths:
                        self.paths.append(path)
                else:
                    self.paths.append(p)
        else:
            self.paths = paths
        self._is_running = True
        self._auto_load_enabled = False
        self.count = 0
        self.index = 0
        self._paused = False
        # mutex + condition guard pending_tasks and the priority queue.
        self.mutex = QMutex()
        self.condition = QWaitCondition()
        self.pending_tasks = []
        self._priority_queue = collections.deque()
        self._processed_paths = set()
        self._current_workers = []
        self._current_workers_mutex = QMutex()
        # Initial load: queue the first batch so run() starts immediately.
        self.pending_tasks.append((0, APP_CONFIG.get(
            "scan_batch_size", SCANNER_SETTINGS_DEFAULTS["scan_batch_size"])))
        self._last_update_time = 0
        if self.thread_pool_manager:
            self.pool = self.thread_pool_manager.get_pool()
        else:
            self.pool = QThreadPool()
            max_threads = APP_CONFIG.get(
                "generation_threads",
                SCANNER_SETTINGS_DEFAULTS.get("generation_threads", 4))
            self.pool.setMaxThreadCount(max_threads)
        logger.info(f"ImageScanner initialized with {len(paths)} paths")

    def set_auto_load(self, enabled):
        """Enable or disable automatic loading of all subsequent images."""
        self._auto_load_enabled = enabled

    def set_paused(self, paused):
        """Pauses or resumes the scanning process."""
        self._paused = paused

    def prioritize(self, paths):
        """Adds paths to the priority queue for immediate processing."""
        self.mutex.lock()
        for p in paths:
            self._priority_queue.append(p)
        self.mutex.unlock()

    def run(self):
        """Scan all paths, then serve batched load requests until stopped."""
        # LowPriority ensures UI thread gets CPU time to process events (mouse, draw)
        self.setPriority(QThread.IdlePriority)
        self.progress_msg.emit(UITexts.SCANNING_DIRS)
        self.scan_files()
        # Classic producer/consumer wait loop on pending_tasks.
        while self._is_running:
            self.mutex.lock()
            while not self.pending_tasks and self._is_running:
                self.condition.wait(self.mutex)
            if not self._is_running:
                self.mutex.unlock()
                break
            i, to_load = self.pending_tasks.pop(0)
            self.mutex.unlock()
            self._process_images(i, to_load)

    def _update_viewers(self, force=False):
        """Push the current file list to attached viewers (rate-limited).

        Args:
            force: Bypass the throttle (used for the final update).
        """
        if not self._viewers:
            return
        current_time = time.time()
        # Throttle updates to avoid saturating the event loop (max 5 times/sec)
        if not force and (current_time - self._last_update_time < 0.2):
            return
        self._last_update_time = current_time
        # Image viewers standalone unique sort, this must be improved.
all_files_sorted_by_name_ascending = sorted(self.all_files) # Iterate over a copy to avoid runtime errors if list changes in main thread for w in list(self._viewers): try: if isinstance(w, ImageViewer): QTimer.singleShot( 0, w, lambda viewer=w, files=all_files_sorted_by_name_ascending: viewer.update_image_list(files)) except (RuntimeError, Exception): # Handle cases where viewer might be closing/deleted (RuntimeError) or # other issues pass def scan_files(self): for path in self.paths: if not self._is_running: return try: if path.startswith("file://"): path = path[7:] elif path.startswith("file:/"): path = path[6:] # Check if path exists before resolving absolute path to avoid # creating fake paths for search queries. expanded_path = os.path.expanduser(path) if os.path.exists(expanded_path): p = Path(os.path.abspath(expanded_path)) if p.is_file(): p_str = str(p) if p_str not in self._seen_files: self.all_files.append(p_str) self._seen_files.add(p_str) elif p.is_dir(): self._scan_directory(p, 0) self._update_viewers() else: self._search(path) self._update_viewers() # logger.warning(f"Path not found: {p}") except Exception as e: logger.error(f"Error scanning path {path}: {e}") # Ensure final update reaches viewers self._update_viewers(force=True) def _scan_directory(self, dir_path, current_depth): if not self._is_running or current_depth > APP_CONFIG.get( "scan_max_level", SCANNER_SETTINGS_DEFAULTS["scan_max_level"]): return try: for item in dir_path.iterdir(): if not self._is_running: return if item.is_file() and item.suffix.lower() in IMAGE_EXTENSIONS: p = os.path.abspath(str(item)) if p not in self._seen_files: self.all_files.append(p) self._seen_files.add(p) self._update_viewers() elif item.is_dir(): self._scan_directory(item, current_depth + 1) except (PermissionError, OSError): pass def _parse_query(self, query): parser = argparse.ArgumentParser(prog="bagheera-search", add_help=False) # Main arguments parser.add_argument("query", nargs="?", default="") 
parser.add_argument("-d", "--directory") parser.add_argument("-e", "--exclude", nargs="?", const="", default=None) parser.add_argument("-l", "--limit", type=int) parser.add_argument("-o", "--offset", type=int) parser.add_argument("-r", "--recursive", nargs="?", const="", default=None) parser.add_argument("-x", "--recursive-exclude", nargs="?", const="", default=None) parser.add_argument("-s", "--sort") parser.add_argument("-t", "--type") # Date filters parser.add_argument("--day", type=int) parser.add_argument("--month", type=int) parser.add_argument("--year", type=int) try: args_list = shlex.split(str(query)) args, unknown_args = parser.parse_known_args(args_list) if args.day is not None and args.month is None: raise ValueError("Missing --month (required when --day is used)") if args.month is not None and args.year is None: raise ValueError("Missing --year (required when --month is used)") query_parts = [args.query] if args.query else [] if unknown_args: query_parts.extend(unknown_args) query_text = " ".join(query_parts) # Build options dictionary main_options = {} if args.recursive is not None: main_options["type"] = "folder" else: if args.limit is not None: main_options["limit"] = args.limit if args.offset is not None: main_options["offset"] = args.offset if args.type: main_options["type"] = args.type if args.directory: main_options["directory"] = args.directory if args.year is not None: main_options["year"] = args.year if args.month is not None: main_options["month"] = args.month if args.day is not None: main_options["day"] = args.day if args.sort: main_options["sort"] = args.sort if args.exclude and args.exclude == '': args.recursive_exclude = None if args.recursive_exclude and args.recursive_exclude == '': args.recursive_exclude = None other_options = { "exclude": args.exclude, "id": False, "konsole": False, "limit": args.limit if args.limit and args.recursive is not None else 99999999999, "offset": args.offset if args.offset and args.recursive is not None 
else 0, "recursive": args.recursive, "recursive_indent": "", "recursive_exclude": args.recursive_exclude, "sort": args.sort, "type": args.type if args.recursive is not None else None, "verbose": False, } return query_text, main_options, other_options except ValueError as e: print(f"Arguments error: {e}") return None, [] except Exception as e: print(f"Unexpected error parsing query: {e}") return None, [] def _search(self, query): engine = APP_CONFIG.get("search_engine", "Bagheera") if HAVE_BAGHEERASEARCH_LIB and (engine == "Bagheera" or not SEARCH_CMD): query_text, main_options, other_options = self._parse_query(query) try: searcher = BagheeraSearcher() for item in searcher.search(query_text, main_options, other_options): if not self._is_running: break p = item["path"].strip() if p and os.path.exists(os.path.expanduser(p)): if p not in self._seen_files: self.all_files.append(p) self._seen_files.add(p) self._update_viewers() except Exception as e: print(f"Error during bagheerasearch library call: {e}") elif SEARCH_CMD: try: cmd = SEARCH_CMD + shlex.split(str(query)) out = subprocess.check_output(cmd, text=True).splitlines() for p in out: if not self._is_running: break p = p.strip() if p.startswith("file://"): p = p[7:] if p and os.path.exists(os.path.expanduser(p)): if p not in self._seen_files: self.all_files.append(p) self._seen_files.add(p) self._update_viewers() except Exception as e: print(f"Error during {SEARCH_CMD} subprocess call: {e}") def load_images(self, i, to_load): if i < 0: i = 0 if i >= len(self.all_files): return self.mutex.lock() self.pending_tasks.append((i, to_load)) self.condition.wakeAll() self.mutex.unlock() def _process_images(self, i, to_load): if i >= len(self.all_files): self.finished_scan.emit(self.count) return if self.thread_pool_manager: max_threads = self.thread_pool_manager.default_thread_count else: max_threads = APP_CONFIG.get( "generation_threads", SCANNER_SETTINGS_DEFAULTS.get("generation_threads", 4)) 
        self.pool.setMaxThreadCount(max_threads)
        images_loaded = 0
        batch = []
        while i < len(self.all_files):
            if not self._is_running:
                return
            while self._paused and self._is_running:
                self.msleep(100)
            # Collect paths for this chunk to process in parallel
            chunk_size = max_threads * 2
            tasks = []  # List of (path, is_from_priority_queue)
            # 1. Drain priority queue up to chunk size
            self.mutex.lock()
            while len(tasks) < chunk_size and self._priority_queue:
                p = self._priority_queue.popleft()
                if p not in self._processed_paths and p in self._seen_files:
                    tasks.append((p, True))
            self.mutex.unlock()
            # 2. Fill remaining chunk space with sequential files
            temp_i = i
            while len(tasks) < chunk_size and temp_i < len(self.all_files):
                p = self.all_files[temp_i]
                # Skip if already processed (e.g. via priority earlier)
                if p not in self._processed_paths \
                        and Path(p).suffix.lower() in IMAGE_EXTENSIONS:
                    tasks.append((p, False))
                temp_i += 1
            if not tasks:
                # If no tasks found but still have files (e.g. all skipped extensions),
                # update index and continue loop
                i = temp_i
                continue
            # Submit tasks to thread pool
            # Each worker releases `sem` once when done; we acquire the same
            # number below, which blocks until the whole chunk has finished.
            sem = QSemaphore(0)
            runnables = []
            self._current_workers_mutex.lock()
            if not self._is_running:
                self._current_workers_mutex.unlock()
                return
            for f_path, _ in tasks:
                r = ScannerWorker(self.cache, f_path, semaphore=sem)
                # autoDelete off: results are read from the runnable afterwards.
                r.setAutoDelete(False)
                runnables.append(r)
                self._current_workers.append(r)
                self.pool.start(r)
            self._current_workers_mutex.unlock()
            # Wait only for this chunk to finish using semaphore
            sem.acquire(len(runnables))
            self._current_workers_mutex.lock()
            self._current_workers.clear()
            self._current_workers_mutex.unlock()
            if not self._is_running:
                return
            # Process results
            for r in runnables:
                if r.result:
                    self._processed_paths.add(r.path)
                    batch.append(r.result)
                    self.count += 1
                    images_loaded += 1
            # Clean up runnables
            runnables.clear()
            # Advance sequential index
            i = temp_i
            # Emit batch if size is enough (responsiveness optimization)
            # Small batches early on so first results appear quickly.
            if self.count <= 100:
                target_batch_size = 20
            else:
                target_batch_size = 200
            if len(batch) >= target_batch_size:
                self.images_found.emit(batch)
                batch = []
                self.msleep(10)  # Yield to UI
            # Check if loading limit reached
            if images_loaded >= to_load and to_load > 0:
                if batch:
                    # Emit remaining items
                    self.images_found.emit(batch)
                next_index = i
                total_files = len(self.all_files)
                self.index = next_index
                self.progress_msg.emit(UITexts.LOADED_PARTIAL.format(
                    self.count, total_files - next_index))
                if total_files > 0:
                    percent = int((self.count / total_files) * 100)
                    self.progress_percent.emit(percent)
                self.more_files_available.emit(next_index, total_files)
                # This loads all images continuously without pausing only if
                # explicitly requested
                if self._auto_load_enabled:
                    self.load_images(
                        next_index,
                        APP_CONFIG.get("scan_batch_size",
                                       SCANNER_SETTINGS_DEFAULTS[
                                           "scan_batch_size"]))
                return
            if self.count % 10 == 0:
                # Update progress less frequently
                self.progress_msg.emit(
                    UITexts.LOADING_SCAN.format(self.count, len(self.all_files)))
                if len(self.all_files) > 0:
                    percent = int((self.count / len(self.all_files)) * 100)
                    self.progress_percent.emit(percent)
        # All files processed: flush the last partial batch and finish.
        self.index = len(self.all_files)
        if batch:
            self.images_found.emit(batch)
        self.progress_percent.emit(100)
        self.finished_scan.emit(self.count)

    def stop(self):
        """Stop scanning: cancel in-flight workers and join the thread."""
        logger.info("ImageScanner stop requested")
        self._is_running = False
        # Cancel currently running workers in the active batch
        self._current_workers_mutex.lock()
        for worker in self._current_workers:
            worker.shutdown()
        self._current_workers_mutex.unlock()
        # Wake up the condition variable
        self.mutex.lock()
        self.condition.wakeAll()
        self.mutex.unlock()
        self.wait()