Source code for colrev.ops.pdf_get

#! /usr/bin/env python
"""CoLRev pdf_get operation: Get PDF documents."""
from __future__ import annotations

import shutil
from glob import glob
from multiprocessing.pool import ThreadPool as Pool
from pathlib import Path

import colrev.exceptions as colrev_exceptions
import colrev.process.operation
import colrev.record.record_pdf
from colrev.constants import Colors
from colrev.constants import EndpointType
from colrev.constants import Fields
from colrev.constants import OperationsType
from colrev.constants import PDFPathType
from colrev.constants import RecordState
from colrev.writer.write_utils import write_file


class PDFGet(colrev.process.operation.Operation):
    """Get the PDFs.

    Runs the ``pdf_get`` package endpoints configured in the settings for
    records in the ``rev_prescreen_included`` and
    ``pdf_needs_manual_retrieval`` states, and provides helpers to link,
    copy, relink and rename PDF files.
    """

    # Counters: to_retrieve is set by _get_data(); retrieved and
    # not_retrieved are set by _print_stats().
    to_retrieve: int
    retrieved: int
    not_retrieved: int

    type = OperationsType.pdf_get

    def __init__(
        self,
        *,
        review_manager: colrev.review_manager.ReviewManager,
        notify_state_transition_operation: bool = True,
    ) -> None:
        """Set up the operation and make sure the PDF directory exists.

        Args:
            review_manager: The review manager the operation works on.
            notify_state_transition_operation: Forwarded unchanged to the
                parent ``Operation`` constructor.
        """
        super().__init__(
            review_manager=review_manager,
            operations_type=self.type,
            notify_state_transition_operation=notify_state_transition_operation,
        )

        self.package_manager = self.review_manager.get_package_manager()

        pdf_dir = self.review_manager.paths.pdf
        pdf_dir.mkdir(exist_ok=True, parents=True)

        self.pdf_qm = self.review_manager.get_pdf_qm()

        # Sub-directory layout for stored PDFs; taken from the first
        # "colrev.files_dir" source (if any). Compared against Fields.YEAR
        # and "volume_number" in get_target_filepath().
        self.filepath_directory_pattern = ""
        pdf_endpoints = [
            s
            for s in self.review_manager.settings.sources
            if s.endpoint == "colrev.files_dir"
        ]
        if pdf_endpoints:
            # Default to "" (not {}) so the attribute is always a string.
            self.filepath_directory_pattern = (
                pdf_endpoints[0].search_parameters["scope"].get("subdir_pattern", "")
            )

    def copy_pdfs_to_repo(self) -> None:
        """Copy the PDFs to the repository.

        Every record file that is a symlink to an existing file is replaced
        by a copy of the link target.
        """
        self.review_manager.logger.info("Copy PDFs to dir")
        records = self.review_manager.dataset.load_records_dict()
        for record_dict in records.values():
            if Fields.FILE not in record_dict:
                continue

            fpath = Path(record_dict[Fields.FILE])
            new_fpath = fpath.absolute()
            if fpath.is_symlink():
                linked_file = fpath.resolve()
                if linked_file.is_file():
                    # Remove the link first; the copy is written to the
                    # same location.
                    fpath.unlink()
                    shutil.copyfile(linked_file, new_fpath)
                    self.review_manager.logger.info(
                        f" {Colors.GREEN}copied PDF for {record_dict[Fields.ID]} {Colors.END}"
                    )
            elif new_fpath.is_file() and self.review_manager.verbose_mode:
                self.review_manager.logger.info(
                    f"No need to copy PDF - already exits ({record_dict[Fields.ID]})"
                )

    def get_target_filepath(self, record: colrev.record.record.Record) -> Path:
        """Get the target filepath for a PDF.

        The default is ``<PDF_DIR>/<ID>.pdf``. Depending on
        ``self.filepath_directory_pattern`` the file is placed in a
        ``<year>/`` or ``<volume>/[<number>/]`` sub-directory instead.

        Args:
            record: The record whose ``data`` provides ID/year/volume/number.

        Returns:
            The path (below ``paths.PDF_DIR``) at which the PDF should be stored.
        """
        pdf_dir = self.review_manager.paths.PDF_DIR
        target_filepath = pdf_dir / Path(f"{record.data['ID']}.pdf")

        if self.filepath_directory_pattern == Fields.YEAR:
            target_filepath = pdf_dir / Path(
                f"{record.data.get('year', 'no_year')}/{record.data['ID']}.pdf"
            )
        elif self.filepath_directory_pattern == "volume_number":
            # Without a volume, the default (flat) target is kept.
            if Fields.VOLUME in record.data and Fields.NUMBER in record.data:
                target_filepath = pdf_dir / Path(
                    f"{record.data['volume']}/{record.data['number']}/{record.data['ID']}.pdf"
                )
            if Fields.VOLUME in record.data and Fields.NUMBER not in record.data:
                target_filepath = pdf_dir / Path(
                    f"{record.data['volume']}/{record.data['ID']}.pdf"
                )

        return target_filepath

    def import_pdf(self, record: colrev.record.record.Record) -> None:
        """Import a file (PDF) and copy/symlink it.

        The record's file field is set to the target filepath. The file is
        symlinked or copied there according to the ``pdf_path_type`` setting,
        unless the target already exists or equals the current path.

        Args:
            record: Record whose file field points to the PDF to import.
        """
        new_fp = self.get_target_filepath(record)
        original_fp = Path(record.data[Fields.FILE])
        pdf_path_type = self.review_manager.settings.pdf_get.pdf_path_type

        if new_fp != original_fp and not new_fp.is_file():
            new_fp.parents[0].mkdir(exist_ok=True, parents=True)
            if PDFPathType.symlink == pdf_path_type:
                new_fp.symlink_to(original_fp)
            elif PDFPathType.copy == pdf_path_type:
                shutil.copyfile(original_fp, new_fp.resolve())
            # Note : else: leave absolute paths

        record.data[Fields.FILE] = str(new_fp)

    def _log_infos(self, record: colrev.record.record.Record) -> None:
        """Log the state transition that resulted from a retrieval attempt."""
        if Fields.FILE not in record.data:
            # Missing PDFs are only reported when PDFs are required.
            if (
                not self.review_manager.settings.pdf_get.pdf_required_for_screen_and_synthesis
            ):
                return
            self.review_manager.logger.info(
                f" {Colors.ORANGE}{record.data['ID']}".ljust(46)
                + f"rev_prescreen_included → pdf_needs_manual_retrieval{Colors.END}"
            )
            return

        if RecordState.pdf_prepared == record.data[Fields.STATUS]:
            self.review_manager.logger.info(
                f" {Colors.GREEN}{record.data['ID']}".ljust(46)
                + f"rev_prescreen_included → pdf_prepared{Colors.END}"
            )
        elif RecordState.pdf_needs_manual_preparation == record.data[Fields.STATUS]:
            self.review_manager.logger.info(
                f" {Colors.ORANGE}{record.data['ID']}".ljust(46)
                + f"rev_prescreen_included → pdf_needs_manual_preparation{Colors.END}"
            )

    # Note : no named arguments (multiprocessing)
    def get_pdf(self, item: dict) -> dict:
        """Get PDFs (based on the package endpoints in the settings).

        Args:
            item: Dict with the record dict under the key ``"record"``.

        Returns:
            The (possibly updated) record dict.
        """
        record_dict = item["record"]

        if record_dict[Fields.STATUS] not in [
            RecordState.rev_prescreen_included,
            RecordState.pdf_needs_manual_retrieval,
        ]:
            if Fields.FILE in record_dict:
                record = colrev.record.record_pdf.PDFRecord(
                    record_dict, path=self.review_manager.path
                )
                record.remove_field(key=Fields.FILE)
                return record.get_data()
            return record_dict

        record = colrev.record.record_pdf.PDFRecord(
            record_dict, path=self.review_manager.path
        )

        # Try the configured endpoints in order; stop at the first one that
        # sets the file field.
        for (
            pdf_get_package_endpoint
        ) in self.review_manager.settings.pdf_get.pdf_get_package_endpoints:
            pdf_get_class = self.package_manager.get_package_endpoint_class(
                package_type=EndpointType.pdf_get,
                package_identifier=pdf_get_package_endpoint["endpoint"],
            )
            endpoint = pdf_get_class(
                pdf_get_operation=self, settings=pdf_get_package_endpoint
            )

            endpoint.get_pdf(record)  # type: ignore

            if Fields.FILE in record.data:
                self.review_manager.report_logger.info(
                    f"{endpoint.settings.endpoint}"  # type: ignore
                    f"({record.data[Fields.ID]}): retrieved .../"
                    f"{Path(record.data[Fields.FILE]).name}"
                )
                break

        if Fields.FILE in record.data:
            record.run_pdf_quality_model(self.pdf_qm, set_prepared=True)
        else:
            record.set_status(RecordState.pdf_needs_manual_retrieval)

        self._log_infos(record)

        return record.get_data()

    def _fix_broken_symlinks(self) -> None:
        """Interactively re-point broken PDF symlinks to a new base path.

        Prompts (stdin) for an old and a new path fragment and recreates
        every broken symlink with the fragment replaced in its target.
        """
        pdf_dir = self.review_manager.paths.pdf

        # Test the candidate itself (not a repo-relative path, which would be
        # resolved against the current working directory).
        broken_symlinks = [
            pdf_candidate
            for pdf_candidate in pdf_dir.glob("**/*.pdf")
            if pdf_candidate.is_symlink() and not pdf_candidate.is_file()
        ]
        if not broken_symlinks:
            return

        print("To fix broken symlinks:")
        old_path = input("Enter the old path: ")
        new_path = input("Enter the new path: ")
        for broken_symlink in broken_symlinks:
            new_file = str(broken_symlink.resolve()).replace(old_path, new_path)
            print(f"Fix {broken_symlink}")
            broken_symlink.unlink()
            broken_symlink.symlink_to(new_file)

    def _relink_pdfs_in_source(self, source: colrev.settings.SearchSource) -> None:
        """Reassign PDFs to records whose file link is missing.

        A PDF in the PDF directory is linked when its colrev_pdf_id equals
        the ``colrev_pdf_id`` of the record. The file field is updated in the
        record and in the corresponding source record (if one is found).
        """
        # pylint: disable=too-many-locals

        self.review_manager.logger.info(
            "Checking PDFs in same directory to reassign when "
            f"the cpid is identical {source.filename}"
        )
        pdf_dir = self.review_manager.paths.pdf
        repo_path = self.review_manager.path

        # Map: repo-relative PDF path -> colrev_pdf_id
        pdf_candidates = {}
        for pdf_candidate in list(pdf_dir.glob("**/*.pdf")):
            colrev_pdf_id = colrev.record.record_pdf.PDFRecord.get_colrev_pdf_id(
                pdf_candidate
            )
            relative_path = pdf_candidate.relative_to(repo_path)
            pdf_candidates[relative_path] = colrev_pdf_id

        source_records_dict = colrev.loader.load_utils.load(
            filename=source.filename,
            logger=self.review_manager.logger,
        )
        source_records = list(source_records_dict.values())

        corresponding_origin = str(source.filename)
        records = self.review_manager.dataset.load_records_dict()
        for record in records.values():
            if Fields.FILE not in record:
                continue

            # Note: we check the source_records based on the cpids
            # in the record because cpids are not stored in the source_record
            # (pdf hashes may change after import/preparation)
            source_rec = {}
            if corresponding_origin != "":
                source_origin_l = [
                    o for o in record[Fields.ORIGIN] if corresponding_origin in o
                ]
                if len(source_origin_l) == 1:
                    source_origin = source_origin_l[0].replace(
                        f"{corresponding_origin}/", ""
                    )
                    source_rec_l = [
                        s for s in source_records if s[Fields.ID] == source_origin
                    ]
                    if len(source_rec_l) == 1:
                        source_rec = source_rec_l[0]

            # Skip records whose links (record and, if available, source
            # record) point to existing files.
            if source_rec:
                if (repo_path / Path(record[Fields.FILE])).is_file() and (
                    repo_path / Path(source_rec[Fields.FILE])
                ).is_file():
                    continue
            else:
                if (repo_path / Path(record[Fields.FILE])).is_file():
                    continue

            self.review_manager.logger.info(record[Fields.ID])

            for pdf_candidate, cpid in pdf_candidates.items():
                if record.get("colrev_pdf_id", "") == cpid:
                    record[Fields.FILE] = str(pdf_candidate)
                    source_rec[Fields.FILE] = str(pdf_candidate)

                    self.review_manager.logger.info(
                        f"Found and linked PDF: {pdf_candidate}"
                    )
                    break

        if len(source_records) > 0:
            source_records_dict = {r[Fields.ID]: r for r in source_records}
            write_file(records_dict=source_records_dict, filename=source.filename)

        self.review_manager.dataset.save_records_dict(records)
        self.review_manager.dataset.add_changes(source.filename)

    def check_existing_unlinked_pdfs(
        self,
        records: dict,
    ) -> dict:
        """Check for PDFs that are in the pdfs directory but not linked in the record file.

        PDFs whose file stem is not a record ID are matched to the most
        similar record based on the metadata extracted via TEI; a match is
        linked when the similarity is above 0.5.

        Args:
            records: The records dict (ID -> record dict); updated in place.

        Returns:
            The records dict.
        """
        linked_pdfs = {
            str(Path(x[Fields.FILE]).resolve())
            for x in records.values()
            if Fields.FILE in x
        }

        pdf_dir = self.review_manager.paths.pdf
        # "**" only recurses when it is a complete path component:
        # "/**.pdf" would behave like "/*.pdf" and miss sub-directories.
        pdf_files = glob(str(pdf_dir) + "/**/*.pdf", recursive=True)
        unlinked_pdfs = [
            Path(x)
            for x in pdf_files
            if str(Path(x).resolve()) not in linked_pdfs
            and not any(kw in x for kw in ["_with_lp.pdf", "_with_cp.pdf", "_ocr.pdf"])
        ]

        if len(unlinked_pdfs) == 0:
            return records

        grobid_service = self.review_manager.get_grobid_service()
        grobid_service.start()
        self.review_manager.logger.info("Check unlinked PDFs")
        for file in unlinked_pdfs:
            msg = f"Check unlinked PDF: {file.relative_to(self.review_manager.path)}"
            self.review_manager.logger.info(msg)

            if file.stem in records:
                # NOTE(review): link_pdf() is not defined in this class as far
                # as visible here - confirm that it is provided elsewhere
                # (e.g., by the parent class).
                self.link_pdf(
                    colrev.record.record_pdf.PDFRecord(
                        records[file.stem], path=self.review_manager.path
                    )
                )
                continue

            tei = self.review_manager.get_tei(pdf_path=file)
            pdf_record = tei.get_metadata()

            if "error" in pdf_record:
                continue

            # Find the record that is most similar to the PDF metadata.
            max_similarity = 0.0
            max_sim_record = None
            for candidate in records.values():
                sim = colrev.record.record_pdf.PDFRecord.get_record_similarity(
                    colrev.record.record_pdf.PDFRecord(
                        pdf_record, path=self.review_manager.path
                    ),
                    colrev.record.record_pdf.PDFRecord(
                        candidate.copy(), path=self.review_manager.path
                    ),
                )
                if sim > max_similarity:
                    max_similarity = sim
                    max_sim_record = candidate

            if max_sim_record and max_similarity > 0.5:
                if RecordState.pdf_prepared == max_sim_record[Fields.STATUS]:
                    continue

                record = colrev.record.record_pdf.PDFRecord(
                    max_sim_record, path=self.review_manager.path
                )
                record.update_field(
                    key=Fields.FILE,
                    value=str(file),
                    source="linking-available-files",
                )
                self.import_pdf(record)
                if RecordState.rev_prescreen_included == record.data[Fields.STATUS]:
                    record.set_status(RecordState.pdf_imported)

                self.review_manager.report_logger.info(
                    "linked unlinked pdf:" f" {file.name}"
                )
                self.review_manager.logger.info(
                    "linked unlinked pdf:" f" {file.name}"
                )

        self.review_manager.dataset.save_records_dict(records)
        return records

    def _rename_pdf(
        self,
        *,
        record_dict: dict,
        file: Path,
        new_filename: Path,
        pdfs_search_file: Path,
    ) -> None:
        """Rename one PDF and update the references to it.

        Args:
            record_dict: The record dict; its file field (and possibly its
                status) is updated in place.
            file: Current path of the PDF.
            new_filename: Path the PDF is moved to.
            pdfs_search_file: Search file in which ``{<file>}`` is replaced by
                ``{<new_filename>}`` (if the search file exists).
        """
        # Store the path as a string (not as a Path object).
        record_dict[Fields.FILE] = str(new_filename)

        if pdfs_search_file.is_file():
            colrev.env.utils.inplace_change(
                filename=pdfs_search_file,
                old_string="{" + str(file) + "}",
                new_string="{" + str(new_filename) + "}",
            )

        if not file.is_file():
            # NOTE(review): the extracted source showed a single-space to
            # single-space replacement (a no-op); presumably doubled spaces
            # were meant to be collapsed - confirm against the repository.
            corrected_path = Path(str(file).replace("  ", " "))
            if corrected_path.is_file():
                file = corrected_path

        # is_symlink() additionally covers links whose target is missing.
        if file.is_file() or file.is_symlink():
            shutil.move(str(file), str(new_filename))

        self.review_manager.logger.info(f"rename {file.name} > {new_filename}")

        if RecordState.rev_prescreen_included == record_dict[Fields.STATUS]:
            record = colrev.record.record_pdf.PDFRecord(
                record_dict, path=self.review_manager.path
            )
            record.set_status(RecordState.pdf_imported)

    def rename_pdfs(self) -> None:
        """Rename the PDFs.

        Each linked PDF of a record in a post-``md_processed`` state is
        renamed to ``<ID><suffix>`` within its current directory.
        """
        self.review_manager.logger.info("Rename PDFs")

        records = self.review_manager.dataset.load_records_dict()

        # We may use other pdfs_search_files from the sources:
        # review_manager.settings.sources
        pdfs_search_file = Path("data/search/pdfs.bib")

        # Loop-invariant: compute the eligible states once.
        eligible_states = RecordState.get_post_x_states(
            state=RecordState.md_processed
        )

        for record_dict in records.values():
            if Fields.FILE not in record_dict:
                continue
            if record_dict[Fields.STATUS] not in eligible_states:
                continue

            file = Path(record_dict[Fields.FILE])
            new_filename = file.parents[0] / Path(f"{record_dict['ID']}{file.suffix}")

            if str(file) == str(new_filename):
                continue

            self._rename_pdf(
                record_dict=record_dict,
                file=file,
                new_filename=new_filename,
                pdfs_search_file=pdfs_search_file,
            )

        self.review_manager.dataset.save_records_dict(records)

        if pdfs_search_file.is_file():
            self.review_manager.dataset.add_changes(pdfs_search_file)

    def _get_data(self) -> dict:
        """Collect the records to process.

        Returns:
            Dict with ``"nr_tasks"`` (number of records in the
            rev_prescreen_included or pdf_needs_manual_retrieval state) and
            ``"items"`` (list of ``{"record": record_dict}``).
        """
        # pylint: disable=duplicate-code
        records_headers = self.review_manager.dataset.load_records_dict(
            header_only=True
        )
        target_states = [
            RecordState.pdf_needs_manual_retrieval,
            RecordState.rev_prescreen_included,
        ]
        nr_tasks = sum(
            1 for x in records_headers.values() if x[Fields.STATUS] in target_states
        )

        items = self.review_manager.dataset.read_next_record(
            conditions=[
                {Fields.STATUS: RecordState.rev_prescreen_included},
                {Fields.STATUS: RecordState.pdf_needs_manual_retrieval},
            ],
        )

        self.to_retrieve = nr_tasks

        pdf_get_data = {
            "nr_tasks": nr_tasks,
            "items": [{"record": item} for item in items],
        }

        return pdf_get_data

    @staticmethod
    def _format_stat_line(label: str, count: int, color: str) -> str:
        """Format one summary line: padded label, count and PDF/PDFs unit.

        A count of zero is printed without color.
        """
        line = label.ljust(34)
        number = f"{count}".rjust(6, " ")
        if count == 0:
            return f"{line}{number} PDFs"
        unit = "PDF" if count == 1 else "PDFs"
        return f"{line}{color}{number}{Colors.END} {unit}"

    def _print_stats(self, retrieved_record_list: list) -> None:
        """Set the retrieved/not_retrieved counters and log a summary.

        Args:
            retrieved_record_list: Record dicts returned by get_pdf(); a
                record counts as retrieved when it has a file field.
        """
        self.retrieved = sum(1 for r in retrieved_record_list if Fields.FILE in r)
        self.not_retrieved = self.to_retrieve - self.retrieved

        self.review_manager.logger.info(
            self._format_stat_line(
                "Overall pdf_imported", self.retrieved, Colors.GREEN
            )
        )
        self.review_manager.logger.info(
            self._format_stat_line(
                "Overall pdf_needs_manual_retrieval",
                self.not_retrieved,
                Colors.ORANGE,
            )
        )

    def _set_status_if_pdf_linked(self, records: dict) -> dict:
        """Update records that already have a file link.

        For records in the rev_prescreen_included or
        pdf_needs_manual_retrieval state: if (one of) the linked file(s)
        exists, rev_prescreen_included records are set to pdf_imported;
        otherwise, the file field is removed. The records are saved.

        Args:
            records: The records dict (ID -> record dict); updated in place.

        Returns:
            The records dict.
        """
        for record_dict in records.values():
            if record_dict[Fields.STATUS] not in [
                RecordState.rev_prescreen_included,
                RecordState.pdf_needs_manual_retrieval,
            ]:
                continue

            record = colrev.record.record_pdf.PDFRecord(
                record_dict, path=self.review_manager.path
            )
            if Fields.FILE not in record_dict:
                continue

            # The file field may hold several paths separated by ";".
            if any(
                Path(fpath).is_file() for fpath in record.data[Fields.FILE].split(";")
            ):
                if RecordState.rev_prescreen_included == record.data[Fields.STATUS]:
                    record.set_status(RecordState.pdf_imported)
            else:
                self.review_manager.logger.warning(
                    "Remove non-existent file link "
                    f"({record_dict[Fields.ID]}: {record_dict[Fields.FILE]})"
                )
                record.remove_field(key=Fields.FILE)

        self.review_manager.dataset.save_records_dict(records)

        return records

    def setup_custom_script(self) -> None:
        """Setup a custom pdf-get script.

        Writes ``custom_pdf_get_script.py`` to the current working directory
        (if the package file content is available), adds it to the dataset
        changes, registers the endpoint in the settings and saves them.
        """
        filedata = colrev.env.utils.get_package_file_content(
            module="colrev.ops",
            filename=Path("custom_scripts/custom_pdf_get_script.py"),
        )

        if filedata:
            with open("custom_pdf_get_script.py", "w", encoding="utf-8") as file:
                file.write(filedata.decode("utf-8"))

        self.review_manager.dataset.add_changes(Path("custom_pdf_get_script.py"))

        self.review_manager.settings.pdf_get.pdf_get_man_package_endpoints.append(
            {"endpoint": "custom_pdf_get_script"}
        )

        self.review_manager.save_settings()

    @colrev.process.operation.Operation.decorate()
    def main(self) -> None:
        """Get PDFs (main entrypoint).

        Raises:
            ServiceNotAvailableException: In a CI environment (outside of
                the test environment).
        """
        if (
            self.review_manager.in_ci_environment()
            and not self.review_manager.in_test_environment()
        ):
            # This is the pdf-get operation (the message previously named
            # pdf-prep).
            raise colrev_exceptions.ServiceNotAvailableException(
                dep="colrev pdf-get",
                detailed_trace="pdf-get not available in ci environment",
            )

        self.review_manager.logger.info("Get PDFs")
        self.review_manager.logger.info(
            "Get PDFs of prescreen-included records from local and remote sources."
        )
        self.review_manager.logger.info(
            "PDFs are stored in the directory data/pdfs "
            f"({Colors.ORANGE}colrev pdfs --dir{Colors.END})"
        )
        self.review_manager.logger.info(
            "See https://colrev-environment.github.io/colrev/manual/pdf_retrieval/pdf_get.html"
        )

        records = self.review_manager.dataset.load_records_dict()
        records = self._set_status_if_pdf_linked(records)
        records = self.check_existing_unlinked_pdfs(records)

        pdf_get_data = self._get_data()

        if pdf_get_data["nr_tasks"] == 0:
            self.review_manager.logger.info("No additional pdfs to retrieve")
        else:
            self.review_manager.logger.info(
                "PDFs to get".ljust(38) + f'{pdf_get_data["nr_tasks"]} PDFs'
            )

            pool = Pool(4)
            try:
                retrieved_record_list = pool.map(self.get_pdf, pdf_get_data["items"])
            finally:
                # Release the worker threads even if a retrieval raised.
                pool.close()
                pool.join()

            self.review_manager.dataset.save_records_dict(
                {r[Fields.ID]: r for r in retrieved_record_list}, partial=True
            )

            self._print_stats(retrieved_record_list)

        # Note: rename should be after copy.
        # Note : do not pass records as an argument.
        if self.review_manager.settings.pdf_get.rename_pdfs:
            self.rename_pdfs()

        self.review_manager.dataset.create_commit(msg="PDFs: get and prepare")
        self.review_manager.logger.info(
            f"{Colors.GREEN}Completed pdf-get operation{Colors.END}"
        )