#! /usr/bin/python3 # _*_ coding: iso-latin-1 _*_ # Last edited on 2025-07-11 12:52:06 by stolfi from math import inf, sqrt, log, exp, sin, cos, hypot; import sys, re; from error_funcs import arg_error, prog_error, file_line_error import ivtff_format, ivtff_align # Reads two VMS transcription files {file0} and {file1} with /data lines/ in the # format # # "<{PAGE}[.]{LSEQ}[,]{POSTY}[;]{TRANS}> {DATA}" # # where # # {PAGE} matches the RE "f[0-9]+[rv][0-9]*" (the page's f-number). # {LSEQ} matches the RE "[0-9]+" (the locus's sequence within the page). # {POSTY} matches the RE "[@+*=&~/!][A-Z][a-z0-9]" (the locus position and type). # {TRANS} is a letter or digit string (the transcriber's code). # {DATA} is a line of VMS transcription data in the IVTFF format. # # # The parts "[.]{POSTY}" and "[;]{TRANS}" may be omitted, in which case # {POSTY} and {TRANS} are assumed to be {None}. # # The characters [-] must not occur in the {DATA} field except # as part of a complete inline comment "" or the special # delimiters "<%>", "<$>", "<->", and "<~>". The {DATA} field should # not contain the characters [@°] except inside inline comments "". # # The input file may have a line before each page with format # "<{PAGE}> ...". The rest of the line is ignored. # # The input file may also have blank lines and lines that begin with # '#', .which are ignored. # # All data lines from the same file must have the same {TRANS} code. # # The pair {LOC} = "{PAGE}.{LSEQ}" is the /locus ID/ # # The {LOC}s of {file1} must be a subset of those of {file0}, # and must occur in the same order as they occur in {file0}. # # For each {LOC} that is present in both files, the program writes to # stdout the two corresponding data lines, with fillers '°' inserted in # the respective {DATA} so as to produce the best alignment of the two # lines. In the line from {file1}, each {DATA} char that is equal to the # aligned char of {file0} is replaced by a space. 
# Embedded blanks are allowed in the {DATA} field, but are removed
# before the comparison. Embedded comments are either deleted or
# replaced by "<!>". In the second case, the '!'s are flagged as
# non-matching, even if the comments have the same position and contents
# in both files.
#
# For each {LOC} that is present only in {file0}, the program prints one
# line of warning.
#
# The {POSTY} field is used only if and when printing the original data
# lines. It is otherwise ignored.

def main():
    # Entry point: opens the two files named on the command line, writes
    # the aligned comparison of their common loci to stdout, returns 0.
    sys.stdout.reconfigure(encoding='iso-8859-1')
    assert len(sys.argv) == 3, "expects exactly two arguments"
    st = initial_state()
    for i in range(2):
        st['filename'][i] = sys.argv[i+1]
        # Pass the encoding to {open} directly (instead of opening with the
        # platform default codec and reconfiguring afterwards) so the file
        # is never touched with the wrong encoding:
        st['file'][i] = open(st['filename'][i], "r", encoding='iso-8859-1')
    write_output_preamble(st)
    loop_on_file_1(st)
    write_output_postamble(st)
    for i in range(2):
        st['file'][i].close()
    sys.stdout.flush()
    return 0
# ......................................................................

def initial_state():
    # Sets up the parsing state as of the start of the files.
    st = {}
    # Global parameters:
    st['max_pdiff'] = 10  # Max finite discrepancy between unmatched chars.
    st['max_skip'] = 10   # Unmatched substrings should not be longer than this.
    # Set {min_pskip} so that chars with finite {pdiff} are paired instead of skipped:
    st['min_pskip'] = st['max_pdiff'] + 1
    st['debug'] = False
    st['no_comments'] = True  # Should eliminate comments?  Else replace by "<!>".
    st['blots_equal'] = True  # Should '?' be equal to '?'?
    # Global mutable state:
    st['unpaired'] = 0   # Number of data lines from file0 missing in file1.
    st['perfect'] = 0    # Number of paired data lines with perfect match.
    st['imperfect'] = 0  # Number of paired data lines with detected diffs.
    # Data about each file:
    st['filename'] = [ None, None ]  # File name.
    st['file'] = [ None, None ]      # File object.
    st['nread'] = [ 0, 0 ]           # Number of lines (data or not) read.
    st['ndata'] = [ 0, 0 ]           # Number of data lines read.
    st['npage'] = [ 0, 0 ]           # Number of page headers read.
    # Items from last data line read from each file:
    st['fields'] = [ None, None ]    # Whole line and parsed fields.
    return st
