#! /bin/python3
# Last edited on 2026-05-26 18:21:47 by stolfi

import sys, os, re
import regex as rex
from sys import stderr as err, stdin as inp, stdout as out
from process_funcs import bash, basic_line_loop
from error_funcs import arg_error, file_line_error, prog_error
from chinese_funcs import read_chinese_char_set
import size_position_funcs as spf
import match_multi_funcs as mmf
import bimatching_eval_funcs as bef
from math import sqrt, hypot, exp, log, pi, inf, nan, floor, ceil, isfinite

def evaluate_all_starps_parags(ivt_file, nh, eval_func):
    # Compares all parags of the SPS with an entry of the SBJ,
    # according to a specific macro-parsing of the latter into
    # keyword hits and gaps.
    #
    # Arguments:
    #
    #   {ivt_file}   name of input file with all candidate parags, or "-"
    #                to read from standard input.
    #   {nh}         number of keywords that {eval_func} will look for.
    #   {eval_func}  an SBJ-SPS matching function.
    #
    # The function reads from {ivt_file} (decoded as Unicode UTF-8) the
    # transcription of one or more SPS parags, in an IVTFF-like format
    # with one parag per line in the format "<{LOC}> {TEXT}". The
    # {TEXT} should be the complete text of one parag in EVA encoding.
    # Blank lines, #-comments, and IVTFF page header lines are ignored.
    #
    # The {TEXT} of each line is cleaned up and normalized as
    # appropriate for {utype = "ec"}, giving a cleaned EVA text
    # {cleantx_ec}. See {clean_up_starps_raw_text} and
    # {normalize_starps_text} in {size_position_funcs.py} for details.
    #
    # The function then calls {eval_func(cleantx_ec)}, which must
    # return a tuple {(score, segs_ch, segs_ec, key_penalty)}: a numeric
    # badness {score}, two macro-parsings {segs_ch[0..ns-1]} and
    # {segs_ec[0..ns-1]}, and the total {key_penalty}.
    #
    # The macro-parsings {segs_ch,segs_ec} must split the clean texts of
    # the SBJ entry and of the SPS parag into the same number {nh} of
    # hits and the same number {ng=nh+1} of gaps, so {ns = nh+ng}. The
    # {key_penalty} should be the part of the {score} that is due to
    # non-canonical keyword choices (a variant spelling used instead of
    # the canonical keyword). If not available, it should be zero.
    #
    # If {eval_func} returns a non-finite {score} (such as {+inf}), it
    # is assumed that {cleantx_ec} cannot be matched, and the parag is
    # left out of the result.
    #
    # Returns a list {parevs} of tuples of the form
    # {(score, loc_ec, segs_ch, segs_ec, key_penalty)}, sorted by
    # increasing badness score; and the dictionary {data} with various
    # counts of the operation.

    debug_file = False
    ng = nh + 1
    ns = ng + nh

    if debug_file: err.write(f"!a {ivt_file = !r}\n")

    utype_ec = "ec"
    pat_line, _pat_unit, _pat_sepa, _clean_sepa = spf.get_parsing_patterns(utype_ec)

    # Blank line or #-comment:
    pat_blank = re.compile(r" *([#]|$)")

    # IVTFF page header: a locator with a page name only (no '.' and
    # hence no line number), followed by nothing or by a "<!...>"
    # comment with the page variables.
    # NOTE(review): the pattern found in this spot was the empty string,
    # which matches every line and so discarded all the input; confirm
    # that this replacement agrees with the headers actually present in
    # the input files.
    pat_page_header = re.compile(r" *<[^<>.]*>\s*(<!|$)")

    data = {
        'npar_read': 0,     # Count of input data lines (SPS parags).
        'npar_with': 0,     # Count of SPS parags for which {eval_func} succeeded.
        'min_size':  +inf,  # Minimum size of parags that matched.
        'max_size':  0,     # Maximum size of parags that matched.
    }

    parevs = []  # Candidates after analysis.

    def process_input_line(nline, line):
        # Parses a line {line} assuming it is line {nline} of the file.
        # The {line} is always a string (never {None}), but may be "" if
        # the line is empty.
        #
        # Ignores the line if it is blank, a #-comment, or a page header.
        #
        # Otherwise the line must be a data line, matching {pat_line}.
        # Increments {data['npar_read']} for each data line.
        #
        # Gets {cleantx_ec} by cleaning up the raw text. Calls
        # {eval_func} to try to obtain viable parallel macro-parsings of
        # the SBJ entry's clean text and of {cleantx_ec}, with {nh} hits
        # and {nh+1} gaps.
        #
        # If that succeeded, increments {data['npar_with']}, updates
        # {data['min_size'],data['max_size']}, creates the candidate
        # parsing tuple {parev} and appends it to the list {parevs}.

        def data_error(msg):
            # Reports {msg} as an error on the current input line. It is
            # assumed that {file_line_error} does not return; the
            # explicit {raise} keeps that true even under "python -O".
            file_line_error(ivt_file, nline, msg, line)
            raise AssertionError("{file_line_error} should not return")

        assert line is not None, "The {line} arg must not be {None}"

        # Ignore comments and blank lines:
        if pat_blank.match(line): return

        # Just in case, ignore IVTFF page headers:
        if pat_page_header.match(line): return

        data['npar_read'] += 1

        m = re.match(pat_line, line)
        if m is None:
            data_error("invalid line format")

        # Parse the line into locus ID {loc_ec} and raw text:
        loc_ec = m.group(1)
        rawtx_ec = m.group(2)

        # Normalize and cleanup the raw text:
        cleantx_ec, _head, _tail = \
            spf.clean_up_starps_raw_text(rawtx_ec, utype_ec, data_error)
        cleantx_ec = spf.normalize_starps_text(cleantx_ec, utype_ec, data_error)

        # Hard-wired trigger that turns on the diagnostics for one
        # specific parag, even when {debug_file} is off:
        debug_line = debug_file or (cleantx_ec[:15] == "pokararkeeyokee")
        if debug_line:
            err.write(f"!a {cleantx_ec = !r}\n")
            err.write(f"······································································\n")
            err.write(f"parag {loc_ec}: {utype_ec} size raw = {len(rawtx_ec)} normalized = {len(cleantx_ec)}:\n")

        score, segs_ch, segs_ec, key_penalty = eval_func(cleantx_ec)
        if not isfinite(score):
            # Could not be matched; not a candidate.
            return

        # Was able to match. Consistency checks on the result:
        assert segs_ch is not None and segs_ec is not None
        assert 0 <= key_penalty <= score
        assert len(segs_ec) == ns
        assert all(isinstance(sg, str) for sg in segs_ch)
        assert all(isinstance(sg, str) for sg in segs_ec)

        data['npar_with'] += 1
        if debug_line: err.write(f"!a {loc_ec:<12s} {score = :+8.3f}\n")

        # Collect statistics of parag lengths:
        psize_ec = len(cleantx_ec)
        data['min_size'] = min(data['min_size'], psize_ec)
        data['max_size'] = max(data['max_size'], psize_ec)

        parevs.append(( score, loc_ec, segs_ch, segs_ec, key_penalty ))
        return
        # ::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::

    if ivt_file == "-":
        rd = inp
        rd.reconfigure(encoding='utf-8')
    else:
        rd = open(ivt_file, "r", encoding='utf-8')
    try:
        err.write(f"reading file '{ivt_file}' ...\n")
        nread = basic_line_loop(rd, process_input_line)
    finally:
        # Close only what was opened here; leave standard input alone.
        if rd is not inp: rd.close()

    err.write(f"sorting parsed parags ...\n")
    parevs.sort(key = lambda x : x[0])

    err.write(f"{nread:6d} lines read\n")
    err.write(f"{data['npar_read']:6d} parags read\n")
    err.write(f"{data['npar_with']:6d} parags matched the keywords\n")
    err.write(f" size range = {data['min_size']}..{data['max_size']}\n")

    return parevs, data
    # ----------------------------------------------------------------------

