Source code for colrev.ops.pdf_get

#! /usr/bin/env python
"""CoLRev pdf_get operation: Get PDF documents."""
from __future__ import annotations

import shutil
import typing
from glob import glob
from multiprocessing.pool import ThreadPool as Pool
from pathlib import Path

import colrev.env.tei_parser
import colrev.exceptions as colrev_exceptions
import colrev.process.operation
import colrev.record.record_pdf
from colrev import utils
from colrev.constants import Colors
from colrev.constants import EndpointType
from colrev.constants import Fields
from colrev.constants import OperationsType
from colrev.constants import PDFPathType
from colrev.constants import RecordState
from colrev.package_manager.package_manager import PackageManager
from colrev.writer.write_utils import write_file


class PDFGet(colrev.process.operation.Operation):
    """Get the PDFs

    Retrieves PDFs for prescreen-included records through the pdf_get
    package endpoints configured in the settings, links PDFs that already
    exist in the PDF directory, and optionally renames them to ``{ID}.pdf``.
    """

    # Counters maintained by _get_data() / _print_stats() during main()
    to_retrieve: int
    retrieved: int
    not_retrieved: int

    type = OperationsType.pdf_get

    def __init__(
        self,
        *,
        review_manager: colrev.review_manager.ReviewManager,
        notify_state_transition_operation: bool = True,
    ) -> None:
        super().__init__(
            review_manager=review_manager,
            operations_type=self.type,
            notify_state_transition_operation=notify_state_transition_operation,
        )

        self.package_manager = PackageManager()

        pdf_dir = self.review_manager.paths.pdf
        pdf_dir.mkdir(exist_ok=True, parents=True)

        self.pdf_qm = self.review_manager.get_pdf_qm()

        # Sub-directory pattern ("" = flat directory), taken from the first
        # colrev.files_dir source, if any.
        self.filepath_directory_pattern = ""
        pdf_endpoints = [
            s
            for s in self.review_manager.settings.sources
            if s.platform == "colrev.files_dir"
        ]
        if pdf_endpoints:
            # Default to "" so that the attribute is always a string (it is
            # only compared against string patterns in get_target_filepath).
            self.filepath_directory_pattern = (
                pdf_endpoints[0].search_parameters["scope"].get("subdir_pattern", "")
            )

    def copy_pdfs_to_repo(self) -> None:
        """Copy the PDFs to the repository

        Every record whose file field points to a symlink with an existing
        target gets the symlink replaced by a copy of the linked file.
        """
        self.review_manager.logger.info("Copy PDFs to dir")
        records = self.review_manager.dataset.load_records_dict()

        for record_dict in records.values():
            if Fields.FILE not in record_dict:
                continue

            fpath = Path(record_dict[Fields.FILE])
            new_fpath = fpath.absolute()

            if fpath.is_symlink():
                linked_file = fpath.resolve()
                if linked_file.is_file():
                    # Remove the link first: the copy is written to the
                    # location the symlink occupied.
                    fpath.unlink()
                    shutil.copyfile(linked_file, new_fpath)
                    self.review_manager.logger.info(
                        f" {Colors.GREEN}copied PDF for {record_dict[Fields.ID]} {Colors.END}"
                    )
            elif new_fpath.is_file() and self.review_manager.verbose_mode:
                self.review_manager.logger.info(
                    f"No need to copy PDF - already exists ({record_dict[Fields.ID]})"
                )

    def get_target_filepath(self, record: colrev.record.record.Record) -> Path:
        """Get the target filepath for a PDF

        The default is ``<PDF_DIR>/<ID>.pdf``. Depending on
        ``filepath_directory_pattern``, the file is placed in a ``<year>``
        or a ``<volume>[/<number>]`` sub-directory instead.
        """
        pdf_root = self.review_manager.paths.PDF_DIR
        filename = f"{record.data['ID']}.pdf"

        if self.filepath_directory_pattern == Fields.YEAR:
            return pdf_root / Path(f"{record.data.get('year', 'no_year')}/{filename}")

        if (
            self.filepath_directory_pattern == "volume_number"
            and Fields.VOLUME in record.data
        ):
            if Fields.NUMBER in record.data:
                return pdf_root / Path(
                    f"{record.data['volume']}/{record.data['number']}/{filename}"
                )
            return pdf_root / Path(f"{record.data['volume']}/{filename}")

        # No pattern, or the fields required by the pattern are missing
        return pdf_root / Path(filename)

    def import_pdf(self, record: colrev.record.record.Record) -> None:
        """Import a file (PDF) and copy/symlink it

        The record's file field is set to the target filepath afterwards.
        """
        new_fp = self.get_target_filepath(record)
        original_fp = Path(record.data[Fields.FILE])

        if new_fp != original_fp and not new_fp.is_file():
            new_fp.parent.mkdir(exist_ok=True, parents=True)
            pdf_path_type = self.review_manager.settings.pdf_get.pdf_path_type
            if PDFPathType.symlink == pdf_path_type:
                new_fp.symlink_to(original_fp)
            elif PDFPathType.copy == pdf_path_type:
                shutil.copyfile(original_fp, new_fp.resolve())
            # Note : else: leave absolute paths

        record.data[Fields.FILE] = str(new_fp)

    def _log_infos(self, record: colrev.record.record.Record) -> None:
        """Log the state transition resulting from a retrieval attempt"""
        if Fields.FILE not in record.data:
            # Missing PDFs are only reported when PDFs are required
            if (
                not self.review_manager.settings.pdf_get.pdf_required_for_screen_and_synthesis
            ):
                return
            self.review_manager.logger.info(
                f" {Colors.ORANGE}{record.data['ID']}".ljust(46)
                + f"rev_prescreen_included → pdf_needs_manual_retrieval{Colors.END}"
            )
            return

        if RecordState.pdf_prepared == record.data[Fields.STATUS]:
            self.review_manager.logger.info(
                f" {Colors.GREEN}{record.data['ID']}".ljust(46)
                + f"rev_prescreen_included → pdf_prepared{Colors.END}"
            )
        elif RecordState.pdf_needs_manual_preparation == record.data[Fields.STATUS]:
            self.review_manager.logger.info(
                f" {Colors.ORANGE}{record.data['ID']}".ljust(46)
                + f"rev_prescreen_included → pdf_needs_manual_preparation{Colors.END}"
            )

    # Note : no named arguments (multiprocessing)
    def get_pdf(self, item: dict) -> dict:
        """Get PDFs (based on the package endpoints in the settings)

        ``item`` is a dict with the record dict under the key ``"record"``.
        Returns the (possibly updated) record dict.
        """
        record_dict = item["record"]

        if record_dict[Fields.STATUS] not in [
            RecordState.rev_prescreen_included,
            RecordState.pdf_needs_manual_retrieval,
        ]:
            # Records in other states are not retrieved; an existing file
            # field is removed from them.
            if Fields.FILE in record_dict:
                record = colrev.record.record_pdf.PDFRecord(
                    record_dict, path=self.review_manager.path
                )
                record.remove_field(key=Fields.FILE)
                return record.get_data()
            return record_dict

        record = colrev.record.record_pdf.PDFRecord(
            record_dict, path=self.review_manager.path
        )

        # Try the endpoints in the configured order; stop at the first one
        # that sets the file field.
        for (
            pdf_get_package_endpoint
        ) in self.review_manager.settings.pdf_get.pdf_get_package_endpoints:
            pdf_get_class = self.package_manager.get_package_endpoint_class(
                package_type=EndpointType.pdf_get,
                package_identifier=pdf_get_package_endpoint["endpoint"],
            )
            endpoint = pdf_get_class(
                pdf_get_operation=self, settings=pdf_get_package_endpoint
            )
            endpoint.get_pdf(record)  # type: ignore

            if Fields.FILE in record.data:
                self.review_manager.report_logger.info(
                    f"{endpoint.settings.endpoint}"  # type: ignore
                    f"({record.data[Fields.ID]}): retrieved .../"
                    f"{Path(record.data[Fields.FILE]).name}"
                )
                break

        if Fields.FILE in record.data:
            record.run_pdf_quality_model(self.pdf_qm, set_prepared=True)
        else:
            record.set_status(RecordState.pdf_needs_manual_retrieval)

        self._log_infos(record)

        return record.get_data()

    def _fix_broken_symlinks(self) -> None:
        """Interactively re-target broken PDF symlinks

        Asks for an old and a new path fragment on stdin and re-creates each
        broken symlink with the fragment replaced in its target.
        """
        pdf_dir = self.review_manager.paths.pdf

        broken_symlinks = []
        for pdf_candidate in pdf_dir.glob("**/*.pdf"):
            relative_path = pdf_candidate.relative_to(self.review_manager.path)
            # A symlink that is_file() does not follow successfully is broken
            if not relative_path.is_file() and pdf_candidate.is_symlink():
                broken_symlinks.append(pdf_candidate)

        if not broken_symlinks:
            return

        print("To fix broken symlinks:")
        old_path = input("Enter the old path: ")
        new_path = input("Enter the new path: ")
        for broken_symlink in broken_symlinks:
            new_file = str(broken_symlink.resolve()).replace(old_path, new_path)
            print(f"Fix {broken_symlink}")
            broken_symlink.unlink()
            broken_symlink.symlink_to(new_file)

    def _relink_pdfs_in_source(
        self, source: colrev.search_file.ExtendedSearchFile
    ) -> None:
        """Reassign PDFs to records (and source records) with missing files

        For records whose linked file no longer exists, a PDF in the PDF
        directory with an identical colrev_pdf_id is linked instead.
        """
        # pylint: disable=too-many-locals

        self.review_manager.logger.info(
            "Checking PDFs in same directory to reassign when "
            f"the cpid is identical {source.search_results_path}"
        )

        project_path = self.review_manager.path
        pdf_dir = self.review_manager.paths.pdf

        # Map: PDF path (relative to the project) -> colrev_pdf_id
        pdf_candidates = {}
        for pdf_candidate in pdf_dir.glob("**/*.pdf"):
            colrev_pdf_id = colrev.record.record_pdf.PDFRecord.get_colrev_pdf_id(
                pdf_candidate
            )
            relative_path = pdf_candidate.relative_to(project_path)
            pdf_candidates[relative_path] = colrev_pdf_id

        source_records_dict = colrev.loader.load_utils.load(
            filename=source.search_results_path,
            logger=self.review_manager.logger,
        )
        source_records = list(source_records_dict.values())

        corresponding_origin = str(source.get_origin_prefix())

        records = self.review_manager.dataset.load_records_dict()
        for record in records.values():
            if Fields.FILE not in record:
                continue

            # Note: we check the source_records based on the cpids
            # in the record because cpids are not stored in the source_record
            # (pdf hashes may change after import/preparation)
            source_rec = {}
            if corresponding_origin != "":
                source_origin_l = [
                    o for o in record[Fields.ORIGIN] if corresponding_origin in o
                ]
                if len(source_origin_l) == 1:
                    source_origin = source_origin_l[0].replace(
                        f"{corresponding_origin}/", ""
                    )
                    source_rec_l = [
                        s for s in source_records if s[Fields.ID] == source_origin
                    ]
                    if len(source_rec_l) == 1:
                        source_rec = source_rec_l[0]

            # Nothing to do if the linked file(s) still exist
            record_file_exists = (project_path / Path(record[Fields.FILE])).is_file()
            if source_rec:
                if (
                    record_file_exists
                    and (project_path / Path(source_rec[Fields.FILE])).is_file()
                ):
                    continue
            elif record_file_exists:
                continue

            self.review_manager.logger.info(record[Fields.ID])

            for pdf_candidate, cpid in pdf_candidates.items():
                if record.get("colrev_pdf_id", "") == cpid:
                    record[Fields.FILE] = str(pdf_candidate)
                    # source_rec is a throw-away dict when no source record
                    # was matched, so this assignment is harmless then.
                    source_rec[Fields.FILE] = str(pdf_candidate)

                    self.review_manager.logger.info(
                        f"Found and linked PDF: {pdf_candidate}"
                    )
                    break

        if source_records:
            source_records_dict = {r[Fields.ID]: r for r in source_records}
            write_file(
                records_dict=source_records_dict, filename=source.search_results_path
            )

        self.review_manager.dataset.save_records_dict(records)
        self.review_manager.dataset.git_repo.add_changes(source.search_results_path)

    def check_existing_unlinked_pdfs(
        self,
        records: dict,
    ) -> dict[str, dict[str, typing.Any]]:
        """Check for PDFs that are in the pdfs directory but not linked in the record file

        Unlinked PDFs named after a record ID are linked to that record.
        Other PDFs are matched to the most similar record based on their TEI
        metadata (similarity above 0.5). The records are saved and returned.
        """
        linked_pdfs = {
            str(Path(x[Fields.FILE]).resolve())
            for x in records.values()
            if Fields.FILE in x
        }

        pdf_dir = self.review_manager.paths.pdf
        # "**" only recurses when it is a complete path component; the former
        # pattern "/**.pdf" therefore missed PDFs stored in sub-directories
        # (e.g., the year or volume/number layouts of get_target_filepath).
        pdf_files = glob(str(pdf_dir / "**" / "*.pdf"), recursive=True)
        derived_suffixes = ["_with_lp.pdf", "_with_cp.pdf", "_ocr.pdf"]
        unlinked_pdfs = [
            Path(x)
            for x in pdf_files
            if str(Path(x).resolve()) not in linked_pdfs
            and not any(kw in x for kw in derived_suffixes)
        ]

        if not unlinked_pdfs:
            return records

        self.review_manager.logger.info("Check unlinked PDFs")
        for file in unlinked_pdfs:
            msg = f"Check unlinked PDF: {file.relative_to(self.review_manager.path)}"
            self.review_manager.logger.info(msg)

            if file.stem in records:
                # NOTE(review): link_pdf is not defined in this class;
                # presumably it is inherited from Operation - confirm, since
                # this call raises an AttributeError otherwise.
                self.link_pdf(
                    colrev.record.record_pdf.PDFRecord(
                        records[file.stem], path=self.review_manager.path
                    )
                )
                continue

            tei = colrev.env.tei_parser.TEIParser(pdf_path=file)
            pdf_record = tei.get_metadata()
            if "error" in pdf_record:
                continue

            # Find the record that is most similar to the PDF metadata
            max_similarity = 0.0
            max_sim_record = None
            for candidate in records.values():
                sim = colrev.record.record_pdf.PDFRecord.get_record_similarity(
                    colrev.record.record_pdf.PDFRecord(
                        pdf_record, path=self.review_manager.path
                    ),
                    colrev.record.record_pdf.PDFRecord(
                        candidate.copy(), path=self.review_manager.path
                    ),
                )
                if sim > max_similarity:
                    max_similarity = sim
                    max_sim_record = candidate

            if not max_sim_record or max_similarity <= 0.5:
                continue
            if RecordState.pdf_prepared == max_sim_record[Fields.STATUS]:
                continue

            matched_record = colrev.record.record_pdf.PDFRecord(
                max_sim_record, path=self.review_manager.path
            )
            matched_record.update_field(
                key=Fields.FILE,
                value=str(file),
                source="linking-available-files",
            )
            self.import_pdf(matched_record)
            if RecordState.rev_prescreen_included == matched_record.data[Fields.STATUS]:
                matched_record.set_status(RecordState.pdf_imported)

            self.review_manager.report_logger.info(
                "linked unlinked pdf:" f" {file.name}"
            )
            self.review_manager.logger.info("linked unlinked pdf:" f" {file.name}")

        self.review_manager.dataset.save_records_dict(records)
        return records

    def _rename_pdf(
        self,
        *,
        record_dict: dict,
        file: Path,
        new_filename: Path,
        pdfs_search_file: Path,
    ) -> None:
        """Move a PDF to new_filename and update the record and search file"""
        if pdfs_search_file.is_file():
            colrev.env.utils.inplace_change(
                filename=pdfs_search_file,
                old_string="{" + str(file) + "}",
                new_string="{" + str(new_filename) + "}",
            )

        if not file.is_file():
            # Fall back to the path with double spaces collapsed (presumably
            # the intent: replacing " " by " " was a no-op).
            corrected_path = Path(str(file).replace("  ", " "))
            if corrected_path.is_file():
                file = corrected_path

        # is_symlink() additionally covers symlinks with a missing target
        if file.is_file() or file.is_symlink():
            shutil.move(str(file), str(new_filename))

        # Store the path as a string (never as a Path object)
        record_dict[Fields.FILE] = str(new_filename)
        self.review_manager.logger.info(f"rename {file.name} > {new_filename}")

        if RecordState.rev_prescreen_included == record_dict[Fields.STATUS]:
            record = colrev.record.record_pdf.PDFRecord(
                record_dict, path=self.review_manager.path
            )
            record.set_status(RecordState.pdf_imported)

    def rename_pdfs(self) -> None:
        """Rename the PDFs

        Linked files are renamed to ``<ID><suffix>`` within their current
        directory. Only records in states after md_processed are considered.
        """
        self.review_manager.logger.info("Rename PDFs")

        records = self.review_manager.dataset.load_records_dict()

        # We may use other pdfs_search_files from the sources:
        # review_manager.settings.sources
        pdfs_search_file = Path("data/search/pdfs.bib")

        eligible_states = RecordState.get_post_x_states(state=RecordState.md_processed)

        for record_dict in records.values():
            if Fields.FILE not in record_dict:
                continue
            if record_dict[Fields.STATUS] not in eligible_states:
                continue

            file = Path(record_dict[Fields.FILE])
            # Possible option: move to the top-level PDF directory instead
            new_filename = file.parent / Path(f"{record_dict['ID']}{file.suffix}")

            if str(file) == str(new_filename):
                continue

            self._rename_pdf(
                record_dict=record_dict,
                file=file,
                new_filename=new_filename,
                pdfs_search_file=pdfs_search_file,
            )

        self.review_manager.dataset.save_records_dict(records)

        if pdfs_search_file.is_file():
            self.review_manager.dataset.git_repo.add_changes(pdfs_search_file)

    def _get_data(self) -> dict:
        """Collect the records to retrieve

        Returns a dict with the number of tasks ("nr_tasks") and the work
        items ("items", one ``{"record": ...}`` dict per record).
        """
        # pylint: disable=duplicate-code
        pending_states = [
            RecordState.pdf_needs_manual_retrieval,
            RecordState.rev_prescreen_included,
        ]

        records_headers = self.review_manager.dataset.load_records_dict(
            header_only=True
        )
        nr_tasks = sum(
            1 for x in records_headers.values() if x[Fields.STATUS] in pending_states
        )

        items = self.review_manager.dataset.read_next_record(
            conditions=[
                {Fields.STATUS: RecordState.rev_prescreen_included},
                {Fields.STATUS: RecordState.pdf_needs_manual_retrieval},
            ],
        )

        self.to_retrieve = nr_tasks

        return {
            "nr_tasks": nr_tasks,
            "items": [{"record": item} for item in items],
        }

    @staticmethod
    def _format_stat_line(*, label: str, count: int, color: str) -> str:
        """Format one summary line (the count is colored unless it is 0)"""
        prefix = label.ljust(34)
        number = f"{count}".rjust(6, " ")
        if count == 0:
            return f"{prefix}{number} PDFs"
        unit = "PDF" if count == 1 else "PDFs"
        return f"{prefix}{color}{number}{Colors.END} {unit}"

    def _print_stats(self, retrieved_record_list: list) -> None:
        """Update the retrieved/not_retrieved counters and log the summary"""
        self.retrieved = sum(1 for r in retrieved_record_list if Fields.FILE in r)
        self.not_retrieved = self.to_retrieve - self.retrieved

        self.review_manager.logger.info(
            self._format_stat_line(
                label="Overall pdf_imported",
                count=self.retrieved,
                color=Colors.GREEN,
            )
        )
        self.review_manager.logger.info(
            self._format_stat_line(
                label="Overall pdf_needs_manual_retrieval",
                count=self.not_retrieved,
                color=Colors.ORANGE,
            )
        )

    def _set_status_if_pdf_linked(
        self, records: dict
    ) -> dict[str, dict[str, typing.Any]]:
        """Update records that need a PDF based on their existing file links

        Records with an existing linked file are set to pdf_imported (if they
        are in rev_prescreen_included); links to missing files are removed.
        The records are saved and returned.
        """
        for record_dict in records.values():
            if record_dict[Fields.STATUS] not in [
                RecordState.rev_prescreen_included,
                RecordState.pdf_needs_manual_retrieval,
            ]:
                continue
            if Fields.FILE not in record_dict:
                continue

            record = colrev.record.record_pdf.PDFRecord(
                record_dict, path=self.review_manager.path
            )
            # The file field may contain several paths separated by ";"
            if any(
                Path(fpath).is_file() for fpath in record.data[Fields.FILE].split(";")
            ):
                if RecordState.rev_prescreen_included == record.data[Fields.STATUS]:
                    record.set_status(RecordState.pdf_imported)
            else:
                self.review_manager.logger.warning(
                    "Remove non-existent file link "
                    f"({record_dict[Fields.ID]}: {record_dict[Fields.FILE]})"
                )
                record.remove_field(key=Fields.FILE)

        self.review_manager.dataset.save_records_dict(records)

        return records

    def setup_custom_script(self) -> None:
        """Setup a custom pdf-get script"""
        filedata = colrev.env.utils.get_package_file_content(
            module="colrev.ops",
            filename=Path("custom_scripts/custom_pdf_get_script.py"),
        )

        if filedata:
            with open("custom_pdf_get_script.py", "w", encoding="utf-8") as file:
                file.write(filedata.decode("utf-8"))

        self.review_manager.dataset.git_repo.add_changes(
            Path("custom_pdf_get_script.py")
        )

        # NOTE(review): the script is registered under the pdf_get_man
        # endpoints although this is the pdf_get operation - confirm that
        # this is intended.
        self.review_manager.settings.pdf_get.pdf_get_man_package_endpoints.append(
            {"endpoint": "custom_pdf_get_script"}
        )

        self.review_manager.save_settings()

    @colrev.process.operation.Operation.decorate()
    def main(self) -> None:
        """Get PDFs (main entrypoint)

        Raises a ServiceNotAvailableException in CI environments (except
        for the test environment).
        """
        if utils.in_ci_environment() and not self.review_manager.in_test_environment():
            raise colrev_exceptions.ServiceNotAvailableException(
                dep="colrev pdf-get",
                detailed_trace="pdf-get not available in ci environment",
            )

        self.review_manager.logger.info("Get PDFs")
        self.review_manager.logger.info(
            "Get PDFs of prescreen-included records from local and remote sources."
        )
        self.review_manager.logger.info(
            "PDFs are stored in the directory data/pdfs "
            f"({Colors.ORANGE}colrev pdfs --dir{Colors.END})"
        )
        self.review_manager.logger.info(
            "See https://colrev-environment.github.io/colrev/manual/pdf_retrieval/pdf_get.html"
        )

        records = self.review_manager.dataset.load_records_dict()
        records = self._set_status_if_pdf_linked(records)
        records = self.check_existing_unlinked_pdfs(records)

        pdf_get_data = self._get_data()

        if pdf_get_data["nr_tasks"] == 0:
            # Keep the counters defined even if nothing is retrieved
            self.retrieved = 0
            self.not_retrieved = 0
            self.review_manager.logger.info("No additional pdfs to retrieve")
        else:
            self.review_manager.logger.info(
                "PDFs to get".ljust(38) + f'{pdf_get_data["nr_tasks"]} PDFs'
            )

            pool = Pool(4)
            try:
                retrieved_record_list = pool.map(self.get_pdf, pdf_get_data["items"])
            finally:
                # Release the worker threads even if an endpoint raises
                pool.close()
                pool.join()

            self.review_manager.dataset.save_records_dict(
                {r[Fields.ID]: r for r in retrieved_record_list}, partial=True
            )

            self._print_stats(retrieved_record_list)

        # Note: rename should be after copy.
        # Note : do not pass records as an argument.
        if self.review_manager.settings.pdf_get.rename_pdfs:
            self.rename_pdfs()

        self.review_manager.create_commit(msg="PDFs: get and prepare")
        self.review_manager.logger.info(
            f"{Colors.GREEN}Completed pdf-get operation{Colors.END}"
        )