# ......................................................................

def loop_on_file_1(st):
    # Loops on file 1 input lines, looking for the matching file 0 lines
    # and comparing them whenever found.
    while True:
        read_file_line(st, 1)
        if st['fields'][1] == None:
            # EOF
            return
        elif st['fields'][1]['lseq'] != None:
            # Not blank, comment, or page header line:
            assert st['fields'][1]['page'] != None
            loc1 = st['fields'][1]['page'] + "." + str(st['fields'][1]['lseq'])
            get_matching_file0_line(st, loc1)
            if st['fields'][0] == None:
                # {data_error} does not return:
                data_error(st, 0, f"locus '{loc1}' from file 1 not in file 0")
            compute_and_write_alignment(st)
    return
# ......................................................................

def read_file_line(st, i):
    # Reads a new line from file {i}.
    #
    # If succeeded, parses it according to the IVTFF format and sets the
    # parsed fields {st['fields'][i]}.  If it is a blank line or a
    # '#'-comment, only the 'line' field is not {None}.  If it is a page
    # header line, only 'line', 'page', and 'data' are not {None}.
    # Otherwise 'line', 'page', 'lseq', and 'data' are not {None}; the
    # fields 'posty' and 'trans' may be {None}.
    #
    # If failed (end-of-file), sets {st['fields'][i]} to {None}.
    line = st['file'][i].readline()
    if line == "":
        # End of file:
        st['fields'][i] = None
        return
    st['nread'][i] += 1
    line = line.rstrip()
    fsi = ivtff_format.parse_line(line)
    st['fields'][i] = fsi
    if fsi['page'] == None:
        # Must be comment or blank:
        if re.match(r"[ ]*([#]|$)", line) == None:
            data_error(st, i, "invalid line format")
    elif fsi['lseq'] == None:
        # Page header:
        st['npage'][i] += 1
    else:
        # Data line:
        st['ndata'][i] += 1
    return
# ......................................................................
def get_matching_file0_line(st, loc1):
    # Reads one or more lines from {file0} until finding a data line
    # whose locus ID matches {loc1}.
    #
    # If it succeeds, sets {st['fields'][0]} to the parsed fields.
    #
    # If it reaches the end of {file0} without finding such a line,
    # returns with {st['fields'][0] = None}.
    #
    # Increments {st['unpaired']} for each data line of file 0 that is
    # skipped.  Ignores blank lines, '#'-comments, and page headers
    # "<{PAGE}>" with no {LSEQ} field.
    while True:
        read_file_line(st, 0)
        fs0 = st['fields'][0]
        if fs0 == None:
            # End of file:
            return
        elif fs0['lseq'] != None:
            # Not a comment, blank, or page header line:
            assert fs0['page'] != None
            loc0 = fs0['page'] + '.' + str(fs0['lseq'])
            if loc0 == loc1:
                return
            else:
                # Skipped locus; warn (message had a duplicated "in"):
                skipmsg = ("<%s> from file 0 not in file 1\n" % loc0)
                out("# "); out(skipmsg); out("\n")
                # Echo at most the first 10 skips to stderr:
                if st['unpaired'] < 10:
                    err(skipmsg)
                elif st['unpaired'] == 10:
                    err("...\n")
                st['unpaired'] += 1
    assert False
# ......................................................................

