#! /usr/bin/python3
# Last edited on 2026-03-06 22:48:34 by stolfi

import sys, os, re
from sys import stdout as out, stderr as err
from math import log, sqrt, hypot, floor, ceil, inf, pow

import numpy as np

import vms_linear_gray_image_funcs as gf
from process_funcs import bash
from error_funcs import file_line_error

# NOTE(review): {spf} is used in {read_units_per_parag} below but is never
# imported in this file -- presumably an "import ... as spf" line was lost.
# Confirm the module name and restore the import.

# !!! Rethink averages now that we have nominal unit sizes. !!!

def qdif(rsz1, avg1, rsz2, avg2):
    # Quadratic discrepancy between parag sizes {rsz1,rsz2} considering that
    # the respective average values should be {avg1,avg2}.
    #
    # The result is zero if both parag sizes have the same ratio to the
    # respective averages.
    assert rsz1 > 0 and rsz2 > 0, "invalid parag sizes"
    # {fwk} is an antisymmetric measure of how far {rszk} is from {avgk}:
    # zero at {rszk == avgk}, and {fw(r,a) == -fw(a,r)}.
    fw1 = rsz1/avg1 - avg1/rsz1
    fw2 = rsz2/avg2 - avg2/rsz2
    d = fw1 - fw2; d2 = d*d
    return d2
    # ----------------------------------------------------------------------

def importance(rsz1, avg1, rsz2, avg2):
    # The relative importance of matching parag sizes {rsz1,rsz2} considering that
    # the respective average values should be {avg1,avg2}.
    # The sizes should have been scaled to equivalent hanzi counts.
    # The importance gets higher if either size is away from the
    # respective average.  The result is always at least 1.
    assert rsz1 > 0 and rsz2 > 0, "invalid parag sizes"
    fw1 = rsz1/avg1 - avg1/rsz1
    fw2 = rsz2/avg2 - avg2/rsz2
    imp = hypot(1, hypot(fw1, fw2));
    return imp
    # ----------------------------------------------------------------------

def main(ivp_name1, utype1, ivp_name2, utype2):
    # Reads the ".upp" files of the two books, builds parag-similarity
    # images, finds an approximate best parag matching, and writes the
    # result images under "res/".

    loc_list1, rsz_list1, avg_rsz1 = read_units_per_parag(f"out/{ivp_name1}.upp", utype1)
    npara1 = len(rsz_list1)

    loc_list2, rsz_list2, avg_rsz2 = read_units_per_parag(f"out/{ivp_name2}.upp", utype2)
    npara2 = len(rsz_list2)

    # Create image {D} such that {D[i1,i2]} is the similarity (in [0_1]) of
    # parag {i1} of book 1 and parag {i2} of book 2:
    err.write("computing the basic similarity matrix {D} ...\n")
    D = np.zeros((npara1,npara2))
    for i1 in range(npara1):
        for i2 in range(npara2):
            qd = qdif(rsz_list1[i1], avg_rsz1, rsz_list2[i2], avg_rsz2)
            sim = 1/(1 + qd)
            # Raise to a high power to sharpen the contrast of the image:
            D[i1,i2] = pow(sim, 8)
    dfile = f"res/{ivp_name1}-{ivp_name2}-rdif.png"
    gf.write_image_as_gray_png(dfile, D)
    bash(f"display -resize '200%' {dfile}")

    # Create an image {E} derived from {D} but taking adjacency into account:
    # {E[i1,i2]} is high only if {(i1,i2)} is similar AND one of its two
    # diagonal neighbors is similar too.
    err.write("computing the adjacency-aware similarity matrix {E} ...\n")
    E = np.zeros((npara1,npara2));
    for i1 in range(1,npara1-1):
        for i2 in range(1,npara2-1):
            edif = max(D[i1-1,i2-1]*D[i1,i2], D[i1+1,i2+1]*D[i1,i2])
            E[i1,i2] = edif
    efile = f"res/{ivp_name1}-{ivp_name2}-edif.png"
    gf.write_image_as_gray_png(efile, E)
    bash(f"display -resize '200%' {efile}")

    # Try to find a min weight matching, where the weight of pairing {i1} with {i2}
    # is proportional to the size of the parags times the abs difference of size,
    # corrected by the averages.
    passes = 20
    K12, M, P = find_best_matching(rsz_list1, avg_rsz1, loc_list1, rsz_list2, avg_rsz2, loc_list2, D, passes)

    mfile = f"res/{ivp_name1}-{ivp_name2}-mval-{passes:02d}.png"
    gf.write_image_as_gray_png(mfile, M)
    bash(f"display -resize '200%' {mfile}")

    pfile = f"res/{ivp_name1}-{ivp_name2}-mper-{passes:02d}.png"
    gf.write_image_as_gray_png(pfile, P)
    bash(f"display -resize '200%' {pfile}")
    return
    # ----------------------------------------------------------------------

