"""
Bagheera Search Library
A Python interface for the Baloo search wrapper.
"""

import ctypes
import json
import re
import sys
from pathlib import Path
from typing import (
    Any, Callable, Dict, Generator, Iterator, Optional, Tuple, Union
)

from baloo_tools import (get_info, get_tags)
from bagheera_query_parser_lib import parse_date
from pyparsing import (
    alphanums, one_of, infix_notation, Group, opAssoc, ParserElement,
    QuotedString, Word
)

ParserElement.enable_packrat()

# "<word> <operator>" where the word is anything except "tags".
_PROPERTY_PATTERN = re.compile(
    r"\b(?!tags\b)\w+[ \t]*(?:>=|<=|!=|=|>|<|:)", re.IGNORECASE
)
# "tags <operator>".
_TAGS_PATTERN = re.compile(
    r"\btags\b[ \t]*(?:>=|<=|!=|=|>|<|:)", re.IGNORECASE
)


def expression_contains_property(text):
    """Return True if *text* has a ``key <op>`` term whose key is not 'tags'."""
    return bool(_PROPERTY_PATTERN.search(text))


def expression_contains_tags(text):
    """Return True if *text* has a ``tags <op>`` term."""
    return bool(_TAGS_PATTERN.search(text))


class EvaluateExpression:
    """Compiles boolean filter expressions into reusable predicates.

    An expression is built from conditions (``key op value`` or a bare term)
    combined with ``NOT``, ``AND`` and ``OR``. The compiled predicate takes a
    dictionary of file data and returns a truth value.
    """

    def __init__(self):
        # Pre-define the grammar structure during initialization
        self.grammar = self._build_grammar()

    def _compare_single(self, l_val, op, r_val):
        """
        Atomic comparison logic for individual values.

        Ordering operators (``>``, ``<``, ``>=``, ``<=``) compare as floats
        when both sides convert, otherwise as strings. All other operators
        compare the string representations, so a numeric field value such as
        ``100`` equals the parsed literal ``"100"``. ``:`` is a
        case-insensitive "contains" test. Unknown operators yield False.
        """
        if op in (">", "<", ">=", "<="):
            try:
                # Attempt to treat both sides as floats
                lhs, rhs = float(l_val), float(r_val)
            except (ValueError, TypeError):
                # Fallback to string comparison if conversion fails
                lhs, rhs = str(l_val), str(r_val)
        else:
            # Default to string representation for other operators
            lhs, rhs = str(l_val), str(r_val)

        if op == "=":
            return lhs == rhs
        if op == "!=":
            return lhs != rhs
        if op == ">":
            return lhs > rhs
        if op == "<":
            return lhs < rhs
        if op == ">=":
            return lhs >= rhs
        if op == "<=":
            return lhs <= rhs
        if op == ":":
            return rhs.lower() in lhs.lower()
        return False

    def _compare(self, data, left_key, op, right_val):
        """
        Main comparison router.

        Both operands are looked up case-insensitively in *data*; an operand
        that is not a key of *data* is used as a literal. When the left-hand
        value is a list, the comparison succeeds if any element satisfies it.
        """
        # Normalize data keys to lowercase for case-insensitive lookup
        normalized_data = {k.lower(): v for k, v in data.items()}

        # Left-hand value: the named field, or the token itself if absent
        l_val = normalized_data.get(left_key.lower(), left_key)
        # Right-hand value: another field if the token names one, else literal
        r_val = normalized_data.get(str(right_val).lower(), right_val)

        if isinstance(l_val, list):
            # True if ANY item in the list satisfies the condition
            return any(
                self._compare_single(item, op, r_val) for item in l_val
            )
        return self._compare_single(l_val, op, r_val)

    def _build_grammar(self):
        """
        Defines the pyparsing grammar for the expression engine.
        """
        operators = one_of(">= <= != = > < :")
        identifier = Word(alphanums + "_./\\")
        quoted_string = QuotedString("'") | QuotedString('"')
        operand = quoted_string | identifier

        # Basic condition (e.g., "width > 100" or "word")
        condition = Group((operand + operators + operand) | operand)
        # Convert each condition's tokens into an executable predicate
        condition.set_parse_action(
            lambda t: self._create_evaluator_func(t[0])
        )

        return infix_notation(
            condition,
            [
                # t[0] is ["NOT", predicate]
                ("NOT", 1, opAssoc.RIGHT, lambda t: (
                    lambda data: not t[0][1](data))),
                # t[0] interleaves predicates with the operator strings;
                # the callable() filter skips the strings.
                ("AND", 2, opAssoc.LEFT, lambda t: (
                    lambda data: all(
                        f(data) for f in t[0] if callable(f)))),
                ("OR", 2, opAssoc.LEFT, lambda t: (
                    lambda data: any(
                        f(data) for f in t[0] if callable(f)))),
            ],
        )

    def _create_evaluator_func(self, tokens):
        """
        Creates a closure that captures tokens and waits for the data
        dictionary.
        """
        if len(tokens) == 1:
            # Rule: Single term -> path CONTAINS term
            return lambda data: self._compare(data, 'path', ':', tokens[0])
        # Rule: Explicit triplet (key, operator, value)
        return lambda data: self._compare(
            data, tokens[0], tokens[1], tokens[2]
        )

    def compile(self, expression):
        """
        Parses the expression once and returns a reusable function.

        On any parsing failure the error is printed and a predicate that
        always returns False is returned instead of raising.
        """
        try:
            return self.grammar.parse_string(expression, parse_all=True)[0]
        except Exception as e:
            print(f"Compilation Error: {e}")
            # Fallback: return a function that always fails gracefully
            return lambda data: False