def compute_and_write_alignment(st):
    # Computes the optimum alignment for the {DATA} parts of the last
    # data lines read from the two files ({st['fields'][0..1]}), then
    # writes it out.
    max_pdiff = st['max_pdiff']
    max_skip = st['max_skip']
    min_pskip = st['min_pskip']
    blots_equal = st['blots_equal']

    # Before comparing the two data lines, {cleanup_data_field} deletes
    # or truncates inline comments so that their length is significantly
    # smaller than {max_skip}.  Otherwise this procedure could fail
    # because it was unable to skip over a long inline comment in one of
    # the strings.

    def pdiff(x, y):
        nonlocal max_pdiff, blots_equal
        # A numerical discrepancy between characters {x} and {y}.  It is
        #
        #   {p_punct} if {x,y} is {'!','!'}, otherwise
        #   {p_punct} if {x,y} is {'?','?'} and {blots_equal} is false, otherwise
        #   0 if {x==y}, otherwise
        #   {+inf} if {x} and {y} belong to different classes, otherwise
        #   a positive number between 1 and {max_pdiff}, depending on how
        #   similar the characters are.
        #
        # The classes are [<], [>], [{], [}], [!], parag markers [%], [$],
        # [-~], weirdo markers [&] and [;], rail alignment markers [»=«],
        # punctuation [-,.], and the EVA characters [a-zA-Z0-9&;?].  The
        # latter, ignoring case, are divided into subclasses
        #
        #   [ao] [bnu] [cehi] [dgjm] [fkptwz] [l] [qy] [rs] [0-9] [v] [x] [?]
        #
        assert x != None and y != None, "operands must not be {None}"
        assert x not in "° " and y not in "° ", "invalid char in data"
        assert len(x) == 1 and len(y) == 1, "strings are not single chars"
        # There is no need to test singleton classes separately except '!' and '?'.
        p_punct = 1       # Penalty for different punctuation of same class.
        p_rails = 2       # Penalty for different rail alignment markers.
        p_capital = 2     # Penalty for same chars with different case.
        p_same_class = 2  # Penalty for different chars of same class (case sensitive).
        p_diff_class = 5  # Penalty for matchable chars of different (sub)classes.
        assert max_pdiff >= p_diff_class
        if x == "!" and y == "!":
            return p_punct  # So that comments are considered non-matching.
        elif x == "?" and y == "?" and not blots_equal:
            return p_punct  # So that "?" are considered non-matching.
        elif x == y:
            return 0
        else:
            # Characters are different; min discrepancy is 1.
            x = x.lower()
            y = y.lower()
            if x == y:
                # Must be a case difference only:
                return p_capital
            elif x in "-~" and y in "-~":
                # Both internal line breaks, but different:
                return p_punct
            elif x in ",." and y in ",.":
                # Both EVA punctuation, but different:
                return p_punct
            elif x in "»=«" and y in "»=«":
                # Both rail alignment markers, but different:
                return p_rails
            elif x in "%$" and y in "%$":
                # Both parag markers, but different:
                return p_same_class
            elif x in "ao" and y in "ao":
                return p_same_class
            elif x in "bnu" and y in "bnu":
                return p_same_class
            elif x in "cehi" and y in "cehi":
                return p_same_class
            elif x in "dgjm" and y in "dgjm":
                return p_same_class
            elif x in "fkptwz" and y in "fkptwz":
                return p_same_class
            elif x in "qy" and y in "qy":
                return p_same_class
            elif x in "rs" and y in "rs":
                return p_same_class
            elif x in "0123456789" and y in "0123456789":
                # Both digits; subclass [0-9] of the list above.  This
                # branch was missing, so two different digits used to
                # fall through to {p_diff_class}:
                return p_same_class
            elif re.fullmatch(r"[a-z0-9?]", x) and re.fullmatch(r"[a-z0-9?]", y):
                # Both EVA characters but different subclasses:
                return p_diff_class
            else:
                # Different classes:
                return +inf
        assert False
    # . . . . . . . . . . . . . . . . . . . . . . . .

    def pskip(d):
        nonlocal max_skip, min_pskip
        # Slippage penalty for not matching {d} characters in either string.
        assert d >= 0, "invalid skip amount"
        if d == 0:
            return 0
        elif d > max_skip:
            return +inf
        else:
            # Quadratic function so that it prefers multiple small skips
            # to a larger one:
            return min_pskip + d*d - 1
        assert False
    # . . . . . . . . . . . . . . . . . . . . . . . .

    fs = st['fields']     # Fields of each data line.
    locid = [None, None]  # Locus ID from each data line.
    trans = [None, None]  # Transcriber code from each data line.
    data = [None, None]   # The data from each data line, cleaned.
    for i in range(2):
        locid[i] = fs[i]['page'] + '.' + str(fs[i]['lseq'])
        trans[i] = fs[i]['trans']
        if trans[i] == None: trans[i] = str(i)
        data[i] = cleanup_data_field(st, i, fs[i]['data'])
    assert locid[0] == locid[1], f"locus IDs don't match '{locid[0]}' '{locid[1]}'"
    if trans[0] == trans[1]:
        data_error(st, 1, f"same transcriber code '{trans[0]}'")
    align, penalty = ivtff_align.optimum_alignment(st, data, pdiff, pskip, max_skip)
    if st['debug']:
        write_data_lines_as_comment(st, locid, trans, data)
        out("\n")
    if st['debug']:
        debug_alignment(st, locid, trans, align)
        out("\n")
    if penalty == 0:
        write_perfect_match(st, locid, trans)
        st['perfect'] += 1
    else:
        write_alignment(st, locid, trans, data, align, pdiff)
        st['imperfect'] += 1
    out("\n")
    return