def find_best_matching(rsz_list1, avg_rsz1, loc_list1, rsz_list2, avg_rsz2, loc_list2, D, passes):
    # Tries to assign to each parag of book 1 a parag of book 2 in a way
    # that minimizes the total quadratic discrepancy weight
    # plus jump weights.
    #
    # Returns an array {K12[0..npara1-1]} that specifies that parag {i1}
    # of book 1 should be matched to parag {i2 = K12[i1]} of book 2.
    # Also returns an array {M} that is {D} with columns permuted
    # as specified by {K12}, and a 0/1 matrix {P} that shows the matching.

    npara1 = len(rsz_list1)
    npara2 = len(rsz_list2)
    assert npara1 <= npara2, "should match smaller book to larger book"

    def mismatch_weight(i1, i2):
        # Quadratic mismatch term for matching parag {i1} of file 1 with parag {i2 = K12[i1]} of file 2:
        nonlocal npara1, rsz_list1, avg_rsz1, npara2, rsz_list2, avg_rsz2
        assert i1 >= 0 and i1 < npara1, f"invalid index {i1 = }"
        assert i2 >= 0 and i2 < npara2, f"invalid index {i2 = }"
        imp = importance(rsz_list1[i1], avg_rsz1, rsz_list2[i2], avg_rsz2)
        qd = qdif(rsz_list1[i1], avg_rsz1, rsz_list2[i2], avg_rsz2)
        wm = imp*qd
        return wm
        # ....................................................................

    # Throw an initial matching:
    K12 = [ None ] * npara1 # The matching is of {loc_list1[i1]} to {loc_list2[K12[i1]]}.

    def jumps_at(ip):
        # Returns 0 if parags {ip} and {ip+1} of book 1 are mapped to
        # successive parags of book 2; else 1.
        # Returns 0 also if either of those two parags don't exist.
        nonlocal K12
        return 0 if ip < 0 or ip+1 >= npara1 or K12[ip+1] == K12[ip]+1 else 1
        # ....................................................................

    jump_wt = 0.01 # Penalty per jump point; keeps the matching mostly monotone.

    def jump_weight(i1, j1):
        # Weight term due to jump of the current matchings
        # of {i1} and {j1} with the matchings of their neighbors.
        # We get a jump point when two successive parags of book 1
        # are NOT matched to successive parags of book 2.
        nonlocal jump_wt
        assert i1 != j1
        nj = 0 # Number of jumps around {i1} and {j1}.
        nj += jumps_at(i1) + jumps_at(j1)
        # Avoid counting the edge between {i1} and {j1} twice when adjacent:
        if j1 != i1 + 1: nj += jumps_at(j1-1)
        if i1 != j1 + 1: nj += jumps_at(i1-1)
        return nj * jump_wt
        # ....................................................................

    def weight_gain_of_swap(i1, j1):
        # How much the total weight changes if we swap {K12[i1]} and {K12[j1]}.
        # Weight of current matching, for those two edges:
        w_curr = mismatch_weight(i1, K12[i1]) + mismatch_weight(j1, K12[j1]) + jump_weight(i1, j1)
        # Perform the swap temporarily:
        K12[i1],K12[j1] = K12[j1],K12[i1]
        # Weight of modified matching, for those two edges:
        w_swap = mismatch_weight(i1, K12[i1]) + mismatch_weight(j1, K12[j1]) + jump_weight(i1, j1)
        # Undo the swap for now:
        K12[i1],K12[j1] = K12[j1],K12[i1]
        return w_swap - w_curr
        # ....................................................................

    # Create an initial match that pairs parags by size rank:
    J1 = [ (i1, rsz_list1[i1],) for i1 in range(npara1) ]
    J2 = [ (i2, rsz_list2[i2],) for i2 in range(npara2) ]
    for b in range(2):
        J, rsz_list, avg, loc_list = ((J1, rsz_list1, avg_rsz1, loc_list1), (J2, rsz_list2, avg_rsz2, loc_list2))[b]
        J.sort(key = lambda x: x[1])
        err.write(f"most extreme parags in book {b}:\n")
        for k in range(5):
            i = J[k][0];
            err.write(f"  {i:3d} {loc_list[i]:<12s} {rsz_list[i]:6.1f}\n")
        err.write("...\n")
        # The last five entries of THIS book's list (the original used
        # {npara1} for both books and skipped the largest element):
        for k in range(5):
            i = J[len(J)-5+k][0];
            err.write(f"  {i:3d} {loc_list[i]:<12s} {rsz_list[i]:6.1f}\n")
        err.write("\n")
    for k1 in range(npara1):
        i1 = J1[k1][0];
        # Spread the {npara1} ranks uniformly over the {npara2} ranks:
        k2 = k1*(npara2-1)//(npara1-1)
        i2 = J2[k2][0]
        K12[i1] = i2

    err.write("initial matching:\n")
    for i1 in range(npara1):
        i2 = K12[i1]
        err.write(f"  {i1:3d} : {rsz_list1[i1]:6.1f} -> {i2:3d} : {rsz_list2[i2]:6.1f}\n")

    improve_matching(K12, weight_gain_of_swap, passes)

    # Compute {M} a col-perm of {D} with matching parags on the diagonal:
    M = np.zeros((npara1,npara1));
    for i1 in range(npara1):
        i2 = K12[i1]
        for k1 in range(npara1):
            M[k1,i1] = D[k1,i2]

    # Create matrix P that shows the permutation:
    P = np.zeros((npara1,npara2))
    for i1 in range(npara1):
        i2 = K12[i1]
        P[i1,i2] = 1

    # Print runs of consecutive matches:
    err.write("consecutive matching runs:\n")
    prt = [ False ] * npara1 # {prt[i1]} is true if parag {i1} was listed already.
    for i1 in range(npara1-1):
        if not prt[i1] and K12[i1+1] == K12[i1]+1:
            # Start of a run:
            j1 = i1;
            while j1 < npara1 and (j1 == i1 or K12[j1-1] == K12[j1]-1):
                j2 = K12[j1]
                qwt = mismatch_weight(j1, j2)
                fwp1 = rsz_list1[j1]/avg_rsz1
                fwp2 = rsz_list2[j2]/avg_rsz2
                err.write(f"{loc_list1[j1]:<12s} {rsz_list1[j1]:6.1f} {fwp1:5.3f} -> {loc_list2[j2]:<12s} {rsz_list2[j2]:6.1f} {fwp2:5.3f} w = {qwt:10.4f}\n")
                prt[j1] = True
                j1 += 1
            err.write("\n")

    return K12, M, P
    # ----------------------------------------------------------------------