def output_parev(wr, parev, hipat_ec):
    # Writes to {wr} the parag evaluation tuple {parev} in the
    # format suitable for the report, as produced by
    # {format_starps_parag_evaluation}.
    #
    # The {hipat_ec} should be an EVA pattern used to highlight
    # keywords in the gaps of the EVA macro-parsing, or {None}.
    wr.write(format_starps_parag_evaluation(parev, hipat_ec))
    return
    # ----------------------------------------------------------------------

# NOTE(review): the two {add_..._INFO} functions below call {h.parags},
# but none of the imports at the top of this file binds the name {h}.
# Unless it is provided by some other means, calling them raises
# {NameError}. Confirm which module {h} is meant to be and import it.

def add_bencao_starps_matching_INFO(st):
    # Appends to {st} the explanation for how SPS parags are evaluated.
    h.parags(st,
        "The subpages about individual SBJ entries contain the results of "
        "structural matching of the entry in question against the SPS parags. "
        "These reports consist of one or more /parag evaluation blocks/ or "
        "/parevs/, each evaluating one parag of the SPS, which is summarized "
        "in a numeric badness score. "
        "The reports assume that the SBJ entry, minus all markup and "
        "puntuation, is parsed according to a given list of {N} /hanzi "
        "keyword patterns/, for example [ '主治|主', '气' ]. "
        "None of these patterns should match the empty string. "
        "The parsing splits the entry (cleaned of all punctuation and "
        "metadata) into {N} non-overlapping hanzi strings that match those "
        "patterns (the /hanzi hits/), in the given order, and {N+1} hanzi "
        "strings before, between, and after those hits (the /hanzi gaps/). "
        "This parsing is shown before all the parevs. "
        "Each SPS parag is evauated by deleting all word space markers [,.-] "
        "and other markup, and parsing the resulting EVA string too into {N} "
        "/EVA hits/ and {N+1} /EVA gaps/, by another list of {N} /EVA keyword "
        "patterns/, for example [ 'daiin|dain|laiin', 'chedy|chedo' ]. "
        "None of these patterns should match the empty string. "
        "Then the badness score is computed by comparing actual and predicted "
        "lengths (in EVA letters) of the whole SPS parag and of the {N+1} EVA "
        "gaps. "
        "The prediction for each of these substrings is based on the number "
        "of hanzi in the corresponding substring of the SBJ entry, multiplied "
        "by a fixed scale factor. "
        "If there are multiple ways to match the {N} EVA keywords within the "
        "parag's text, the parev data reflects the choice of the {N} matches "
        "that gave the lowest badness score. "
        "Conversely, if there is no way to match {N} EVA keywords within the "
        "parag's text, the parag is not considered to be a candidate match "
        "and is omitted from the matching report."
    )
    return
    # ----------------------------------------------------------------------

