diff --git a/bagheeraview.py b/bagheeraview.py index 2c0a013..6b8cf4b 100755 --- a/bagheeraview.py +++ b/bagheeraview.py @@ -68,7 +68,8 @@ from constants import ( ) import constants from settings import SettingsDialog -from imagescanner import CacheCleaner, ImageScanner, ThumbnailCache, ThumbnailGenerator +from imagescanner import (CacheCleaner, ImageScanner, ThumbnailCache, + ThumbnailGenerator, ThreadPoolManager) from imageviewer import ImageViewer from propertiesdialog import PropertiesDialog from widgets import ( @@ -903,13 +904,14 @@ class MainWindow(QMainWindow): scanners and individual image viewer windows. """ - def __init__(self, cache, args): + def __init__(self, cache, args, thread_pool_manager): """ Initializes the MainWindow. Args: cache (ThumbnailCache): The shared thumbnail cache instance. args (list): Command-line arguments passed to the application. + thread_pool_manager (ThreadPoolManager): The shared thread pool manager. """ super().__init__() self.cache = cache @@ -917,6 +919,7 @@ class MainWindow(QMainWindow): self.set_app_icon() self.viewer_shortcuts = {} + self.thread_pool_manager = thread_pool_manager self.full_history = [] self.history = [] self.current_thumb_size = THUMBNAILS_DEFAULT_SIZE @@ -1320,12 +1323,14 @@ class MainWindow(QMainWindow): def _on_scroll_interaction(self, value): """Pauses scanning during scroll to keep UI fluid.""" if self.scanner and self.scanner.isRunning(): + self.thread_pool_manager.set_user_active(True) self.scanner.set_paused(True) self.resume_scan_timer.start() def _resume_scanning(self): """Resumes scanning after interaction pause.""" if self.scanner: + self.thread_pool_manager.set_user_active(False) # Prioritize currently visible images visible_paths = self.get_visible_image_paths() self.scanner.prioritize(visible_paths) @@ -1676,6 +1681,7 @@ class MainWindow(QMainWindow): # Trigger a repaint to apply other color changes like filename color self._apply_global_stylesheet() + self.thread_pool_manager.update_default_thread_count() self.thumbnail_view.updateGeometries() self.thumbnail_view.viewport().update() @@ -2352,6 +2358,7 @@ class MainWindow(QMainWindow): self.is_cleaning = False self.scanner = ImageScanner(self.cache, paths, is_file_list=self._scan_all, + thread_pool_manager=self.thread_pool_manager, viewers=self.viewers) if self._is_loading_all: self.scanner.set_auto_load(True) @@ -3516,7 +3523,8 @@ class MainWindow(QMainWindow): if not paths: return - self.thumbnail_generator = ThumbnailGenerator(self.cache, paths, size) + self.thumbnail_generator = ThumbnailGenerator( + self.cache, paths, size, self.thread_pool_manager) self.thumbnail_generator.generation_complete.connect( self.on_high_res_generation_finished) self.thumbnail_generator.progress.connect( @@ -3983,7 +3991,8 @@ class MainWindow(QMainWindow): # Create a ThumbnailGenerator to regenerate the thumbnail size = self._get_tier_for_size(self.current_thumb_size) - self.thumbnail_generator = ThumbnailGenerator(self.cache, [path], size) + self.thumbnail_generator = ThumbnailGenerator( + self.cache, [path], size, self.thread_pool_manager) self.thumbnail_generator.generation_complete.connect( self.on_high_res_generation_finished) self.thumbnail_generator.progress.connect( @@ -4362,6 +4371,7 @@ def main(): # Increase QPixmapCache limit (default is usually small, ~10MB) to ~100MB QPixmapCache.setCacheLimit(102400) + thread_pool_manager = ThreadPoolManager() cache = ThumbnailCache() args = [a for a in sys.argv[1:] if a != "--x11"] @@ -4370,7 +4380,7 @@ def main(): if path.startswith("file:/"): path = path[6:] - win = MainWindow(cache, args) + win = MainWindow(cache, args, thread_pool_manager) shortcut_controller = AppShortcutController(win) win.shortcut_controller = shortcut_controller app.installEventFilter(shortcut_controller) diff --git a/changelog.txt b/changelog.txt index f4957a7..7acac87 100644 --- a/changelog.txt +++ b/changelog.txt @@ -3,10 +3,11 @@ v0.9.11 - · Añadida una nueva área llamada Body. · Refactorizaciones, optimizaciones y cambios a saco. +Add a `shutdown` signal or method to `ScannerWorker` to allow cleaner cancellation of long-running tasks like `generate_thumbnail`. +Implement a mechanism to dynamically adjust the thread pool size based on system load or user activity. - - -Refactor the `ImageScanner` to use a thread pool for parallel thumbnail generation for faster loading. +Implement a mechanism to monitor system CPU load and adjust the thread pool size accordingly. +Refactor the `ThreadPoolManager` to be a QObject and emit signals when the thread count changes. Implement a "Comparison" mode to view 2 or 4 images side-by-side in the viewer. diff --git a/constants.py b/constants.py index 1ad62a1..52a5d65 100644 --- a/constants.py +++ b/constants.py @@ -168,11 +168,6 @@ if importlib.util.find_spec("mediapipe") is not None: HAVE_FACE_RECOGNITION = importlib.util.find_spec("face_recognition") is not None HAVE_BAGHEERASEARCH_LIB = False -try: - import bagheera_search_lib - HAVE_BAGHEERASEARCH_LIB = True -except ImportError: - pass MEDIAPIPE_FACE_MODEL_PATH = os.path.join(CONFIG_DIR, "blaze_face_short_range.tflite") diff --git a/imagescanner.py b/imagescanner.py index 2699389..89c4fa3 100644 --- a/imagescanner.py +++ b/imagescanner.py @@ -28,9 +28,10 @@ import collections from pathlib import Path from contextlib import contextmanager import lmdb -from PySide6.QtCore import (QObject, QThread, Signal, QMutex, QReadWriteLock, QSize, - QWaitCondition, QByteArray, QBuffer, QIODevice, Qt, QTimer, - QRunnable, QThreadPool) +from PySide6.QtCore import ( + QObject, QThread, Signal, QMutex, QReadWriteLock, QSize, QSemaphore, QWaitCondition, + QByteArray, QBuffer, QIODevice, Qt, QTimer, QRunnable, QThreadPool, QFile +) from PySide6.QtGui import QImage, QImageReader, QImageIOHandler from constants import ( @@ -49,10 +50,193 @@ if HAVE_BAGHEERASEARCH_LIB: logger = logging.getLogger(__name__) -def generate_thumbnail(path, size): +class ThreadPoolManager: + """Manages a global QThreadPool to dynamically adjust thread count.""" + def __init__(self): + self.pool = QThreadPool() + self.default_thread_count = APP_CONFIG.get( + "generation_threads", + SCANNER_SETTINGS_DEFAULTS.get("generation_threads", 4) + ) + self.pool.setMaxThreadCount(self.default_thread_count) + self.is_user_active = False + logger.info(f"ThreadPoolManager initialized with {self.default_thread_count} threads.") + + def get_pool(self): + """Returns the managed QThreadPool instance.""" + return self.pool + + def set_user_active(self, active): + """ + Adjusts thread count based on user activity. + + Args: + active (bool): True if the user is interacting with the UI. + """ + if active == self.is_user_active: + return + self.is_user_active = active + if active: + # User is active, reduce threads to 1 to prioritize UI responsiveness. + self.pool.setMaxThreadCount(1) + logger.debug("User is active, reducing thread pool to 1.") + else: + # User is idle, restore to default thread count. + self.pool.setMaxThreadCount(self.default_thread_count) + logger.debug(f"User is idle, restoring thread pool to {self.default_thread_count}.") + + def update_default_thread_count(self): + """Updates the default thread count from application settings.""" + self.default_thread_count = APP_CONFIG.get( + "generation_threads", + SCANNER_SETTINGS_DEFAULTS.get("generation_threads", 4) + ) + # Only apply if not in a user-active (low-thread) state. + if not self.is_user_active: + self.pool.setMaxThreadCount(self.default_thread_count) + logger.info(f"Default thread count updated to {self.default_thread_count}.") + + +class ScannerWorker(QRunnable): + """ + Worker to process a single image in a thread pool. + Handles thumbnail retrieval/generation and metadata loading. + """ + def __init__(self, cache, path, target_sizes=None, load_metadata=True, + signal_emitter=None, semaphore=None): + super().__init__() + self.cache = cache + self.path = path + self.target_sizes = target_sizes + self.load_metadata_flag = load_metadata + self.emitter = signal_emitter + self.semaphore = semaphore + self._is_cancelled = False + # Result will be (path, thumb, mtime, tags, rating, inode, dev) or None + self.result = None + + def shutdown(self): + """Marks the worker as cancelled.""" + self._is_cancelled = True + + def run(self): + from constants import SCANNER_GENERATE_SIZES + + sizes_to_check = self.target_sizes if self.target_sizes is not None \ + else SCANNER_GENERATE_SIZES + + if self._is_cancelled: + if self.semaphore: + self.semaphore.release() + return + + fd = None + try: + # Optimize: Open file once to reuse FD for stat and xattrs + fd = os.open(self.path, os.O_RDONLY) + stat_res = os.fstat(fd) + curr_mtime = stat_res.st_mtime + curr_inode = stat_res.st_ino + curr_dev = stat_res.st_dev + + smallest_thumb_for_signal = None + min_size = min(sizes_to_check) if sizes_to_check else 0 + + # Ensure required thumbnails exist + for size in sizes_to_check: + if self._is_cancelled: + return + + # Check if a valid thumbnail for this size exists + thumb, mtime = self.cache.get_thumbnail(self.path, size, + curr_mtime=curr_mtime, + inode=curr_inode, + device_id=curr_dev) + if not thumb or mtime != curr_mtime: + # Use generation lock to prevent multiple threads generating + with self.cache.generation_lock( + self.path, size, curr_mtime, + curr_inode, curr_dev) as should_gen: + if self._is_cancelled: + return + + if should_gen: + # I am the owner, I generate the thumbnail + new_thumb = generate_thumbnail(self.path, size, fd=fd) + if self._is_cancelled: + return + if new_thumb and not new_thumb.isNull(): + self.cache.set_thumbnail( + self.path, new_thumb, curr_mtime, size, + inode=curr_inode, device_id=curr_dev, block=True) + if size == min_size: + smallest_thumb_for_signal = new_thumb + else: + # Another thread generated it, re-fetch + if size == min_size: + re_thumb, _ = self.cache.get_thumbnail( + self.path, size, curr_mtime=curr_mtime, + inode=curr_inode, device_id=curr_dev, + async_load=False) + smallest_thumb_for_signal = re_thumb + elif size == min_size: + # valid thumb exists, use it for signal + smallest_thumb_for_signal = thumb + + tags = [] + rating = 0 + if self.load_metadata_flag: + tags, rating = self._load_metadata(fd) + self.result = (self.path, smallest_thumb_for_signal, + curr_mtime, tags, rating, curr_inode, curr_dev) + except Exception as e: + logger.error(f"Error processing image {self.path}: {e}") + self.result = None + finally: + if fd is not None: + try: + os.close(fd) + except OSError: + pass + if self.emitter: + self.emitter.emit_progress() + if self.semaphore: + self.semaphore.release() + + def _load_metadata(self, path_or_fd): + """Loads tag and rating data for a path or file descriptor.""" + tags = [] + raw_tags = XattrManager.get_attribute(path_or_fd, XATTR_NAME) + if raw_tags: + tags = sorted(list(set(t.strip() + for t in raw_tags.split(',') if t.strip()))) + + raw_rating = XattrManager.get_attribute(path_or_fd, RATING_XATTR_NAME, "0") + try: + rating = int(raw_rating) + except ValueError: + rating = 0 + return tags, rating + + +def generate_thumbnail(path, size, fd=None): """Generates a QImage thumbnail for a given path and size.""" try: - reader = QImageReader(path) + qfile = None + if fd is not None: + try: + # Ensure we are at the beginning of the file + os.lseek(fd, 0, os.SEEK_SET) + qfile = QFile() + if qfile.open(fd, QIODevice.ReadOnly, QFile.DontCloseHandle): + reader = QImageReader(qfile) + else: + qfile = None + reader = QImageReader(path) + except OSError: + reader = QImageReader(path) + else: + reader = QImageReader(path) # Optimization: Instruct the image decoder to scale while reading. # This drastically reduces memory usage and CPU time for large images @@ -1046,45 +1230,6 @@ class CacheCleaner(QThread): self.finished_clean.emit(removed_count) -class ThumbnailRunnable(QRunnable): - """Runnable task to generate a single thumbnail.""" - def __init__(self, cache, path, size, signal_emitter): - super().__init__() - self.cache = cache - self.path = path - self.size = size - self.emitter = signal_emitter - - def run(self): - try: - # Optimization: Single stat call per file - stat_res = os.stat(self.path) - curr_mtime = stat_res.st_mtime - inode = stat_res.st_ino - dev = stat_res.st_dev - - # Check cache first to avoid expensive generation - thumb, mtime = self.cache.get_thumbnail( - self.path, self.size, curr_mtime=curr_mtime, - inode=inode, device_id=dev, async_load=False) - - if not thumb or mtime != curr_mtime: - # Use the generation lock to coordinate - with self.cache.generation_lock( - self.path, self.size, curr_mtime, inode, dev) as should_gen: - if should_gen: - # I am the owner, I generate the thumbnail - new_thumb = generate_thumbnail(self.path, self.size) - if new_thumb and not new_thumb.isNull(): - self.cache.set_thumbnail( - self.path, new_thumb, curr_mtime, self.size, - inode=inode, device_id=dev, block=True) - except Exception as e: - logger.error(f"Error generating thumbnail for {self.path}: {e}") - finally: - self.emitter.emit_progress() - - class ThumbnailGenerator(QThread): """ Background thread to generate thumbnails for a specific size for a list of @@ -1097,34 +1242,38 @@ class ThumbnailGenerator(QThread): """Helper to emit signals from runnables to the main thread.""" progress_tick = Signal() - def emit_progress(self): - self.progress_tick.emit() + def emit_progress(self): + self.progress_tick.emit() - def __init__(self, cache, paths, size): + def __init__(self, cache, paths, size, thread_pool_manager): super().__init__() self.cache = cache self.paths = paths self.size = size self._abort = False + self.thread_pool_manager = thread_pool_manager + self._workers = [] + self._workers_mutex = QMutex() def stop(self): """Stops the worker thread gracefully.""" self._abort = True + self._workers_mutex.lock() + for worker in self._workers: + worker.shutdown() + self._workers_mutex.unlock() self.wait() def run(self): """ Main execution loop. Uses a thread pool to process paths in parallel. """ - pool = QThreadPool() - max_threads = APP_CONFIG.get( - "generation_threads", - SCANNER_SETTINGS_DEFAULTS.get("generation_threads", 4)) - pool.setMaxThreadCount(max_threads) + pool = self.thread_pool_manager.get_pool() emitter = self.SignalEmitter() processed_count = 0 total = len(self.paths) + sem = QSemaphore(0) def on_tick(): nonlocal processed_count @@ -1138,14 +1287,34 @@ class ThumbnailGenerator(QThread): # The signal/slot mechanism handles thread safety automatically. emitter.progress_tick.connect(on_tick, Qt.QueuedConnection) + started_count = 0 for path in self.paths: if self._abort: break - runnable = ThumbnailRunnable(self.cache, path, self.size, emitter) - pool.start(runnable) + runnable = ScannerWorker(self.cache, path, target_sizes=[self.size], + load_metadata=False, signal_emitter=emitter, + semaphore=sem) + runnable.setAutoDelete(False) - pool.waitForDone() - self.generation_complete.emit() + self._workers_mutex.lock() + if self._abort: + self._workers_mutex.unlock() + break + self._workers.append(runnable) + self._workers_mutex.unlock() + + pool.start(runnable) + started_count += 1 + + if started_count > 0: + sem.acquire(started_count) + + self._workers_mutex.lock() + self._workers.clear() + self._workers_mutex.unlock() + + if not self._abort: + self.generation_complete.emit() class ImageScanner(QThread): @@ -1159,8 +1328,8 @@ class ImageScanner(QThread): progress_percent = Signal(int) finished_scan = Signal(int) # Total images found more_files_available = Signal(int, int) # Last loaded index, remainder - - def __init__(self, cache, paths, is_file_list=False, viewers=None): + def __init__(self, cache, paths, is_file_list=False, viewers=None, + thread_pool_manager=None): # is_file_list is not used if not paths or not isinstance(paths, (list, tuple)): logger.warning("ImageScanner initialized with empty or invalid paths") @@ -1168,6 +1337,7 @@ class ImageScanner(QThread): super().__init__() self.cache = cache self.all_files = [] + self.thread_pool_manager = thread_pool_manager self._viewers = viewers self._seen_files = set() self._is_file_list = is_file_list @@ -1196,12 +1366,23 @@ class ImageScanner(QThread): self.pending_tasks = [] self._priority_queue = collections.deque() self._processed_paths = set() + self._current_workers = [] + self._current_workers_mutex = QMutex() # Initial load self.pending_tasks.append((0, APP_CONFIG.get( "scan_batch_size", SCANNER_SETTINGS_DEFAULTS["scan_batch_size"]))) self._last_update_time = 0 + if self.thread_pool_manager: + self.pool = self.thread_pool_manager.get_pool() + else: + self.pool = QThreadPool() + max_threads = APP_CONFIG.get( + "generation_threads", + SCANNER_SETTINGS_DEFAULTS.get("generation_threads", 4)) + self.pool.setMaxThreadCount(max_threads) + logger.info(f"ImageScanner initialized with {len(paths)} paths") def set_auto_load(self, enabled): @@ -1455,84 +1636,129 @@ class ImageScanner(QThread): self.finished_scan.emit(self.count) return + if self.thread_pool_manager: + max_threads = self.thread_pool_manager.default_thread_count + else: + max_threads = APP_CONFIG.get( + "generation_threads", + SCANNER_SETTINGS_DEFAULTS.get("generation_threads", 4)) + self.pool.setMaxThreadCount(max_threads) + images_loaded = 0 batch = [] while i < len(self.all_files): if not self._is_running: return - self.msleep(1) # Force yield to UI thread per item while self._paused and self._is_running: self.msleep(100) - # 1. Check priority queue first - priority_path = None + # Collect paths for this chunk to process in parallel + chunk_size = max_threads * 2 + tasks = [] # List of (path, is_from_priority_queue) + + # 1. Drain priority queue up to chunk size self.mutex.lock() - while self._priority_queue: + while len(tasks) < chunk_size and self._priority_queue: p = self._priority_queue.popleft() if p not in self._processed_paths and p in self._seen_files: - priority_path = p - break + tasks.append((p, True)) self.mutex.unlock() - # 2. Determine file to process - if priority_path: - f_path = priority_path - # Don't increment 'i' yet, we are processing out of order - else: - f_path = self.all_files[i] - i += 1 # Only advance sequential index if processing sequentially + # 2. Fill remaining chunk space with sequential files + temp_i = i + while len(tasks) < chunk_size and temp_i < len(self.all_files): + p = self.all_files[temp_i] + # Skip if already processed (e.g. via priority earlier) + if p not in self._processed_paths \ + and Path(p).suffix.lower() in IMAGE_EXTENSIONS: + tasks.append((p, False)) + temp_i += 1 - if f_path not in self._processed_paths \ - and Path(f_path).suffix.lower() in IMAGE_EXTENSIONS: - # Pass the batch list to store result instead of emitting immediately - was_loaded = self._process_single_image(f_path, batch) + if not tasks: + # If no tasks found but still have files (e.g. all skipped extensions), + # update index and continue loop + i = temp_i + continue - # Emit batch if size is enough (responsiveness optimization) - # Dynamic batching: Start small for instant feedback. - # Keep batches small enough to prevent UI starvation during rapid cache - # reads. - if self.count <= 100: - target_batch_size = 20 - else: - target_batch_size = 200 + # Submit tasks to thread pool + sem = QSemaphore(0) + runnables = [] - if len(batch) >= target_batch_size: + self._current_workers_mutex.lock() + if not self._is_running: + self._current_workers_mutex.unlock() + return - self.images_found.emit(batch) - batch = [] - # Yield briefly to let the main thread process the emitted batch - # (update UI), preventing UI freeze during fast cache reading. - self.msleep(10) + for f_path, _ in tasks: + r = ScannerWorker(self.cache, f_path, semaphore=sem) + r.setAutoDelete(False) + runnables.append(r) + self._current_workers.append(r) + self.pool.start(r) + self._current_workers_mutex.unlock() - if was_loaded: - self._processed_paths.add(f_path) + # Wait only for this chunk to finish using semaphore + sem.acquire(len(runnables)) + + self._current_workers_mutex.lock() + self._current_workers.clear() + self._current_workers_mutex.unlock() + + if not self._is_running: + return + + # Process results + for r in runnables: + if r.result: + self._processed_paths.add(r.path) + batch.append(r.result) + self.count += 1 images_loaded += 1 - if images_loaded >= to_load and to_load > 0: - if batch: # Emit remaining items - self.images_found.emit(batch) - next_index = i + 1 - total_files = len(self.all_files) - self.index = next_index - self.progress_msg.emit(UITexts.LOADED_PARTIAL.format( - self.count, total_files - next_index)) + # Clean up runnables + runnables.clear() - if total_files > 0: - percent = int((self.count / total_files) * 100) - self.progress_percent.emit(percent) + # Advance sequential index + i = temp_i - self.more_files_available.emit(next_index, total_files) - # This loads all images continuously without pausing only if - # explicitly requested - if self._auto_load_enabled: - self.load_images( - next_index, - APP_CONFIG.get("scan_batch_size", - SCANNER_SETTINGS_DEFAULTS[ - "scan_batch_size"])) - return + # Emit batch if size is enough (responsiveness optimization) + if self.count <= 100: + target_batch_size = 20 + else: + target_batch_size = 200 + + if len(batch) >= target_batch_size: + self.images_found.emit(batch) + batch = [] + self.msleep(10) # Yield to UI + + # Check if loading limit reached + if images_loaded >= to_load and to_load > 0: + if batch: # Emit remaining items + self.images_found.emit(batch) + + next_index = i + total_files = len(self.all_files) + self.index = next_index + self.progress_msg.emit(UITexts.LOADED_PARTIAL.format( + self.count, total_files - next_index)) + + if total_files > 0: + percent = int((self.count / total_files) * 100) + self.progress_percent.emit(percent) + + self.more_files_available.emit(next_index, total_files) + # This loads all images continuously without pausing only if + # explicitly requested + if self._auto_load_enabled: + self.load_images( + next_index, + APP_CONFIG.get("scan_batch_size", + SCANNER_SETTINGS_DEFAULTS[ + "scan_batch_size"])) + return if self.count % 10 == 0: # Update progress less frequently self.progress_msg.emit( @@ -1547,88 +1773,17 @@ class ImageScanner(QThread): self.progress_percent.emit(100) self.finished_scan.emit(self.count) - def _load_metadata(self, path_or_fd): - """Loads tag and rating data for a path or file descriptor.""" - tags = [] - - raw_tags = XattrManager.get_attribute(path_or_fd, XATTR_NAME) - if raw_tags: - tags = sorted(list(set(t.strip() - for t in raw_tags.split(',') if t.strip()))) - - raw_rating = XattrManager.get_attribute(path_or_fd, RATING_XATTR_NAME, "0") - try: - rating = int(raw_rating) - except ValueError: - rating = 0 - return tags, rating - - def _process_single_image(self, f_path, batch_list): - from constants import SCANNER_GENERATE_SIZES - - fd = None - try: - # Optimize: Open file once to reuse FD for stat and xattrs - fd = os.open(f_path, os.O_RDONLY) - stat_res = os.fstat(fd) - curr_mtime = stat_res.st_mtime - curr_inode = stat_res.st_ino - curr_dev = stat_res.st_dev - - smallest_thumb_for_signal = None - - # Ensure required thumbnails exist - for size in SCANNER_GENERATE_SIZES: - # Check if a valid thumbnail for this size exists - thumb, mtime = self.cache.get_thumbnail(f_path, size, - curr_mtime=curr_mtime, - inode=curr_inode, - device_id=curr_dev) - if not thumb or mtime != curr_mtime: - # Use generation lock to prevent multiple threads generating the - # same thumb - with self.cache.generation_lock( - f_path, size, curr_mtime, - curr_inode, curr_dev) as should_gen: - if should_gen: - # I am the owner, I generate the thumbnail - new_thumb = generate_thumbnail(f_path, size) - if new_thumb and not new_thumb.isNull(): - self.cache.set_thumbnail( - f_path, new_thumb, curr_mtime, size, - inode=curr_inode, device_id=curr_dev, block=True) - if size == min(SCANNER_GENERATE_SIZES): - smallest_thumb_for_signal = new_thumb - else: - # Another thread generated it, re-fetch to use it for the - # signal - if size == min(SCANNER_GENERATE_SIZES): - re_thumb, _ = self.cache.get_thumbnail( - f_path, size, curr_mtime=curr_mtime, - inode=curr_inode, device_id=curr_dev, - async_load=False) - smallest_thumb_for_signal = re_thumb - elif size == min(SCANNER_GENERATE_SIZES): - # valid thumb exists, use it for signal - smallest_thumb_for_signal = thumb - - tags, rating = self._load_metadata(fd) - batch_list.append((f_path, smallest_thumb_for_signal, - curr_mtime, tags, rating, curr_inode, curr_dev)) - self.count += 1 - return True - except Exception as e: - logger.error(f"Error processing image {f_path}: {e}") - return False - finally: - if fd is not None: - try: - os.close(fd) - except OSError: - pass - def stop(self): + logger.info("ImageScanner stop requested") self._is_running = False + + # Cancel currently running workers in the active batch + self._current_workers_mutex.lock() + for worker in self._current_workers: + worker.shutdown() + self._current_workers_mutex.unlock() + + # Wake up the condition variable self.mutex.lock() self.condition.wakeAll() self.mutex.unlock()