Source code for colrev.env.local_index

#! /usr/bin/env python
"""Indexing and retrieving records locally."""
from __future__ import annotations

import sqlite3
import typing
from copy import deepcopy
from multiprocessing import Lock
from pathlib import Path

import git
from git.exc import GitCommandError

import colrev.env.environment_manager
import colrev.env.local_index_sqlite
import colrev.env.resources
import colrev.env.tei_parser
import colrev.env.utils
import colrev.exceptions as colrev_exceptions
import colrev.loader.load_utils
import colrev.ops.check
import colrev.record.record
from colrev.constants import ENTRYTYPES
from colrev.constants import Fields
from colrev.env.local_index_prep import prepare_record_for_return


[docs]class LocalIndex: """The LocalIndex implements indexing and retrieval of records across projects""" def __init__( self, *, index_tei: bool = False, verbose_mode: bool = False, ) -> None: self.verbose_mode = verbose_mode self.environment_manager = colrev.env.environment_manager.EnvironmentManager() self._index_tei = index_tei self.thread_lock = Lock()
[docs] def get_journal_rankings(self, journal: str) -> list: """Get the journal rankings from the sqlite database""" sqlite_index_ranking = colrev.env.local_index_sqlite.SQLiteIndexRankings() return sqlite_index_ranking.select(journal=journal)
def _retrieve_based_on_colrev_id( self, cids_to_retrieve: list ) -> colrev.record.record.Record: sqlite_index_record = colrev.env.local_index_sqlite.SQLiteIndexRecord() for cid_to_retrieve in cids_to_retrieve: try: retrieved_record = sqlite_index_record.get( key=Fields.COLREV_ID, value=cid_to_retrieve ) return colrev.record.record.Record(retrieved_record) except colrev_exceptions.RecordNotInIndexException: continue # continue with the next cid_to_retrieve finally: sqlite_index_record.connection.close() raise colrev_exceptions.RecordNotInIndexException(cids_to_retrieve[0]) def _retrieve_from_github_curation( self, record_dict: dict ) -> colrev.record.record.Record: # pragma: no cover ret = {} try: gh_url, record_id = record_dict[Fields.CURATION_ID].split("#") temp_path = Path.home().joinpath("colrev").joinpath("test") temp_path.mkdir(exist_ok=True, parents=True) target_path = Path(temp_path) / Path(gh_url.split("/")[-1]) if not target_path.is_dir(): git.Repo.clone_from( gh_url, # .replace("https://github.com/", "git@github.com:") + ".git", str(target_path), depth=1, ) ret = colrev.loader.load_utils.load( filename=target_path / Path("data/records.bib"), ) except GitCommandError: pass if record_id not in ret: raise colrev_exceptions.RecordNotInIndexException(record_dict[Fields.ID]) ret[record_id][Fields.CURATION_ID] = record_dict[Fields.CURATION_ID] return colrev.record.record.Record(ret[record_id]) def _retrieve_from_record_index( self, record_dict: dict ) -> colrev.record.record.Record: record = colrev.record.record.Record(record_dict) cids_to_retrieve = [record.get_colrev_id()] retrieved_record = self._retrieve_based_on_colrev_id(cids_to_retrieve) if retrieved_record.data[Fields.ENTRYTYPE] != record.data[Fields.ENTRYTYPE]: if record_dict.get(Fields.CURATION_ID, "NA").startswith( "https://github.com/" ): return self._retrieve_from_github_curation(record_dict=record_dict) raise colrev_exceptions.RecordNotInIndexException(record_dict[Fields.ID]) return retrieved_record
[docs] def search(self, query: str) -> list[colrev.record.record.Record]: """Run a search for records""" try: self.thread_lock.acquire(timeout=60) sqlite_index_record = colrev.env.local_index_sqlite.SQLiteIndexRecord() records_to_return = [] for record_dict in sqlite_index_record.search(query=query): record = prepare_record_for_return(record_dict, include_file=False) records_to_return.append(record) except sqlite3.OperationalError as exc: # pragma: no cover print(exc) finally: sqlite_index_record.connection.close() self.thread_lock.release() return records_to_return
[docs] def get_year_from_toc(self, record_dict: dict) -> str: """Determine the year of a paper based on its table-of-content (journal-volume-number)""" try: sqlite_index_toc = colrev.env.local_index_sqlite.SQLiteIndexTOC() toc_key = colrev.record.record.Record(record_dict).get_toc_key() toc_items = [] if self._toc_exists(toc_key): toc_items = sqlite_index_toc.get_toc_items(toc_key=toc_key) if not toc_items: raise colrev_exceptions.TOCNotAvailableException() toc_records_colrev_id = toc_items[0] sqlite_index_record = colrev.env.local_index_sqlite.SQLiteIndexRecord() record_dict = sqlite_index_record.get( key=Fields.COLREV_ID, value=toc_records_colrev_id ) year = record_dict.get(Fields.YEAR, "NA") return year except ( colrev_exceptions.NotEnoughDataToIdentifyException, colrev_exceptions.NotTOCIdentifiableException, colrev_exceptions.RecordNotInIndexException, ) as exc: raise colrev_exceptions.TOCNotAvailableException() from exc finally: sqlite_index_toc.connection.close()
def _toc_exists(self, toc_item: str) -> bool: try: self.thread_lock.acquire(timeout=60) sqlite_index_toc = colrev.env.local_index_sqlite.SQLiteIndexTOC() return sqlite_index_toc.exists(toc_item) except sqlite3.OperationalError: # pragma: no cover pass # return False except AttributeError: # pragma: no cover # ie. no sqlite database available pass # return False finally: if sqlite_index_toc is not None: # Check if it was initialized sqlite_index_toc.connection.close() self.thread_lock.release() return False def _get_toc_items(self, toc_key: str, *, search_across_tocs: bool) -> list: sqlite_index_toc = colrev.env.local_index_sqlite.SQLiteIndexTOC() toc_items = [] if self._toc_exists(toc_key): toc_items = sqlite_index_toc.get_toc_items(toc_key=toc_key) else: if not search_across_tocs: sqlite_index_toc.connection.close() raise colrev_exceptions.RecordNotInIndexException(toc_key) if not toc_items and search_across_tocs: try: partial_toc_key = toc_key.rsplit("|", 1)[0] toc_items = sqlite_index_toc.get_toc_items( partial_toc_key=partial_toc_key ) sqlite_index_toc.connection.close() except ( colrev_exceptions.NotTOCIdentifiableException, KeyError, ) as exc: raise colrev_exceptions.RecordNotInIndexException( partial_toc_key ) from exc if not toc_items: raise colrev_exceptions.RecordNotInIndexException(toc_key) return toc_items
[docs] def retrieve_from_toc( self, record: colrev.record.record.Record, *, include_file: bool = False, search_across_tocs: bool = False, ) -> colrev.record.record.Record: """Retrieve a record from the toc (table-of-contents)""" # Note: in NotTOCIdentifiableException cases, we still need a toc_key. # to accomplish this, the get_toc_key() may acced an "accept_incomplete" flag try: toc_key = record.get_toc_key() except colrev_exceptions.NotTOCIdentifiableException as exc: raise colrev_exceptions.RecordNotInIndexException( record.data[Fields.ID] ) from exc toc_items = self._get_toc_items(toc_key, search_across_tocs=search_across_tocs) # SQLiteIndexRecord() must be after _get_toc_items(), which also uses the sqlite file sqlite_index_record = colrev.env.local_index_sqlite.SQLiteIndexRecord() try: for toc_records_colrev_id in toc_items: record_dict = sqlite_index_record.get( key=Fields.COLREV_ID, value=toc_records_colrev_id ) if not colrev.record.record_similarity.matches( record, colrev.record.record.Record(record_dict) ): continue return prepare_record_for_return(record_dict, include_file=include_file) raise colrev_exceptions.RecordNotInTOCException( record_id=record.data[Fields.ID], toc_key=toc_key ) except ( colrev_exceptions.NotEnoughDataToIdentifyException, colrev_exceptions.NotTOCIdentifiableException, ): pass sqlite_index_record.connection.close() raise colrev_exceptions.RecordNotInIndexException(record.data[Fields.ID])
[docs] def retrieve_based_on_colrev_pdf_id( self, *, colrev_pdf_id: str ) -> colrev.record.record.Record: """ Convenience function to retrieve the indexed record_dict metadata based on a colrev_pdf_id """ try: sqlite_index_record = colrev.env.local_index_sqlite.SQLiteIndexRecord() record_dict = sqlite_index_record.get( key=Fields.PDF_ID, value=colrev_pdf_id ) record_to_import = prepare_record_for_return(record_dict, include_file=True) record_to_import.data.pop(Fields.FILE, None) finally: sqlite_index_record.connection.close() return record_to_import
[docs] def retrieve( self, record_dict: dict, *, include_file: bool = False, include_colrev_ids: bool = False, ) -> colrev.record.record.Record: """ Convenience function to retrieve the indexed record_dict metadata based on another record_dict """ # To avoid modifications to the original record record_dict = deepcopy(record_dict) # 1. Try the record index try: retrieved_record = self._retrieve_from_record_index(record_dict) retrieved_record_dict = retrieved_record.data except ( colrev_exceptions.RecordNotInIndexException, colrev_exceptions.NotEnoughDataToIdentifyException, ) as exc: # 2. Try using global-ids retrieved_record_dict = {} remove_colrev_id = False if Fields.COLREV_ID not in record_dict: try: record_dict[Fields.COLREV_ID] = colrev.record.record.Record( record_dict ).get_colrev_id() remove_colrev_id = True except colrev_exceptions.NotEnoughDataToIdentifyException: pass for key, value in record_dict.items(): if ( key not in colrev.env.local_index_sqlite.SQLiteIndexRecord.GLOBAL_KEYS or Fields.ID == key ): continue try: sqlite_index_record = ( colrev.env.local_index_sqlite.SQLiteIndexRecord() ) retrieved_record_dict = sqlite_index_record.get( key=key, value=value ) finally: sqlite_index_record.connection.close() if key in retrieved_record_dict: if retrieved_record_dict[key] == value: break retrieved_record_dict = {} if remove_colrev_id: del record_dict[Fields.COLREV_ID] if not retrieved_record_dict: raise colrev_exceptions.RecordNotInIndexException( record_dict.get(Fields.ID, "no-key") ) from exc return prepare_record_for_return( retrieved_record_dict, include_file=include_file, include_colrev_ids=include_colrev_ids, )
[docs] def get_fields_to_remove(self, record_dict: dict) -> list: """Compares the record to available toc items and returns fields to remove (if any), such as the volume or number.""" # pylint: disable=too-many-return-statements fields_to_remove: typing.List[str] = [] if ( Fields.JOURNAL not in record_dict and record_dict[Fields.ENTRYTYPE] != ENTRYTYPES.ARTICLE ): return fields_to_remove internal_record_dict = deepcopy(record_dict) if all( x in internal_record_dict.keys() for x in [Fields.VOLUME, Fields.NUMBER] ): try: toc_key_full = colrev.record.record.Record( internal_record_dict ).get_toc_key() if self._toc_exists(toc_key_full): return fields_to_remove except colrev_exceptions.NotTOCIdentifiableException: return fields_to_remove wo_nr = deepcopy(internal_record_dict) del wo_nr[Fields.NUMBER] toc_key_wo_nr = colrev.record.record.Record(wo_nr).get_toc_key() if toc_key_wo_nr != "NA": if self._toc_exists(toc_key_wo_nr): fields_to_remove.append(Fields.NUMBER) return fields_to_remove wo_vol = deepcopy(internal_record_dict) del wo_vol[Fields.VOLUME] toc_key_wo_vol = colrev.record.record.Record(wo_vol).get_toc_key() if toc_key_wo_vol != "NA": if self._toc_exists(toc_key_wo_vol): fields_to_remove.append(Fields.VOLUME) return fields_to_remove wo_vol_nr = deepcopy(internal_record_dict) del wo_vol_nr[Fields.VOLUME] del wo_vol_nr[Fields.NUMBER] toc_key_wo_vol_nr = colrev.record.record.Record(wo_vol_nr).get_toc_key() if toc_key_wo_vol_nr != "NA": if self._toc_exists(toc_key_wo_vol_nr): fields_to_remove.append(Fields.NUMBER) fields_to_remove.append(Fields.VOLUME) return fields_to_remove return fields_to_remove