def add_format_starps_parag_evaluation_INFO(st):
    # Appends to {st} the explanation for the output of
    # {format_starps_parag_evaluation}.
    h.parags(st,
        "The first line of each block is the page and line number of the "
        "head line of the candidate SPS parag, followed by its its badness "
        "score, by the total length and its discrepancy, and by keyword "
        "penalty. "
        "The length discrepancy is the difference between the actual length "
        "of the parag (in EVA letters, ignoring word spaces) and the length "
        "predicted from the count of hanzi in the SBJ entry. "
        "The key penalty is the the sum of of the penalty points associated "
        "with the specific alternatives of the keywords that were used. "
        "That line is followed by {N+1} lines showing the assumed parsing of "
        "the parag's text by the specified EVA keyword patterns. "
        "There is one line for each EVA gap of that parsing. "
        "Each of these lines has tree fields: the EVA hit that precedes the "
        "gap (blank for the first gap), the difference between the actual "
        "and predicted lengths of the gap, the contribution of that gap to "
        "the parag's score, and the gap itself. "
        "The gap's score is the squared relative discrepancy between the "
        "gap's size in the SPS entry and the size expected from the "
        "corresponding gap in the SBJ entry. "
        "The discrepancy is the difference between the two, computed in log "
        "scale, and divided by the estimated deviation of that difference. "
        "Within the gaps, any substrings that matches any of the possible "
        "EVA keywords is highighted in boldface, even though they were not "
        "considered hits for the purposes of parsing or computation. "
        "All size discrepancies are in EVA characters. "
        "The badness score is the sum of the gap scores and the keyword hit "
        "penalties, and would be zero for a perfect match (all gap sizes "
        "match their predicted values and all keyword hits are canonical)."
    )
    return
    # ----------------------------------------------------------------------

