Source code for marqo_instantapi.marqo_instantapi_adapter

import marqo
import tldextract
from collections import deque
import hashlib
import concurrent.futures
from marqo_instantapi.instant_api_client import InstantAPIClient

from typing import Optional, Union, Literal, Any


class InstantAPIMarqoAdapter:
    """A class for interfacing with Marqo and InstantAPI.

    The adapter extracts structured data from webpages through an
    ``InstantAPIClient`` and indexes the resulting flat documents with a
    ``marqo.Client``.
    """

    def __init__(
        self,
        marqo_url: str = "http://localhost:8882",
        marqo_api_key: Optional[str] = None,
        instantapi_key: Optional[str] = None,
    ):
        """Create the Marqo and InstantAPI clients and set adapter defaults.

        Args:
            marqo_url (str, optional): URL of the Marqo instance. Defaults to "http://localhost:8882".
            marqo_api_key (Optional[str], optional): API key passed to the Marqo client. Defaults to None.
            instantapi_key (Optional[str], optional): API key passed to the InstantAPI client. Defaults to None.
        """
        self.mq = marqo.Client(url=marqo_url, api_key=marqo_api_key)
        self.instant_api = InstantAPIClient(api_key=instantapi_key)

        # Name of the synthetic field used when text and image fields are combined.
        self.combination_field = "combination"

        # Base index settings; create_index() overrides "model" and
        # "treatUrlsAndPointersAsImages" on a copy of this dict.
        self.default_marqo_settings_dict = {
            "type": "unstructured",
            "vectorNumericType": "float",
            "treatUrlsAndPointersAsImages": True,
            "model": "open_clip/ViT-L-14/laion2b_s32b_b82k",
            "normalizeEmbeddings": True,
            "textPreprocessing": {
                "splitLength": 3,
                "splitOverlap": 1,
                "splitMethod": "sentence",
            },
            "annParameters": {
                "spaceType": "prenormalized-angular",
                "parameters": {"efConstruction": 512, "m": 16},
            },
            "filterStringMaxLength": 50,
        }
        self.default_text_model = "hf/snowflake-arctic-embed-m-v1.5"
        self.default_image_model = "open_clip/ViT-B-16-SigLIP-512/webli"

    def create_index(
        self,
        index_name: str,
        multimodal: bool = False,
        model: Optional[str] = None,
        skip_if_exists: bool = False,
    ) -> dict:
        """Simplified method for creating a Marqo index, recommended when fine grained control is not needed.

        Args:
            index_name (str): The name of the index to create.
            multimodal (bool, optional): Toggles image downloading on or off, if model is not provided then also influences model selection. Defaults to False.
            model (Optional[str], optional): Optionally specify a specific model. When given it takes precedence over the default text/image models. Defaults to None.
            skip_if_exists (bool, optional): Skip index creation if the index already exists, does not check if the index conforms to the provided parameters. Defaults to False.

        Returns:
            dict: index creation response

        Examples:
            >>> marqo_adapter = InstantAPIMarqoAdapter()
            >>> marqo_adapter.create_index("my-index")
        """
        # Shallow copy is sufficient: only top-level keys are overridden below.
        settings = {**self.default_marqo_settings_dict}

        if model is not None:
            # An explicitly requested model always wins over the defaults.
            settings["model"] = model
        elif multimodal:
            settings["model"] = self.default_image_model
        else:
            settings["model"] = self.default_text_model

        settings["treatUrlsAndPointersAsImages"] = multimodal

        # Only query the list of indexes when the caller asked for the skip.
        if skip_if_exists and self._check_index_exists(index_name):
            # NOTE(review): the empty search looks like a warm-up request — confirm.
            self.mq.index(index_name).search(q="")
            return {
                "acknowledged": True,
                "index": index_name,
                "message": "Index already exists, skipping creation.",
            }

        response = self.mq.create_index(index_name, settings_dict=settings)
        # NOTE(review): the empty search looks like a warm-up request — confirm.
        self.mq.index(index_name).search(q="")
        return response

    def delete_index(
        self, index_name: str, confirm: bool = False, skip_if_not_exists: bool = False
    ) -> dict:
        """Delete a Marqo index.

        Args:
            index_name (str): The name of the index to delete.
            confirm (bool, optional): Automatically confirms the deletion. When False the user is prompted on stdin. Defaults to False.
            skip_if_not_exists (bool, optional): Skip deletion if the index does not exist. Defaults to False.

        Returns:
            dict: The deletion response.
        """
        if not confirm:
            choice = None
            while choice not in ("y", "n"):
                # Tolerate surrounding whitespace and upper-case answers.
                choice = (
                    input(
                        f"Are you sure you want to delete the index '{index_name}'? (y/n): "
                    )
                    .strip()
                    .lower()
                )
            if choice == "n":
                return {"message": "Deletion cancelled."}

        # Only query the list of indexes when the caller asked for the skip.
        if skip_if_not_exists and not self._check_index_exists(index_name):
            return {
                "acknowledged": True,
                "index": index_name,
                "message": "Index does not exist, skipping deletion.",
            }

        return self.mq.index(index_name).delete()

    def _extract_page_data(
        self, webpage_url: str, api_method_name: str, api_response_structure: dict
    ):
        """Extract structured page data from a webpage using the InstantAPI Retrieve API.

        Args:
            webpage_url (str): The URL of the webpage to extract.
            api_method_name (str): The name of the API method to use for data extraction.
            api_response_structure (dict): The expected structure of the API's response.

        Returns:
            The value returned by ``InstantAPIClient.retrieve`` (presumably a dict
            of extracted data or an error payload — verify against the client).
        """
        return self.instant_api.retrieve(
            webpage_url=webpage_url,
            api_method_name=api_method_name,
            api_response_structure=api_response_structure,
        )

    def _make_mappings(
        self,
        text_fields_to_index: list[str],
        image_fields_to_index: list[str],
        total_image_weight: float,
        total_text_weight: float,
    ) -> tuple[Union[dict, None], list]:
        """Automatically make a multimodal combination field mapping based on the text and image fields to index.

        When only one modality is supplied no combination field is needed and
        the supplied fields are used as tensor fields directly. Otherwise each
        modality's total weight is split evenly across its fields; a modality
        whose total weight is 0 is dropped from the combination.

        Args:
            text_fields_to_index (list[str]): The text fields to index.
            image_fields_to_index (list[str]): The image fields to index.
            total_image_weight (float): The total weight for images.
            total_text_weight (float): The total weight for text.

        Returns:
            tuple[Union[dict, None], list]: ``(mappings, tensor_fields)`` where
            ``mappings`` is None when no combination field is required.
        """
        if not text_fields_to_index:
            return None, image_fields_to_index
        if not image_fields_to_index:
            return None, text_fields_to_index

        # A zero total weight removes that modality from the combination.
        if total_image_weight == 0:
            image_fields_to_index = []
        if total_text_weight == 0:
            text_fields_to_index = []

        weights = {}
        if text_fields_to_index:
            per_text_weight = total_text_weight / len(text_fields_to_index)
            for field in text_fields_to_index:
                weights[field] = per_text_weight
        if image_fields_to_index:
            per_image_weight = total_image_weight / len(image_fields_to_index)
            for field in image_fields_to_index:
                weights[field] = per_image_weight

        mappings = {
            self.combination_field: {
                "type": "multimodal_combination",
                "weights": weights,
            }
        }
        return mappings, [self.combination_field]

    def _check_schema_for_marqo(self, schema: dict) -> None:
        """Check if a schema conforms to Marqo's requirements. Schemas must be flat documents.

        Args:
            schema (dict): The schema to check.

        Raises:
            ValueError: If the schema does not conform to Marqo's requirements.
        """
        if not all(isinstance(value, str) for value in schema.values()):
            raise ValueError(
                "All schema values must be strings. Marqo only accepts flat documents so you cannot nest JSON."
            )

    def _check_against_schema(
        self, schema: Union[dict, list, Any], response: Union[dict, list, Any]
    ) -> bool:
        """Check if a response conforms to a schema.

        Dicts must have exactly the same keys, lists must have the same length,
        and both are compared recursively. Any other schema value (a leaf)
        matches any response value.

        Args:
            schema (Union[dict, list, Any]): The schema to check against.
            response (Union[dict, list, Any]): The response to check.

        Returns:
            bool: True if the response conforms to the schema, False otherwise.
        """
        if isinstance(schema, dict):
            if not isinstance(response, dict) or len(schema) != len(response):
                return False
            return all(
                key in response
                and self._check_against_schema(schema[key], response[key])
                for key in schema
            )

        if isinstance(schema, list):
            if not isinstance(response, list) or len(response) != len(schema):
                return False
            return all(
                self._check_against_schema(s, r) for s, r in zip(schema, response)
            )

        # Leaf values are not type-checked.
        return True

    def _check_index_exists(self, index_name: str) -> bool:
        """Check if a Marqo index exists.

        Args:
            index_name (str): The name of the index to check.

        Returns:
            bool: True if the index exists, False otherwise.
        """
        response = self.mq.get_indexes()
        return any(index["indexName"] == index_name for index in response["results"])

    def _check_index_can_use_images(self, index_name: str) -> bool:
        """Check if a Marqo index can use images.

        Args:
            index_name (str): The name of the index to check.

        Returns:
            bool: The index's ``treatUrlsAndPointersAsImages`` setting.
        """
        index_settings = self.mq.index(index_name).get_settings()
        return index_settings["treatUrlsAndPointersAsImages"]

    def _create_index_from_fields(
        self,
        index_name: str,
        text_fields_to_index: Optional[list[str]] = None,
        image_fields_to_index: Optional[list[str]] = None,
    ) -> dict:
        """Create a Marqo index based on the fields to index.

        A text-only index is created when no image fields are given, otherwise
        a multimodal index is created.

        Args:
            index_name (str): The name of the index to create.
            text_fields_to_index (Optional[list[str]], optional): A list of text fields for indexing. Defaults to None (no fields).
            image_fields_to_index (Optional[list[str]], optional): A list of image fields for indexing. Defaults to None (no fields).

        Raises:
            ValueError: If no fields are provided for indexing.

        Returns:
            dict: The index creation response.
        """
        if not text_fields_to_index and not image_fields_to_index:
            raise ValueError(
                "At least one field must be specified in text_fields_to_index and/or image_fields_to_index."
            )

        return self.create_index(index_name, multimodal=bool(image_fields_to_index))

    def _process_page(
        self,
        webpage_url: str,
        api_method_name: str,
        api_response_structure: dict,
        enforce_schema: bool = True,
    ) -> dict:
        """Process a single page and extract data from it.

        Args:
            webpage_url (str): The URL of the webpage to process.
            api_method_name (str): The name of the API method to use for data extraction.
            api_response_structure (dict): The expected structure of the API's response.
            enforce_schema (bool, optional): Toggle strict enforcement of InstantAPI responses against the schema. Defaults to True.

        Returns:
            dict: ``{"failed_check": False, "data": (webpage_url, page_data)}`` on
            success, or ``{"failed_check": True, "data": {...}}`` describing the
            rejected response.
        """
        page_data = self._extract_page_data(
            webpage_url=webpage_url,
            api_method_name=api_method_name,
            api_response_structure=api_response_structure,
        )

        failure_reason = None
        if enforce_schema and not self._check_against_schema(
            api_response_structure, page_data
        ):
            failure_reason = "Schema check failed"
        elif not isinstance(page_data, dict):
            # Without schema enforcement a non-dict response would otherwise
            # crash on the item assignments below and abort the whole batch.
            failure_reason = "Response is not a JSON object"

        if failure_reason is not None:
            return {
                "failed_check": True,
                "data": {
                    "url": webpage_url,
                    "response_data": page_data,
                    "response": failure_reason,
                },
            }

        # The document id is derived from the URL, so the same URL always maps
        # to the same "_id".
        page_data["_id"] = hashlib.md5(webpage_url.encode()).hexdigest()
        page_data["_source_webpage_url"] = webpage_url
        return {"failed_check": False, "data": (webpage_url, page_data)}

    def add_documents(
        self,
        webpage_urls: list[str],
        index_name: str,
        api_response_structure: dict,
        api_method_name: str,
        text_fields_to_index: Optional[list[str]] = None,
        image_fields_to_index: Optional[list[str]] = None,
        client_batch_size: int = 8,
        total_image_weight: float = 0.9,
        total_text_weight: float = 0.1,
        enforce_schema: bool = True,
        instantapi_threads: int = 10,
    ) -> list[dict]:
        """Add documents to a Marqo index from a list of webpage URLs, data is extracted using the InstantAPI Retrieve API.

        Args:
            webpage_urls (list[str]): A list of webpage URLs to index.
            index_name (str): The name of the index to add documents to. If the index does not exist, it will be created based on the fields to index.
            api_response_structure (dict): The expected structure of the API's response.
            api_method_name (str): The name of the API method to use for data extraction.
            text_fields_to_index (Optional[list[str]], optional): A list of text fields for indexing. Defaults to None (no fields).
            image_fields_to_index (Optional[list[str]], optional): A list of image fields for indexing. Defaults to None (no fields).
            client_batch_size (int, optional): The client batch size for Marqo. Defaults to 8.
            total_image_weight (float, optional): The total weight for images. Defaults to 0.9.
            total_text_weight (float, optional): The total weight for text. Defaults to 0.1.
            enforce_schema (bool, optional): Toggle strict enforcement of InstantAPI responses against the schema. Defaults to True.
            instantapi_threads (int, optional): The number of threads to use for InstantAPI requests. Defaults to 10.

        Raises:
            ValueError: If no fields are provided for indexing, if the schema is
                not flat, or if the index's image support does not match the
                provided fields.

        Returns:
            list[dict]: Rejected pages first (dicts with "url", "response_data"
            and "response"), followed by one ``{"url_md5": ..., "response": ...}``
            entry per item reported by Marqo.
        """
        text_fields_to_index = list(text_fields_to_index or [])
        image_fields_to_index = list(image_fields_to_index or [])

        if not text_fields_to_index and not image_fields_to_index:
            raise ValueError(
                "At least one field must be specified in text_fields_to_index and/or image_fields_to_index."
            )

        # Validate the schema before touching Marqo so that an invalid schema
        # does not leave a freshly created index behind.
        self._check_schema_for_marqo(api_response_structure)

        if not self._check_index_exists(index_name):
            self._create_index_from_fields(
                index_name, text_fields_to_index, image_fields_to_index
            )

        can_use_images = self._check_index_can_use_images(index_name)
        if image_fields_to_index and not can_use_images:
            raise ValueError(
                "The index does not support images, please recreate the index create_index(index_name, multimodal=True)"
            )
        if not image_fields_to_index and can_use_images:
            raise ValueError(
                "The index supports images, please either provide image fields to index or recreate the index create_index(index_name, multimodal=False)"
            )

        # Fetch all pages concurrently; the work is network-bound.
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=instantapi_threads
        ) as executor:
            results = list(
                executor.map(
                    lambda url: self._process_page(
                        url, api_method_name, api_response_structure, enforce_schema
                    ),
                    webpage_urls,
                )
            )

        # Separate rejected responses from documents that can be indexed.
        failed_schema_checks = []
        documents = []
        for result in results:
            if result["failed_check"]:
                failed_schema_checks.append(result["data"])
            else:
                _, page_data = result["data"]
                documents.append(page_data)

        # Nothing to index (no URLs, or every page was rejected): do not send
        # an empty batch to Marqo.
        if not documents:
            return failed_schema_checks

        mappings, tensor_fields = self._make_mappings(
            text_fields_to_index,
            image_fields_to_index,
            total_image_weight,
            total_text_weight,
        )

        responses = self.mq.index(index_name).add_documents(
            documents=documents,
            tensor_fields=tensor_fields,
            mappings=mappings,
            client_batch_size=client_batch_size,
        )
        # The client may return a single response or one response per batch.
        if not isinstance(responses, list):
            responses = [responses]

        outcomes = failed_schema_checks
        for response in responses:
            for item in response["items"]:
                outcomes.append({"url_md5": item["_id"], "response": item})
        return outcomes

    def _get_root_domain(self, url: str) -> str:
        """Get the host name (subdomain, domain and suffix) of a URL.

        Args:
            url (str): The URL to extract the domain from.

        Returns:
            str: The non-empty subdomain, domain, and suffix of the URL joined
            with dots, e.g. "www.example.com" or "example.com".
        """
        extracted = tldextract.extract(url)
        # Skip empty components so that a missing subdomain or suffix does not
        # produce leading/trailing dots (e.g. ".example.com").
        parts = (extracted.subdomain, extracted.domain, extracted.suffix)
        return ".".join(part for part in parts if part)

    def _is_domain_allowed(self, url: str, allowed_domains: set[str]) -> bool:
        """Return True if the URL's host or registrable domain is in allowed_domains.

        Args:
            url (str): The URL to test.
            allowed_domains (set[str]): Allowed hosts ("www.example.com") and/or
                registrable domains ("example.com").

        Returns:
            bool: True if the URL may be crawled.
        """
        extracted = tldextract.extract(url)
        host = ".".join(
            part
            for part in (extracted.subdomain, extracted.domain, extracted.suffix)
            if part
        )
        registered = ".".join(
            part for part in (extracted.domain, extracted.suffix) if part
        )
        return host in allowed_domains or registered in allowed_domains

    def crawl(
        self,
        initial_webpage_urls: list[str],
        allowed_domains: set[str],
        index_name: str,
        api_response_structure: dict,
        text_fields_to_index: Optional[list[str]] = None,
        image_fields_to_index: Optional[list[str]] = None,
        client_batch_size: int = 8,
        total_image_weight: float = 0.9,
        total_text_weight: float = 0.1,
        enforce_schema: bool = True,
        max_pages: Optional[int] = None,
        api_method_name: str = "getPageData",
    ) -> list[dict]:
        """Crawl a set of webpages and add them to a Marqo index.

        Pages are visited breadth-first starting from ``initial_webpage_urls``;
        links are discovered with ``InstantAPIClient.next_pages``.

        Args:
            initial_webpage_urls (list[str]): A list of initial webpage URLs to start the crawl from.
            allowed_domains (set[str]): The domains the crawl is restricted to. A URL is crawled when its full host (e.g. "www.example.com") or its registrable domain (e.g. "example.com") is in this set; all other URLs are skipped.
            index_name (str): The name of the index to add documents to. If the index does not exist, it will be created based on the fields to index.
            api_response_structure (dict): The expected structure of the API's response, this is passed to InstantAPI.
            text_fields_to_index (Optional[list[str]], optional): A list of text fields for indexing. Defaults to None (no fields).
            image_fields_to_index (Optional[list[str]], optional): A list of image fields for indexing. Defaults to None (no fields).
            client_batch_size (int, optional): The client batch size for Marqo, controls how many docs are sent at a time. Defaults to 8.
            total_image_weight (float, optional): The total weight for images, applies when both image and text fields are provided. Defaults to 0.9.
            total_text_weight (float, optional): The total weight for text, applies when both image and text fields are provided. Defaults to 0.1.
            enforce_schema (bool, optional): Toggle strict enforcement of InstantAPI responses against the schema. Defaults to True.
            max_pages (Optional[int], optional): The maximum number of pages to process. None or 0 means no limit. Defaults to None.
            api_method_name (str, optional): The name of the API method passed to InstantAPI for data extraction. Defaults to "getPageData".

        Raises:
            ValueError: If no fields are provided for indexing.

        Returns:
            list[dict]: A list of responses for each document added.
        """
        if not text_fields_to_index and not image_fields_to_index:
            raise ValueError(
                "At least one field must be specified in text_fields_to_index and/or image_fields_to_index."
            )

        queue = deque(initial_webpage_urls)
        visited = set()
        responses = []
        pages_processed = 0

        while queue:
            # Only pages that were actually processed count towards the limit.
            if max_pages and pages_processed >= max_pages:
                break

            webpage_url = queue.popleft()
            if webpage_url in visited:
                continue
            visited.add(webpage_url)

            if not self._is_domain_allowed(webpage_url, allowed_domains):
                continue

            # Keyword arguments keep this call aligned with add_documents'
            # signature regardless of parameter order.
            responses += self.add_documents(
                webpage_urls=[webpage_url],
                index_name=index_name,
                api_response_structure=api_response_structure,
                api_method_name=api_method_name,
                text_fields_to_index=text_fields_to_index,
                image_fields_to_index=image_fields_to_index,
                client_batch_size=client_batch_size,
                total_image_weight=total_image_weight,
                total_text_weight=total_text_weight,
                enforce_schema=enforce_schema,
            )
            pages_processed += 1

            next_pages = self.instant_api.next_pages(webpage_url)
            # A response without links should end this branch of the crawl,
            # not discard everything indexed so far.
            links = (
                next_pages.get("webpage_urls") if isinstance(next_pages, dict) else None
            )
            for next_page in links or []:
                if next_page not in visited:
                    queue.append(next_page)

        return responses

    def search(
        self,
        q: str,
        index_name: str,
        limit: int = 10,
        offset: int = 0,
        searchable_attributes: Optional[list] = None,
        search_method: Literal["tensor", "lexical", "hybrid"] = "hybrid",
    ) -> dict:
        """Search a Marqo index via a simplified interface.

        Args:
            q (str): The query string to search for.
            index_name (str): The name of the index to search.
            limit (int, optional): The number of results to retrieve. Defaults to 10.
            offset (int, optional): The offset for the search results. Defaults to 0.
            searchable_attributes (Optional[list], optional): The attributes to search. Currently must be None. Defaults to None.
            search_method (Literal["tensor", "lexical", "hybrid"], optional): The search method to use, tensor uses only vectors, lexical uses only text, hybrid combines both with RRF. Defaults to "hybrid".

        Raises:
            ValueError: If an invalid search method is provided.
            NotImplementedError: If searchable_attributes is provided.

        Returns:
            dict: The search response from Marqo.
        """
        if search_method not in ("tensor", "lexical", "hybrid"):
            raise ValueError(
                "Invalid search method, must be one of 'tensor', 'lexical', or 'hybrid'."
            )

        if searchable_attributes is not None:
            # TODO: Update when marqo reaches 2.12
            raise NotImplementedError(
                "Search with unstructured indexes does not support searchable attributes yet. This will be implemented in Marqo 2.12."
            )

        # Widen ef_search for deep result windows (limit + offset above 2000).
        # NOTE(review): presumably 2000 is Marqo's default efSearch — confirm.
        ef_search = None
        if limit + offset > 2000:
            ef_search = limit + offset

        # Currently unreachable because of the NotImplementedError above; kept
        # so that hybrid search picks up searchable attributes once the TODO
        # is resolved.
        hybrid_parameters = None
        if search_method == "hybrid" and searchable_attributes is not None:
            hybrid_parameters = {
                "searchableAttributesLexical": searchable_attributes,
                "searchableAttributesTensor": searchable_attributes,
            }
            searchable_attributes = None

        return self.mq.index(index_name).search(
            q,
            limit=limit,
            offset=offset,
            ef_search=ef_search,
            searchable_attributes=searchable_attributes,
            search_method=search_method,
            hybrid_parameters=hybrid_parameters,
        )