class BagheeraSearcher:
    """Class to handle Baloo searches and interact with the C wrapper."""

    def __init__(self, lib_path: Optional[Union[str, Path]] = None) -> None:
        # IDs already seen; persists across searches until reset_state().
        self.ids_processed: set[int] = set()
        self.baloo_lib = self._load_baloo_wrapper(lib_path)

    def _load_baloo_wrapper(self, custom_path: Optional[Union[str, Path]]) \
            -> ctypes.CDLL:
        """Loads and configures the Baloo C wrapper library.

        With *custom_path* that exact file is used. Otherwise
        ``libbaloo_wrapper.so`` is searched for next to this module (or in
        the frozen bundle's ``lib`` directory), then in the active virtual
        environment, then in the standard system library directories.

        Raises:
            FileNotFoundError: if no library file exists at the custom path
                or in any searched directory.
        """
        if custom_path:
            lib_path = Path(custom_path)
            # Bound here too so the error message below can always be built.
            lib_name = lib_path.name
            search_paths = [lib_path.parent]
        else:
            lib_name = "libbaloo_wrapper.so"
            if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
                base_dir = Path(getattr(sys, '_MEIPASS')) / 'lib'
            else:
                base_dir = Path(__file__).parent.absolute()

            search_paths = [base_dir]
            # prefix != base_prefix means a virtual environment is active
            if sys.prefix != sys.base_prefix:
                venv_base = Path(sys.prefix)
                search_paths.append(venv_base / "lib64")
                search_paths.append(venv_base / "lib")
            search_paths.extend([
                Path("/lib64"),
                Path("/lib"),
                Path("/usr/lib64"),
                Path("/usr/lib"),
                Path("/usr/local/lib64"),
                Path("/usr/local/lib"),
            ])

            lib_path = None
            for path in search_paths:
                potential_path = path / lib_name
                if potential_path.exists():
                    lib_path = potential_path
                    break

        if lib_path is None or not lib_path.exists():
            raise FileNotFoundError(
                f"ERROR: Baloo wrapper '{lib_name}' not found at {search_paths}"
            )

        lib = ctypes.CDLL(str(lib_path))
        lib.execute_baloo_query.argtypes = [ctypes.c_char_p]
        lib.execute_baloo_query.restype = ctypes.c_char_p
        lib.get_file_properties.argtypes = [ctypes.c_char_p]
        lib.get_file_properties.restype = ctypes.c_char_p
        return lib

    def get_baloo_info(self, file_path: str) -> Dict[str, str]:
        """Extract properties for a specific file directly from file.

        The wrapper's reply is parsed as ``key:value`` entries separated by
        ``|``; entries without a ``:`` are ignored. An empty reply gives an
        empty dictionary.
        """
        result = self.baloo_lib.get_file_properties(file_path.encode("utf-8"))
        if not result:
            return {}

        properties = {}
        for entry in result.decode("utf-8").split("|"):
            if ":" in entry:
                # Split on the first colon only; values may contain colons
                key, value = entry.split(":", 1)
                properties[key] = value
        return properties

    def _execute_query(self, options: Dict[str, Any]) -> list:
        """Helper method to execute the query against the C wrapper.

        *options* is sent as JSON and the reply is decoded from JSON. An
        empty reply or invalid JSON gives an empty list.
        """
        query_json = json.dumps(options).encode("utf-8")
        result_ptr = self.baloo_lib.execute_baloo_query(query_json)
        if not result_ptr:
            return []

        try:
            return json.loads(result_ptr.decode("utf-8"))
        except json.JSONDecodeError as e:
            print(f"JSON decode error from Baloo wrapper: {e}")
            return []

    @staticmethod
    def _prepare_exclusion(
        expression: Optional[str],
    ) -> Tuple[Optional[Callable[[Dict[str, Any]], Any]], Dict[str, bool]]:
        """Compile an exclusion expression and note which metadata it needs.

        Returns ``(None, {})`` for a missing or empty expression. Otherwise
        returns the compiled predicate and a dict that may carry the flags
        ``'properties'`` and ``'tags'``.
        """
        if not expression:
            return None, {}

        evaluator = EvaluateExpression().compile(expression)
        sources: Dict[str, bool] = {}
        if expression_contains_property(expression):
            sources['properties'] = True
        if expression_contains_tags(expression):
            sources['tags'] = True
        return evaluator, sources

    @staticmethod
    def _is_excluded(
        item: Dict[str, Any],
        file_id: int,
        exclude_evaluator: Any,
        exclude_sources: Dict[str, bool],
    ) -> bool:
        """Return True if *item* matches the exclusion predicate.

        Extra metadata is fetched only when *exclude_sources* asks for it.
        """
        if not exclude_evaluator:
            return False

        file_info = {'path': item["path"]}
        # NOTE(review): get_info/get_tags are assumed to return dicts that
        # can be merged with ``|`` - confirm against baloo_tools.
        if exclude_sources.get('properties'):
            file_info = file_info | get_info(file_id)
        if exclude_sources.get('tags'):
            file_info = file_info | get_tags(file_id)
        return bool(exclude_evaluator(file_info))

    def search_recursive(
        self,
        query_text: str,
        options: Dict[str, Any],
        search_opts: Dict[str, Any],
        files_count: int,
        exclude_evaluator: Any,
        exclude_sources: Dict[str, bool]
    ) -> Generator[Dict[str, Any], None, int]:
        """Executes a recursive search yielded item by item.

        Sets ``options["query"]`` to *query_text*, runs the query, and yields
        each result that is new and not excluded. Results are skipped until
        *files_count* reaches ``search_opts["offset"]``; every yielded result
        decrements ``search_opts["limit"]`` and iteration stops once the
        limit is exhausted.

        Returns (as the generator's return value) the updated count of
        accepted files, so a caller using ``yield from`` can carry the offset
        bookkeeping across several calls.
        """
        options["query"] = query_text
        files = self._execute_query(options)

        for item in files:
            if search_opts.get("limit", 0) <= 0:
                break

            file_id = int(item["id"], 16)  # IDs arrive as hex strings
            if file_id in self.ids_processed:
                continue
            self.ids_processed.add(file_id)

            if self._is_excluded(
                item, file_id, exclude_evaluator, exclude_sources
            ):
                continue

            if files_count >= search_opts.get("offset", 0):
                search_opts["limit"] -= 1
                yield item
            files_count += 1

        return files_count

    def search(
        self,
        query_text: str,
        main_options: Dict[str, Any],
        search_opts: Dict[str, Any],
    ) -> Iterator[Dict[str, Any]]:
        """
        Main search generator. Yields file dictionaries.

        Keys read from *search_opts*: ``exclude`` and ``recursive_exclude``
        (optional filter expressions), ``recursive`` (when not None, each
        main result's path is searched as a directory using this query),
        ``type``, ``limit`` and ``offset``. *main_options* is mutated
        (``query``, ``type``, ``directory``) and ``search_opts["limit"]`` is
        decremented in recursive mode.
        """
        exclude_evaluator, exclude_sources = self._prepare_exclusion(
            search_opts.get('exclude')
        )
        recurse_exclude_evaluator, recurse_exclude_sources = (
            self._prepare_exclusion(search_opts.get('recursive_exclude'))
        )

        main_options["query"] = parse_date(query_text)
        files = self._execute_query(main_options)
        if not files:
            return

        is_recursive = search_opts.get("recursive") is not None
        if is_recursive:
            # The type filter applies to the nested searches, not to the
            # main query that has already run.
            if search_opts.get("type"):
                main_options["type"] = search_opts["type"]
            elif "type" in main_options:
                main_options.pop("type")

            rec_query = search_opts.get("recursive")
            query_text = parse_date(rec_query) if rec_query else ""

        files_count = 0
        for item in files:
            if search_opts.get("limit", 0) <= 0:
                break

            file_id = int(item["id"], 16)  # IDs arrive as hex strings
            if file_id in self.ids_processed:
                continue
            self.ids_processed.add(file_id)

            if self._is_excluded(
                item, file_id, exclude_evaluator, exclude_sources
            ):
                continue

            if is_recursive:
                main_options["directory"] = item["path"]
                # Capture the running count so the offset is honoured across
                # directories rather than restarting for each one.
                files_count = yield from self.search_recursive(
                    query_text, main_options, search_opts, files_count,
                    recurse_exclude_evaluator, recurse_exclude_sources
                )
            else:
                yield item
                files_count += 1

    def reset_state(self) -> None:
        """Clears the processed IDs to allow for fresh consecutive searches."""
        self.ids_processed.clear()


if __name__ == "__main__":
    # Quick integration test
    print(f"Testing {__file__} integration:")
    try:
        searcher = BagheeraSearcher()
        print("✔ Library and wrapper loaded successfully.")

        # Test search (limited to 1 result for today)
        test_main_opts = {"limit": 1}
        test_search_opts = {"limit": 1}

        print("Searching for recent files...")
        results = list(searcher.search(
            "MODIFIED TODAY", test_main_opts, test_search_opts
        ))

        if results:
            print(f"✔ Found: {results[0].get('path')}")
        else:
            print("? No files found for today, but search executed correctly.")
    except FileNotFoundError as e:
        print(f"✘ Setup error: {e}")
    except Exception as e:
        print(f"✘ Unexpected error: {e}")