def format_starps_parag_evaluation(parev, hipat_ec):
    # Formats the SPS parag evaluation tuple {parev} as a multiline
    # string. Each line of it ends with "\n".
    #
    # The {parev} must be a tuple {(score, loc_ec, segs_ch, segs_ec, key_penalty)}
    # where {score} is the badness score of the parag, {loc_ec} is the
    # locus ID of its first line, {segs_ec} is the macro-parsing of its
    # pure EVA text into gaps and hits, and {segs_ch} is the corresponding
    # macro-parsing of the pure hanzi text of the SBJ entry.
    #
    # Let {ns} be {len(segs_ch) = len(segs_ec)}. Let {nh = ns//2} be the
    # number of hits in both parsings, and {ng = nh+1} be the number of
    # gaps. Even-indexed segments are gaps, odd-indexed ones are hits.
    #
    # If {hipat_ec} is not {None}, it must be an EVA RE pattern used to
    # highlight substrings of the gaps of the EVA macro-parsing
    # {segs_ec} in {parev}.
    #
    # The size discrepancies shown in the output are the differences
    # between the actual lengths of the whole EVA parag text and of the
    # individual EVA gaps in {segs_ec}, and the values predicted by
    # {size_error} from the hanzi counts in the whole SBJ entry and in
    # the gaps of its parsing {segs_ch}.
    #
    # See {add_format_starps_parag_evaluation_INFO} for the output format.

    debug = False

    score, loc_ec, segs_ch, segs_ec, key_penalty = parev
    assert segs_ec is not None
    ns = len(segs_ec)
    assert len(segs_ch) == ns
    nh = ns//2
    ng = nh + 1
    assert ns == ng + nh

    # Gap and total bencao sizes:
    gsizes_ch = bef.get_gap_sizes(segs_ch)
    tsize_ch = sum(len(sg) for sg in segs_ch)

    if debug:
        err.write(f"!> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n")
        err.write(f"!> {segs_ch = !r}\n")
        err.write(f"!> {gsizes_ch = }\n")
        err.write(f"!> {loc_ec = }\n")

    # Actual starps gap, hit, and text size:
    gsizes_ec = bef.get_gap_sizes(segs_ec)
    hsizes_ec = bef.get_hit_sizes(segs_ec)
    if debug:
        err.write(f"!> {loc_ec = }\n")
        err.write(f"!> {segs_ec = !r}\n")
        err.write(f"!> {gsizes_ec = }\n")
    tsize_ec = sum(len(sg) for sg in segs_ec)

    cbits = []

    # SPS parag locus ID, badness score, total text size error, key penalty:
    loc_ec = f"<{loc_ec}>"
    terr = size_error(tsize_ch, tsize_ec)
    terr_str = f"({terr:+d})"
    cbits.append(f" {loc_ec:<10s}")
    cbits.append(f" {score:7.3f}")
    cbits.append(f" {tsize_ec:3d}")
    cbits.append(f"{terr_str:<6s}")
    cbits.append(f" {key_penalty:.3f}")
    cbits.append("\n")

    # EVA hits and gaps. All hits are padded to the widest one:
    hwd = max(hsizes_ec, default=0)
    for ig in range(ng):
        # Hit that precedes gap {ig} (none before the first gap):
        hit_ec = f"{segs_ec[2*ig-1]}" if ig > 0 else ""
        gap_ec = segs_ec[2*ig]

        cbits.append(" ")

        # The hit string:
        cbits.append(hit_ec.ljust(hwd, " "))

        # The gap size, its error, and its score:
        gsz_ec = gsizes_ec[ig]
        gsz_ch = gsizes_ch[ig]
        assert gsz_ec == len(gap_ec)
        cbits.append(f" {gsz_ec:3d}")
        gerr_ec = size_error(gsz_ch, gsz_ec)
        gerr_ec_str = f"({gerr_ec:+d})"
        cbits.append(f"{gerr_ec_str:<6s}")

        gsc = bef.compute_single_gap_score(gsz_ch, "ch", gsz_ec, "ec", ig, ng)
        cbits.append(f" {gsc:6.3f} ")

        # The gap text, with highlights.
        # NOTE(review): the markers found here were two empty strings,
        # which made the highlighting a no-op; restored to HTML bold
        # tags since the report description says the matches are shown
        # in boldface. Confirm the intended markup.
        if hipat_ec is not None:
            gap_ec = highlight_keywords_in_text(gap_ec, hipat_ec, "<b>", "</b>")
        cbits.append(gap_ec)
        cbits.append("\n")

    return "".join(cbits)
    # ----------------------------------------------------------------------

