# Source code for vcf_to_obsidian.vcf_converter

"""
VCF Converter module for handling VCF to Markdown conversion.
"""

import re
from datetime import datetime, timezone
from pathlib import Path
from .vcf_reader import VCFReader
from .markdown_writer import MarkdownWriter
from .filename_generator import FilenameGenerator


class VCFConverter:
    """Class responsible for converting VCF files to Markdown format.

    The converter delegates parsing to ``VCFReader``, rendering to
    ``MarkdownWriter`` and output naming to ``FilenameGenerator``; this class
    only orchestrates those collaborators and decides when a conversion can
    be skipped.
    """

    # Revision stamp as it appears in a previously generated note,
    # e.g. "REV: 20240131T120000Z". Compiled once, reused for every file.
    _REV_PATTERN = re.compile(r"REV: (\d{8}T\d{6}Z)")
    _REV_FORMAT = "%Y%m%dT%H%M%SZ"

    def __init__(self):
        """Initialize the VCF converter."""
        self.reader = VCFReader()
        self.writer = MarkdownWriter()
        self.filename_gen = FilenameGenerator()

    def _extract_rev_timestamp_from_markdown(self, markdown_path):
        """
        Extract REV timestamp from existing Markdown file.

        Args:
            markdown_path (Path): Path to the Markdown file

        Returns:
            datetime or None: REV timestamp as a timezone-aware (UTC) datetime,
            or None if the file is missing, unreadable, contains no
            ``REV: YYYYMMDDTHHMMSSZ`` stamp, or the stamp is not a valid date.
        """
        try:
            if not markdown_path.exists():
                return None
            content = markdown_path.read_text(encoding="utf-8")
            match = self._REV_PATTERN.search(content)
            if match is None:
                return None
            parsed = datetime.strptime(match.group(1), self._REV_FORMAT)
            return parsed.replace(tzinfo=timezone.utc)
        except (OSError, ValueError):
            # OSError: the file could not be read.
            # ValueError: undecodable bytes (UnicodeDecodeError) or a stamp
            # that matches the digit pattern but is not a real date.
            # Either way the caller treats "no timestamp" as "convert again".
            return None

    def _should_skip_conversion(self, vcf_path, markdown_path):
        """
        Check if conversion should be skipped based on file modification times.

        Args:
            vcf_path (Path): Path to the VCF file
            markdown_path (Path): Path to the Markdown file

        Returns:
            bool: True if conversion should be skipped, False otherwise
        """
        if not markdown_path.exists():
            return False

        vcf_mtime = datetime.fromtimestamp(vcf_path.stat().st_mtime, tz=timezone.utc)

        rev_timestamp = self._extract_rev_timestamp_from_markdown(markdown_path)
        if rev_timestamp is None:
            # If we can't find REV timestamp, convert to be safe
            return False

        # The REV stamp only has whole-second resolution while filesystem
        # mtimes usually carry a sub-second part. Compare at second
        # resolution so a VCF written within the same second as its REV
        # stamp is not considered "newer" forever.
        return vcf_mtime.replace(microsecond=0) <= rev_timestamp

    def convert_vcf_to_markdown(self, vcf_path, output_dir):
        """
        Convert a single VCF file to Markdown format.

        Args:
            vcf_path (Path): Path to the VCF file
            output_dir (Path): Output directory for Markdown files

        Returns:
            bool: True if the file was converted or deliberately skipped,
            False if any step failed (the error is printed, not raised).
        """
        try:
            # The parsed vcard is needed first because the output filename
            # is derived from it.
            vcard = self.reader.read_vcf_file(vcf_path)

            output_filename = self.filename_gen.generate_filename(vcard, vcf_path)
            output_file = Path(output_dir) / f"{output_filename}.md"

            if self._should_skip_conversion(vcf_path, output_file):
                print(f"Skipped: {vcf_path.name} -> {output_file.name} (VCF not newer than markdown)")
                return True

            markdown_content = self.writer.generate_obsidian_markdown(vcard)

            # Remove notes previously generated for the same UID under a
            # different filename. This intentionally happens BEFORE the
            # write: on a case-insensitive filesystem an old name differing
            # only by case refers to the same file as the new one, and
            # deleting it after the write would delete the fresh note.
            uid = getattr(vcard, "uid", None)
            if uid and uid.value:
                existing_files = self.filename_gen.find_existing_files_with_uid(
                    output_dir, uid.value
                )
                for existing_file in existing_files:
                    if existing_file == output_file:
                        continue
                    try:
                        existing_file.unlink()
                        print(f"Removed old file: {existing_file.name}")
                    except OSError as e:
                        print(
                            f"Warning: Could not remove old file {existing_file.name}: {e}"
                        )

            with open(output_file, "w", encoding="utf-8") as f:
                f.write(markdown_content)

            print(f"Converted: {vcf_path.name} -> {output_file.name}")
            return True

        except Exception as e:
            # Top-level boundary for one file: report and let the batch go on.
            print(f"Error converting {vcf_path}: {e}")
            return False

    def convert_vcf_files_from_sources(
        self, folder_sources, file_sources, output_dir, ignore_files=None, verbose=False
    ):
        """
        Convert VCF files from multiple sources (folders and individual files) to Markdown format.

        This method collects VCF files from the specified sources, removes
        duplicates (by resolved path), applies ignore filters, and processes
        the remaining files using convert_vcf_to_markdown.

        Args:
            folder_sources (list): List of Path objects for directories containing VCF files
            file_sources (list): List of Path objects for individual VCF files
            output_dir (Path): Output directory for Markdown files (created if missing)
            ignore_files (list, optional): List of Path objects for files to ignore
            verbose (bool): Whether to enable verbose output

        Returns:
            tuple: (successful_count, total_count, all_vcf_files)
        """
        import click

        all_vcf_files = []
        processed_paths = set()  # resolved paths already queued, to avoid duplicates

        # Process folder sources
        for source_path in folder_sources:
            if not source_path.is_dir():
                if verbose:
                    click.echo(
                        f"Error: Source path '{source_path}' is not a directory.",
                        err=True,
                    )
                continue

            # One scan with a case-insensitive suffix test. Two separate
            # globs for "*.vcf" and "*.VCF" list every file twice on
            # case-insensitive filesystems and miss mixed-case suffixes.
            vcf_files = sorted(
                entry for entry in source_path.iterdir()
                if entry.suffix.lower() == ".vcf"
            )

            new_files_count = 0
            for vcf_file in vcf_files:
                absolute_path = vcf_file.resolve()
                if absolute_path not in processed_paths:
                    all_vcf_files.append(vcf_file)
                    processed_paths.add(absolute_path)
                    new_files_count += 1

            if verbose:
                if new_files_count < len(vcf_files):
                    click.echo(
                        f"Found {len(vcf_files)} VCF file(s) in '{source_path}' ({new_files_count} new, {len(vcf_files) - new_files_count} duplicates)"
                    )
                else:
                    click.echo(f"Found {len(vcf_files)} VCF file(s) in '{source_path}'")

        # Process individual file sources
        for file_path in file_sources:
            if not file_path.exists():
                if verbose:
                    click.echo(f"Error: File '{file_path}' does not exist.", err=True)
                continue

            if not file_path.is_file():
                if verbose:
                    click.echo(f"Error: Path '{file_path}' is not a file.", err=True)
                continue

            # An unexpected extension only triggers a warning; the file is
            # still queued because the user named it explicitly.
            if file_path.suffix.lower() != ".vcf":
                if verbose:
                    click.echo(
                        f"Warning: File '{file_path}' does not have a .vcf extension.",
                        err=True,
                    )

            absolute_path = file_path.resolve()
            if absolute_path not in processed_paths:
                all_vcf_files.append(file_path)
                processed_paths.add(absolute_path)
                if verbose:
                    click.echo(f"Added individual file: '{file_path}'")
            elif verbose:
                click.echo(f"Skipping duplicate file: '{file_path}'")

        # Process ignore list - remove specified files from the conversion list
        if ignore_files:
            ignore_paths = set()
            for ignore_path in ignore_files:
                ignore_paths.add(ignore_path.resolve())
                if verbose:
                    click.echo(f"Will ignore file: '{ignore_path}'")

            initial_count = len(all_vcf_files)
            all_vcf_files = [
                vcf_file
                for vcf_file in all_vcf_files
                if vcf_file.resolve() not in ignore_paths
            ]
            ignored_count = initial_count - len(all_vcf_files)

            if ignored_count > 0 and verbose:
                click.echo(f"Ignored {ignored_count} file(s)")

        # Create destination directory
        output_dir.mkdir(parents=True, exist_ok=True)
        if verbose:
            click.echo(f"Destination directory: '{output_dir}'")
            click.echo(f"Converting to Markdown in '{output_dir}'")

        # Convert each VCF file to the destination directly
        total_conversions = len(all_vcf_files)
        successful_conversions = sum(
            1
            for vcf_file in all_vcf_files
            if self.convert_vcf_to_markdown(vcf_file, output_dir)
        )

        return successful_conversions, total_conversions, all_vcf_files

    def process_tasks(self, folder, obsidian, file, verbose, ignore):
        """
        Process VCF conversion tasks from CLI arguments.

        This method handles the complete CLI workflow including validation,
        file collection, processing, and reporting. It terminates the process
        with exit status 1 when no source is given, a source is invalid, or
        no VCF file is left to process.

        Args:
            folder: Tuple/list of folder paths containing VCF files (may be empty or None)
            obsidian: Path to destination directory for Markdown files
            file: Tuple/list of individual VCF file paths to process (may be empty or None)
            verbose: Boolean flag for verbose output
            ignore: Tuple/list of VCF file paths to ignore (may be empty or None)
        """
        import sys

        import click

        # Normalise first so that None and empty tuples behave the same in
        # every check below.
        folder_sources = list(folder) if folder else []
        file_sources = list(file) if file else []
        ignore_files = list(ignore) if ignore else []

        # Validate that at least one source is specified
        if not folder_sources and not file_sources:
            click.echo(
                "Error: Must specify at least one --folder or --file option.", err=True
            )
            sys.exit(1)

        # Validate file and folder sources exist before processing
        for folder_path in folder_sources:
            if not folder_path.is_dir():
                click.echo(
                    f"Error: Source path '{folder_path}' is not a directory.", err=True
                )
                sys.exit(1)

        for file_path in file_sources:
            if not file_path.exists():
                click.echo(f"Error: File '{file_path}' does not exist.", err=True)
                sys.exit(1)
            if not file_path.is_file():
                click.echo(f"Error: Path '{file_path}' is not a file.", err=True)
                sys.exit(1)

        successful_conversions, total_conversions, all_vcf_files = (
            self.convert_vcf_files_from_sources(
                folder_sources=folder_sources,
                file_sources=file_sources,
                output_dir=obsidian,
                ignore_files=ignore_files,
                verbose=verbose,
            )
        )

        if not all_vcf_files:
            # Only blame the ignore list when one was actually supplied;
            # otherwise the sources simply contained no VCF files.
            if ignore_files:
                click.echo(
                    "No VCF files remaining to process after applying ignore list.",
                    err=True,
                )
            else:
                click.echo("No VCF files found to process.", err=True)
            sys.exit(1)

        # Report final results
        click.echo(f"Found {total_conversions} VCF file(s) to process")
        click.echo(
            f"Successfully completed {successful_conversions}/{total_conversions} conversions."
        )