# ......................................................................

def cleanup_data_field(st, i, dt):
    # Prepares the {DATA} field of a data line for comparison.
    # Remove embedded blanks:
    dt = re.sub(r" ", "", dt)
    if dt == "": data_error(st, i, "empty data line")
    # Delete inline comments "<!...>", or collapse each to the minimal
    # comment "<!>" so that its '!' is flagged as non-matching.
    # NOTE(review): the original else-branch replacement was garbled to
    # "" (making the conditional a no-op); "<!>" restores the behavior
    # the file header describes -- confirm against the original program.
    cmtrep = "" if st['no_comments'] else "<!>"
    dt = re.sub(r"[<][!][^<>]*[>]", cmtrep, dt)
    # Now check for forbidden characters:
    chbad_pat = r"[°@]"             # Not allowed anywhere.
    mislt_pat = r"(^|[^<])[-~%$!]"  # Special [-~%$!] without the "<".
    misgt_pat = r"[-~%$!]([^>]|$)"  # Special [-~%$!] without the ">".
    badlt_pat = r"[<]($|[^-~%$!])"  # "<" not followed by [-~%$!].
    badgt_pat = r"(^|[^-~%$!])[>]"  # ">" not preceded by [-~%$!].
    bad_pat = f"({chbad_pat})|({mislt_pat})|({misgt_pat})|({badlt_pat})|({badgt_pat})"
    m = re.search(bad_pat, dt)
    if m != None:
        ixbad = m.start(0)
        chbad = m.group(0)
        data_error(st, i, f"invalid char '{chbad}' at pos {ixbad} of data field")
    return dt
# ......................................................................

def write_perfect_match(st, locid, trans):
    # Writes a message saying that two data lines matched perfectly.
    out("# perfect match")
    for i in range(2):
        lotra = f" <{locid[i]};{trans[i]}>"
        out(lotra)
    out("\n")
    return
# ......................................................................
def debug_alignment(st, locid, trans, align):
    # Writes the alignment {align} as two lines of indices.
    # Arguments {locid,trans,align} must be tuples of 2 elements.
    for i in range(2):
        lotr = f"<{locid[i]};{trans[i]}>"
        out("%-18s" % lotr)
        nk = len(align[i]) - 1
        for k in range(nk+1):
            # Field width fits the larger index of the pair, so the two
            # output lines stay column-aligned:
            ixmax = max(align[0][k], align[1][k])
            wd = 1 if ixmax <= 9 else (2 if ixmax <= 99 else 3)
            out(" %*d" % (wd, align[i][k]))
        out("\n")
    return
# ......................................................................

def write_data_lines_as_comment(st, locid, trans, data):
    # Writes the pair of input data lines as comments.
    # Uses {st['fields']}; the {locid,trans,data} arguments are currently
    # not consulted.
    for i in range(2):
        ivtff_format.write_line(sys.stdout, st['fields'][i], "# ")
        out("\n")
    return