def size_error(size_ch, size_ec):
    # Signed integer difference between an EVA size {size_ec} and the
    # value predicted from the hanzi size {size_ch}.
    #
    # The prediction is a range {esz_lo..esz_hi} obtained from
    # {bef.expected_size1_from_size0}. The result is zero if {size_ec}
    # lies in that range. Otherwise it is the difference between
    # {size_ec} and the midpoint of the range, with the midpoint rounded
    # down if {size_ec} is below the range and up if it is above.
    debug = False

    esz_lo, esz_hi = bef.expected_size1_from_size0(size_ch, "ch", "ec")
    if debug: err.write(f"!: esz = {esz_lo}..{esz_hi}\n")

    if esz_lo <= size_ec <= esz_hi:
        return 0
    esz_mid = (esz_lo + esz_hi)/2
    if size_ec < esz_lo:
        return size_ec - int(floor(esz_mid))
    else:
        return size_ec - int(ceil(esz_mid))
    # ----------------------------------------------------------------------

def highlight_keywords_in_text(text, hipat, hbeg, hend):
    # Inserts {hbeg} and {hend} around all substrings of {text} that
    # match the pattern {hipat}, which should not match the empty string.
    #
    # The substrings that match may overlap or abut, but the markers
    # {hbeg} and {hend} are simplified so that they are never nested or
    # tangled: each maximal run of covered characters gets one {hbeg}
    # and one {hend}.

    # The "(?p)" asks the {regex} module for POSIX (leftmost-longest)
    # matching, so that at each position the longest alternative is
    # used. (The "(?b)" flag used previously is BESTMATCH, which only
    # affects fuzzy matching.)
    pat = rex.compile("(?p)" + hipat)

    bits = []    # Pieces of the highlighted string.
    end_hi = -1  # End of current highlight, or {-1}.
    nt = len(text)
    # Goes one step past the last character so that a highlight
    # reaching the end of {text} still gets closed:
    for it in range(nt + 1):
        m = pat.match(text[it:])
        end_this = it if m is None else it + m.end(0)
        if end_this > it:
            # Matched a non-empty prefix of {text[it:]}.
            # Open a new highlight unless one is still open or ends here:
            if end_hi < it: bits.append(hbeg)
            end_hi = max(end_hi, end_this)
        elif end_hi == it:
            # The current highlight ends here and no new one starts:
            bits.append(hend)
        if it < nt: bits.append(text[it])
    return "".join(bits)
    # ----------------------------------------------------------------------