def improve_matching(K12, weight_gain_of_swap, passes):
    # Tries to improve the matching {K12} by swapping matched parags.
    # The function {weight_gain_of_swap(i1,j1)} should return the change in the total weight
    # if one were to swap {K12[i1]} and {K12[j1]}.  Performs {passes} full
    # sweeps over all pairs, greedily applying every weight-reducing swap.

    npara1 = len(K12)

    # Try to swap matchings to reduce the total weight:
    for p in range(passes):
        err.write(f"pass {p} ...\n")
        npair = 0 # Number of pairs tried in pass.
        nswap = 0 # Number of swaps done in pass.
        for i1 in range(npara1):
            for j1 in range(i1+1,npara1):
                dbg = (npair < 10) # Trace only the first few pairs of each pass.
                npair += 1
                i2 = K12[i1]; j2 = K12[j1]
                if dbg:
                    err.write(f"  swapping {i1:3d}->{i2:3d}, {j1:3d}->{j2:3d}")
                    err.write(f" for {i1:3d}->{j2:3d}, {j1:3d}->{i2:3d}")
                assert i1 != j1 and i2 != j2
                assert i1 >= 0 and i1 < npara1
                assert j1 >= 0 and j1 < npara1
                # Try to swap matchings of {i1} and {j1}:
                w_gain = weight_gain_of_swap(i1, j1)
                if dbg: err.write(f" {w_gain = :10.4f}")
                if w_gain < 0:
                    # Swap the matchings:
                    if dbg: err.write(f" (swapped)")
                    K12[i1],K12[j1] = K12[j1],K12[i1]
                    nswap += 1
                if dbg: err.write(f"\n")
        err.write(f"{nswap:8d} swaps executed in {npair:8d} attempts\n")
    return
    # ----------------------------------------------------------------------

