"""
Bagheera Search Library

A Python interface for the Baloo search wrapper.
"""
|
|
|
|
import ctypes
|
|
import json
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Iterator, Optional, Union
|
|
|
|
from baloo_tools import get_resolution
|
|
from date_query_parser import parse_date
|
|
|
|
|
|
class BagheeraSearcher:
    """Run Baloo searches through the ``libbaloo_wrapper`` C library.

    The instance records the id of every file it has examined in
    ``ids_processed`` and skips ids it has already seen, so one file is
    never yielded twice. Call :meth:`reset_state` before starting an
    unrelated search.
    """

    # Placeholders written into a query once a dimension term is resolved.
    _TRUE = "__true__"
    _FALSE = "__false__"

    # Shape keywords and width/height comparisons (case-insensitive).
    _SHAPE_RE = re.compile(r"\b(PORTRAIT|LANDSCAPE|SQUARE)\b", re.IGNORECASE)
    _RATIO_RE = re.compile(
        r"\bwidth\s*(<=|>=|!=|<|>|=)\s*height\b", re.IGNORECASE
    )
    # Parentheses are tokens of their own; everything else splits on
    # whitespace. Operators are recognised by comparing whole tokens, so a
    # word such as "ORANGE" is not mistaken for "OR" followed by "ANGE".
    _TOKEN_RE = re.compile(r"\(|\)|[^\s()]+")

    def __init__(self, lib_path: Optional[Union[str, Path]] = None) -> None:
        """Load the wrapper library and start with no processed ids.

        Args:
            lib_path: Explicit path to the shared library. When omitted the
                default location is used (see ``_load_baloo_wrapper``).

        Raises:
            FileNotFoundError: If the shared library does not exist.
        """
        self.ids_processed: set[int] = set()
        self.baloo_lib = self._load_baloo_wrapper(lib_path)

    def _load_baloo_wrapper(
        self, custom_path: Optional[Union[str, Path]]
    ) -> ctypes.CDLL:
        """Load the Baloo C wrapper and declare its function signatures.

        Args:
            custom_path: Path to the shared library, or a falsy value to
                look for ``libbaloo_wrapper.so`` next to this module (or in
                ``<sys._MEIPASS>/lib`` when running from a frozen bundle).

        Returns:
            The loaded library with ``execute_baloo_query`` and
            ``get_file_properties`` configured as ``char* f(char*)``.

        Raises:
            FileNotFoundError: If the resolved path does not exist.
        """
        if custom_path:
            lib_path = Path(custom_path)
        else:
            if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"):
                # Frozen bundle: the library is looked up under "lib" in the
                # directory named by sys._MEIPASS.
                base_dir = Path(getattr(sys, "_MEIPASS")) / "lib"
            else:
                base_dir = Path(__file__).parent.absolute()
            lib_path = base_dir / "libbaloo_wrapper.so"

        if not lib_path.exists():
            raise FileNotFoundError(
                f"ERROR: Baloo wrapper '{lib_path.name}' not found at {lib_path}"
            )

        lib = ctypes.CDLL(str(lib_path))
        # NOTE(review): with restype=c_char_p ctypes converts the result to
        # bytes and discards the pointer; if the wrapper returns heap memory
        # it cannot be freed from here - confirm the wrapper's ownership rules.
        for entry_point in (lib.execute_baloo_query, lib.get_file_properties):
            entry_point.argtypes = [ctypes.c_char_p]
            entry_point.restype = ctypes.c_char_p
        return lib

    def _rewrite_dimension_terms(self, query: str, file_id: int) -> str:
        """Replace shape/ratio terms in *query* with true/false placeholders.

        The resolution is only looked up when the query actually contains a
        dimension term. If it cannot be obtained, or either side is not
        positive, every dimension term evaluates to false.
        """
        if not (self._SHAPE_RE.search(query) or self._RATIO_RE.search(query)):
            return query

        try:
            w, h = get_resolution(file_id)
        except Exception:
            # Best effort: an unreadable resolution makes the terms false
            # instead of aborting the whole search.
            w, h = -1, -1

        known = w > 0 and h > 0
        shapes = {"PORTRAIT": w < h, "LANDSCAPE": w > h, "SQUARE": w == h}
        ratios = {
            "=": w == h,
            ">": w > h,
            "<": w < h,
            ">=": w >= h,
            "<=": w <= h,
            "!=": w != h,
        }

        def shape_verdict(match: re.Match) -> str:
            hit = known and shapes.get(match.group(1).upper(), False)
            return self._TRUE if hit else self._FALSE

        def ratio_verdict(match: re.Match) -> str:
            hit = known and ratios.get(match.group(1), False)
            return self._TRUE if hit else self._FALSE

        query = self._SHAPE_RE.sub(shape_verdict, query)
        return self._RATIO_RE.sub(ratio_verdict, query)

    def check_keywords(
        self, text: str, query: str, file_path: str = "", file_id: int = 0
    ) -> bool:
        """Evaluate whether *text* satisfies a logical keyword *query*.

        Terms are matched as case-insensitive substrings of *text*. Terms
        separated by whitespace (or by an upper-case ``AND``) must all be
        present; an upper-case ``OR`` separates alternatives; parentheses
        group. When *file_path* is non-empty, ``PORTRAIT``, ``LANDSCAPE``,
        ``SQUARE`` and ``width <op> height`` terms are evaluated against the
        resolution reported for *file_id*.

        Args:
            text: The text to test (callers in this class pass the path).
            query: The logical query.
            file_path: Non-empty to enable dimension terms.
            file_id: Id handed to ``get_resolution`` for dimension terms.

        Returns:
            True if the query matches; False if it does not or if the query
            cannot be turned into a valid pattern (e.g. unbalanced
            parentheses).
        """
        # NOTE(review): the dimension rewrite is gated on file_path; the
        # nesting was reconstructed from a copy that lost its indentation.
        if file_path:
            query = self._rewrite_dimension_terms(query, file_id)

        haystack = text.lower()
        parts = []
        for token in self._TOKEN_RE.findall(query):
            if token in ("(", ")"):
                parts.append(token)
            elif token == "OR":
                parts.append("|")
            elif token == "AND":
                # Adjacent lookaheads already mean "all of them".
                continue
            elif token == self._TRUE:
                parts.append("(?=.*)")
            elif token == self._FALSE:
                parts.append("(?!)")
            else:
                # The text is lower-cased, so the term must be as well or a
                # query containing capitals could never match.
                parts.append(rf"(?=.*{re.escape(token.lower())})")

        pattern = "^" + "".join(parts) + ".*"
        try:
            return bool(re.search(pattern, haystack, re.DOTALL))
        except re.error:
            return False

    def get_baloo_info(self, file_path: str) -> Dict[str, str]:
        """Retrieve the properties Baloo reports for *file_path*.

        The wrapper's reply is parsed as ``key:value`` entries separated by
        ``|``; entries without a colon are ignored and a later duplicate key
        overwrites an earlier one.

        Returns:
            The parsed properties, or an empty dict when the wrapper returns
            nothing.
        """
        reply = self.baloo_lib.get_file_properties(file_path.encode("utf-8"))
        if not reply:
            return {}

        entries = (e.partition(":") for e in reply.decode("utf-8").split("|"))
        return {key: value for key, sep, value in entries if sep}

    def _execute_query(self, options: Dict[str, Any]) -> list:
        """Send *options* to the C wrapper as JSON and decode its reply.

        Returns:
            The decoded JSON reply, or an empty list when the wrapper
            returns nothing or the reply cannot be decoded.
        """
        payload = json.dumps(options).encode("utf-8")
        reply = self.baloo_lib.execute_baloo_query(payload)
        if not reply:
            return []

        try:
            return json.loads(reply.decode("utf-8"))
        except json.JSONDecodeError as e:
            # Diagnostics go to stderr so they never mix with results that a
            # caller prints on stdout.
            print(f"JSON decode error from Baloo wrapper: {e}", file=sys.stderr)
        except UnicodeDecodeError as e:
            print(f"Invalid UTF-8 from Baloo wrapper: {e}", file=sys.stderr)
        return []

    def _passes_exclude(
        self, item: Dict[str, Any], pattern: Optional[str], file_id: int
    ) -> bool:
        """Return True unless *pattern* is set and matches the item's path."""
        if not pattern:
            return True
        path = item["path"]
        return not self.check_keywords(path, pattern, path, file_id)

    def search_recursive(
        self,
        query_text: str,
        options: Dict[str, Any],
        search_opts: Dict[str, Any],
        files_count: int,
    ) -> Iterator[Dict[str, Any]]:
        """Run one sub-query and yield its not-yet-seen, non-excluded items.

        Both dicts are modified in place: ``options["query"]`` is set to
        *query_text* and ``search_opts["limit"]`` is decremented for every
        item yielded. Iteration stops once the limit is not positive (a
        missing ``"limit"`` counts as 0).

        Args:
            query_text: Query to run.
            options: Options passed to the wrapper.
            search_opts: Reads ``limit``, ``offset`` and
                ``recursive_exclude``.
            files_count: Number of accepted items already counted; items are
                only yielded once this reaches ``offset``.
        """
        options["query"] = query_text

        for item in self._execute_query(options):
            if search_opts.get("limit", 0) <= 0:
                break

            # Ids arrive as hexadecimal strings.
            file_id = int(item["id"], 16)
            if file_id in self.ids_processed:
                continue
            self.ids_processed.add(file_id)

            exclude = search_opts.get("recursive_exclude")
            if not self._passes_exclude(item, exclude, file_id):
                continue

            if files_count >= search_opts.get("offset", 0):
                search_opts["limit"] -= 1
                yield item

            # NOTE(review): this count is local to the call, so the offset is
            # applied separately for each directory searched - confirm that
            # is intended. Placement reconstructed from a copy without
            # indentation.
            files_count += 1

    def search(
        self,
        query_text: str,
        main_options: Dict[str, Any],
        search_opts: Dict[str, Any],
    ) -> Iterator[Dict[str, Any]]:
        """Main search generator. Yields file dictionaries.

        Runs *query_text* (after ``parse_date``) with *main_options*. When
        ``search_opts["recursive"]`` is not None, every accepted result is
        used as the ``directory`` of a second query whose results are
        yielded instead; otherwise the accepted results are yielded
        directly.

        *main_options* is modified in place (``query``, and in recursive
        mode ``type`` and ``directory``). Nothing is yielded unless
        ``search_opts["limit"]`` is positive.

        Args:
            query_text: The primary query.
            main_options: Options passed to the wrapper.
            search_opts: Reads ``limit``, ``exclude``, ``recursive``,
                ``type``, plus the keys used by ``search_recursive``.
        """
        main_options["query"] = parse_date(query_text)
        files = self._execute_query(main_options)
        if not files:
            return

        rec_query = search_opts.get("recursive")
        is_recursive = rec_query is not None
        if is_recursive:
            # The sub-queries use the caller's recursive type, or no type
            # filter at all when none was given.
            if search_opts.get("type"):
                main_options["type"] = search_opts["type"]
            else:
                main_options.pop("type", None)
            query_text = parse_date(rec_query) if rec_query else ""

        files_count = 0
        for item in files:
            if search_opts.get("limit", 0) <= 0:
                break

            # Ids arrive as hexadecimal strings.
            file_id = int(item["id"], 16)
            if file_id in self.ids_processed:
                continue
            self.ids_processed.add(file_id)

            if not self._passes_exclude(item, search_opts.get("exclude"), file_id):
                continue

            if is_recursive:
                main_options["directory"] = item["path"]
                yield from self.search_recursive(
                    query_text, main_options, search_opts, files_count
                )
            else:
                yield item
                # NOTE(review): increment placement reconstructed from a copy
                # without indentation - confirm against the original.
                files_count += 1

    def reset_state(self) -> None:
        """Clear the processed ids to allow fresh consecutive searches."""
        self.ids_processed.clear()
|
|
|
|
|
|
# from bagheera_search_lib import BagheeraSearcher
#
# def main():
#     # ... your existing argparse logic ...
#
#     try:
#         # Initialize the library
#         searcher = BagheeraSearcher()
#
#         # Consume the generator
#         for file_info in searcher.search(query_text, main_options, other_options):
#             output = file_info['path']
#             if other_options.get('konsole'):
#                 output = f"file:/'{output}'"
#             if other_options.get('id'):
#                 output += f" [ID: {file_info['id']}]"
#
#             print(output)
#
#     except FileNotFoundError as e:
#         print(e)
#         sys.exit(1)
#
|
|
|
|
|
|
# if __name__ == "__main__":
#     try:
#         # Initialize the library
#         searcher = BagheeraSearcher()
#
#         # Consume the generator
#         for file_info in searcher.search(query_text, main_options, other_options):
#             output = file_info['path']
#             if other_options.get('konsole'):
#                 output = f"file:/'{output}'"
#             if other_options.get('id'):
#                 output += f" [ID: {file_info['id']}]"
#
#             print(output)
#
#     except FileNotFoundError as e:
#         print(e)
#         sys.exit(1)
|