def format_macro_parsing_ch(loc_ch, segs_ch, hipat_ch):
    # Formats the macro-parsing {segs_ch} of the pure hanzi text of the
    # SBJ entry with locus ID {loc_ch} as a multiline string. Each line
    # ends with "\n".
    #
    # The first line has the locus ID and the total size in hanzi. It
    # is followed by one line per gap, with the size of the hit that
    # precedes the gap (blank for the first gap), the size of the gap,
    # that hit, and the gap itself.
    #
    # If {hipat_ch} is not {None}, it must be a hanzi RE pattern used to
    # highlight substrings of the gaps of the macro-parsing {segs_ch}.

    debug = False

    assert segs_ch is not None
    ns = len(segs_ch)
    nh = ns//2
    ng = nh + 1
    assert ns == ng + nh

    tsize_ch = sum(len(sg) for sg in segs_ch)
    gsizes_ch = bef.get_gap_sizes(segs_ch)
    hsizes_ch = bef.get_hit_sizes(segs_ch)

    if debug:
        err.write(f"!< <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n")
        err.write(f"!< {segs_ch = !r}\n")
        err.write(f"!< {tsize_ch = }\n")
        err.write(f"!< {gsizes_ch = }\n")
        err.write(f"!< {hsizes_ch = }\n")

    cbits = []

    # SBJ entry locus ID:
    loc_ch = f"<{loc_ch}>"
    cbits.append(f" {loc_ch:<10s}")
    cbits.append(f" {tsize_ch:3d}")
    cbits.append("\n")

    # Gaps and hits. All hits are padded to the widest one:
    hwd = max(hsizes_ch, default=0)
    for ig in range(ng):
        # Hit that precedes gap {ig} (none before the first gap):
        hit_ch = segs_ch[2*ig-1] if ig > 0 else ""
        gap_ch = segs_ch[2*ig]

        cbits.append(" ")

        # Hit and gap sizes in hanzi. The blank placeholder on the
        # first line is as wide as the ":2d" field it stands for, so
        # that the columns line up:
        hsz_str = f"{hsizes_ch[ig-1]:2d}" if ig > 0 else "  "
        cbits.append(hsz_str)
        cbits.append(f" {gsizes_ch[ig]:2d}")

        # The hit string:
        cbits.append(" ")
        hit_ch = hit_ch.ljust(hwd, " ")
        cbits.append(f"{hit_ch} ")

        # The gap string, with highlights.
        # NOTE(review): the markers found here were two empty strings,
        # which made the highlighting a no-op; restored to HTML bold
        # tags. Confirm the intended markup.
        if hipat_ch is not None:
            gap_ch = highlight_keywords_in_text(gap_ch, hipat_ch, "<b>", "</b>")
        cbits.append(gap_ch)
        cbits.append("\n")

    return "".join(cbits)
# ----------------------------------------------------------------------

def test_stuff():
    # Runs the self-tests of this module, writing their output to {err}
    # between two separator lines.
    rule = "----------------------------------------\n"
    err.write("TESTING\n")
    err.write(rule)
    test_other_stuff()
    err.write(rule)
    return
    # ----------------------------------------------------------------------

def test_other_stuff():
    # Exercises {spf.parse_size_ranges}, the full score computation of
    # {bef}, and {output_parev} on a small hand-made example, writing
    # the results to {err}.
    rule = "----------------------------------------\n"

    # Parsing of a size range list:
    err.write(rule)
    gsizes_ch_str = "20..30,7,15,12..13,8"
    gsizes_ch = spf.parse_size_ranges(gsizes_ch_str)
    err.write(f"{gsizes_ch_str =!r}\n")
    err.write(f"{gsizes_ch =!r}\n")

    # Score of a pair of macro-parsings with three hits each:
    err.write(rule)
    segs_ch = [ '黍米无毒', '主', '益', '气', '补中多热', '令', '人烦' ]
    err.write(f"{segs_ch = !r}\n")
    segs_ec = [
        'psheodalodarsheodalqotedyqote', 'saiin', 'okalal', 'shdy',
        'otaiinshedytchodaiint', 'cheod', 'lchy',
    ]
    err.write(f"{segs_ec = !r}\n")

    penalty = 1000

    def full_score():
        # The full badness score of the two parsings above.
        return bef.compute_full_score_from_macro_parsings(
            segs_ch, "ch", segs_ec, "ec", penalty
        )

    score = full_score()
    err.write(f"{score = :6.1f}\n")

    # Formatting of the corresponding parev, with a dummy highlight pattern:
    err.write(rule)
    dummy_keys = ( 'FO+', 'nar', 'BAR', 'BIR', 'BAZ', 'cux', 'QUX' )
    output_parev(
        err,
        ( score, "f117r", segs_ch, segs_ec, penalty ),
        '|'.join(dummy_keys),
    )

    # Paranoia: the score computation must be repeatable.
    score_check = full_score()
    if score != score_check:
        err.write(f"{score = :24.16e}\n")
        err.write(f"{score_check = :24.16e}\n")
        assert score == score_check
    return
    # ----------------------------------------------------------------------

# Run the self-tests when the only command line argument is "ANN.TEST":
if sys.argv[1:] == [ "ANN.TEST" ]:
    test_stuff()