# ......................................................................

def write_alignment(st, locid, trans, data, align, pdiff):
    # Writes two aligned data lines for {i} in {0..1}.
    # Arguments {locid,trans,data,align} must be tuples of 2 elements.
    # Output line {i} starts with its locus ID and trans code, then has
    # the string {data[i]} printed with inserted fillers as needed for
    # the alignment.
    #
    # The {pdiff} function should take two chars and return 0 if they
    # are considered equivalent, or a positive value if they are to be
    # flagged as not equivalent.  For instance one may want to consider
    # '?' and '?' not equivalent, or '-' and '~' equivalent.
    for i in range(2):
        lotr = f"<{locid[i]};{trans[i]}>"
        out("%-18s" % lotr)
        write_aligned_string(i, data, align, pdiff)
        out("\n")
    return
# ......................................................................

def write_aligned_string(i, data, align, pdiff):
    # Writes the chars of {data[i]} spaced out as needed according to
    # {align}.  If {i} is 1, replaces by spaces any chars of {data[i]}
    # that are exactly matched in {data[1-i]}.
    na = len(data[i])    # Length of data string to print.
    nb = len(data[1-i])  # Length of the other data string.
    nk = len(align[i])   # Number of alignment pairs including {(na,nb)}.
    assert len(align[1-i]) == nk, "alignment length mismatch"
    ia_prev = -1; ib_prev = -1
    for k in range(nk):
        ia = align[i][k]; ib = align[1-i][k]
        # Number of characters in the unmatched section:
        dmax = max(ia - ia_prev, ib - ib_prev) - 1
        # Print unmatched segment of {data[i]}, padded with '°' fillers:
        for r in range(dmax):
            ja = ia_prev + r + 1
            out(data[i][ja] if ja < ia else "°")
        # Print the matched character:
        if ia < na:
            if i == 1 and pdiff(data[i][ia], data[1-i][ib]) == 0:
                out(" ")
            else:
                out(data[i][ia])
        ia_prev = ia; ib_prev = ib
    return
# ......................................................................

def write_output_preamble(st):
    # Writes the identifying header of the output file.
    out("# Created by {compare_ivtff_files.py}\n")
    out("# \n")
    out("# File 0: %s:\n" % st['filename'][0])
    out("# File 1: %s:\n" % st['filename'][1])
    out("# \n")
    return
# ......................................................................

def write_output_postamble(st):
    # Writes the final statistics to both stderr and stdout.
    assert st['perfect'] + st['imperfect'] == st['ndata'][1]
    for fil in sys.stderr, sys.stdout:
        for i in range(2):
            fil.write(f"# read {st['nread'][i]:5d} lines")
            fil.write(f" ({st['ndata'][i]:5d} data, {st['npage'][i]:3d} pages)")
            fil.write(f" from file {i} = {st['filename'][i]}\n")
        fil.write(f"# {st['unpaired']:5d} loci from file0 missing in file1\n")
        fil.write(f"# {st['perfect']:5d} perfectly matching line pairs\n")
        fil.write(f"# {st['imperfect']:5d} imperfectly matching line pairs\n")
    return
# ......................................................................

def out(s):
    # Writes {s} to stdout.  (Parameter renamed; it shadowed builtin {str}.)
    sys.stdout.write(s)
    return
# ......................................................................

def err(s):
    # Writes {s} to stderr.  (Parameter renamed; it shadowed builtin {str}.)
    sys.stderr.write(s)
    return
# ......................................................................

def data_error(st, i, msg):
    # Reports an input error on the last line read from file {i}; does
    # not return.
    # Guard for errors detected at end-of-file, when {st['fields'][i]}
    # is {None} (the unguarded subscript used to raise a {TypeError}
    # instead of producing the diagnostic):
    fsi = st['fields'][i]
    line = fsi['line'] if fsi != None else ""
    file_line_error(st['filename'][i], st['nread'][i], msg, line)
    assert False
# ......................................................................

if __name__ == "__main__":
    sys.exit(main())