def read_units_per_parag(fname, utype):
    # Reads file {fname}, assumed to be an ".upp" file with format
    # "{LOC} {UCOUNT}"
    # that maps locus ID {LOC} to the count of units of type {utype}
    # in that parag.
    #
    # Returns {loc_list,rsz_list,avg_rsz} where
    #
    #   {loc_list} is a list of all locus IDs read,
    #
    #   {rsz_list} is a matching list with the sizes of those parags,
    #   converted to the assumed (fractional) equivalents in hanzi chars,
    #
    #   {avg_rsz} is the average of {rsz_list}.

    # NOTE(review): {spf} is not imported anywhere in this file; the missing
    # import must be restored for this call to work.
    unit_size = spf.hanzi_per_unit(utype) # Nominal avg num of Chinese chars per unit.
    err.write(f"assumed avg units per recipe = {unit_size:8.2f}\n")

    nread = 0 # Number of lines read from file.
    rsz_list = []
    loc_list = []
    tot_nun = 0   # Total raw unit count over all parags.
    sum_rsz = 0   # Sum of converted parag sizes.
    max_nun = -inf
    min_nun = +inf
    with open(fname, "r", encoding="utf-8") as rd:
        for line in rd:
            nread += 1
            line = line.strip()
            # Skip blank lines and comments:
            if re.match(r"[ ]*([#]|$)", line): continue
            m = re.match(r" *([a-z0-9.;]+)[ ]+([0-9]+)", line)
            if m is None: file_line_error(fname,nread,"bad format",line)
            loc_k = m.group(1)
            nun_k = int(m.group(2))
            rsz_k = nun_k*unit_size
            loc_list.append(loc_k)
            rsz_list.append(rsz_k)
            tot_nun += nun_k
            sum_rsz += rsz_k
            max_nun = max(max_nun, nun_k)
            min_nun = min(min_nun, nun_k)

    npara = len(rsz_list)
    assert npara > 0, f"no parag data found in {fname}"
    avg_rsz = sum_rsz/npara

    err.write(f"{nread:5d} lines read from {fname}\n")
    err.write(f"{npara:5d} parag sizes found\n")
    err.write(f"{tot_nun:5d} total raw unit count\n")
    err.write(f"{tot_nun/npara:8.3f} average raw unit count per parag\n")
    err.write(f"parag raw unit count range {min_nun}..{max_nun}\n")
    err.write(f"{avg_rsz:8.3f} average hanzi size per parag\n")
    err.write("\n")
    return loc_list, rsz_list, avg_rsz
    # ----------------------------------------------------------------------

def arg_error(msg):
    # Prints {msg} plus a usage line to stderr and aborts the run.
    # (The original script called this but never defined it.)
    err.write(f"** {msg}\n")
    err.write(f"usage: {sys.argv[0]} {{IVP_NAME1}} {{UTYPE1}} {{IVP_NAME2}} {{UTYPE2}}\n")
    sys.exit(1)
    # ----------------------------------------------------------------------

if __name__ == "__main__":
    narg = len(sys.argv)
    if narg < 5: arg_error("too few arguments")
    iarg = 1
    ivp_name1 = sys.argv[iarg]; iarg += 1
    utype1 = sys.argv[iarg]; iarg += 1
    ivp_name2 = sys.argv[iarg]; iarg += 1
    utype2 = sys.argv[iarg]; iarg += 1
    if iarg != narg: arg_error("too many arguments")

    main(ivp_name1, utype1, ivp_name2, utype2)