#! /usr/bin/python3 # Last edited on 2025-08-31 22:09:16 by stolfi import sys, re, os from math import sqrt, sin, cos, log, exp, floor, pi, inf, isfinite import numpy from error_funcs import prog_error, arg_error, debug from process_funcs import bash, run_command import vms_linear_gray_image_funcs as gfn def extract_masked_pixel_spectra(C, M): # Receives a set {C} of multispectral images with shape {(ny,nx,nb,)}, # and a binary mask {M} with shape {(ny,nx,)}. # Extracts the elements where the mask image {M} is # nonzero, and returns them as a matrix with shape {(ns,nb,)}. ny, nx, nb = numpy.shape(C) if numpy.shape(M) != (ny, nx): arg_error(f"mask and clip have different sizes {(nx,ny)} {numpy.shape(M)}") # Colect the matrix {S} of spectra selected by {M}: sys.stderr.write(f"selecting relevant spectra ...\n") ns = None; # For now. S = None # For now. for ib in range(nb): samples = \ [ min(1.0, C[iy,ix,ib]) \ for iy in range(ny) \ for ix in range(nx) \ if M[iy,ix] != 0 \ ] if ns == None: ns = len(samples) S = numpy.zeros((ns,nb,)) else: assert len(samples) == ns, "bug inconsitent sample counts" S[:,ib] = numpy.array(samples) fs = ns/(ny*nx) sys.stderr.write(f"selected {ns} pixel samples from {nx*ny} ({fs:.4f})\n") return S # ---------------------------------------------------------------------- def get_pixel_cloud_mean(S): # Expects {S} to be a matrix of pixel spectra. It must have shape # {ns,nb}, where {ns} is the number of relevant pixels and {nb} is the # number of spectral bands. # # Returns the average {savg} of all the spectra. ns, nb = numpy.shape(S) # Compute the barycenter (mean) {savg} of the spectra: sys.stderr.write(f"computing mean of spectra ...\n") savg = numpy.sum(S, 0)/numpy.size(S, 0) debug("max(abs(savg))", numpy.max(numpy.abs(savg))) assert numpy.shape(savg) == (nb,), "bug: savg shape" return savg # ---------------------------------------------------------------------- def get_pixel_cloud_axes(S, sctr): # Expects {S} to be a matrix of pixel spectra. It must have shape # {ns,nb}, where {ns} is the number of relevant pixels and {nb} is the # number of spectral bands. # # Computes the covariance matrix {V} of those spectra, with shape {(nb,nb)}, # relative to the given center {sctr}. Then computes the eigenvectors of # {V} as the COLUMNS of a matrix {evec} with shape {(nb,nb)}, and the # vector of the respective eigenvalues {eval}. # # Returns {evec, eval}. ns, nb = numpy.shape(S) # Compute {D} whose rows are the displacements from barycenter: sys.stderr.write(f"subtracting given center from selected spactra ...\n") D = numpy.array([ S[ks,:] - sctr for ks in range(ns) ]) debug("max(abs(D))", numpy.max(numpy.abs(D))) # Compute the covariance matrix {V=D'*D}: V = (D.transpose() @ D) / float(ns) debug("max(V)", numpy.max(V)) # Compute the eigenvalues {evec} and eigenvectors {eval}: sys.stderr.write(f"computing eigenvalues of covariance matrix ...\n") eig = numpy.linalg.eigh(V) eval = eig.eigenvalues sys.stderr.write(f"eigenvalues:\n{eval}\n"); assert numpy.shape(eval) == (nb,), "bug: eigenvalues shape" evec = eig.eigenvectors sys.stderr.write(f"eigenvectors:\n{evec}\n") assert numpy.shape(evec) == (nb,nb,), "bug: eigenvectors shape" return evec, eval # ---------------------------------------------------------------------- def adjust_luminance(C, M, mlum_frac, mlum_lo, mlum_hi, mlum_min, mlum_max): # Given a multispectral set {C} of images as an array with shape {(ny,nx,nb,)}, # adjusts the luminance component (mean value) of all pixel spectra, in a uniform # way, so that the luminances of most of the pixels selected by the # mask {M} are affinely mapped to the range {{mlum_lo _ mlum_hi]}, and # the luminances of all pixels that would fall outside the range # {[mlum_min _ mlum_max]} are smoothly clipped to that range. ny, nx, nb = numpy.shape(C) # Now we decompose each residual pixel spectrum {cpix} into its # average value {clum} plus a vector {cchr} with zero sum, and map it # to {hpix = hlum + cchr} where {hlum} is {clum} scaled and shifted so # that it is between {mmin} and {mmax}. sys.stderr.write("computing luminances of selected pixels ...\n") clums = [] # Average spectral values of all pixels for iy in range(ny): for ix in range(nx): if M[iy,ix] != 0: cpix = C[iy,ix,:] clums.append(numpy.mean(cpix)) clum_lo = numpy.quantile(numpy.array(clums), mlum_frac, axis=0) clum_hi = numpy.quantile(numpy.array(clums), 1-mlum_frac, axis=0) sys.stderr.write("range of {clum} before shift = ") sys.stderr.write("[%+10.6f _ +10.6f]}\n" % (clum_lo, clum_hi)) S = numpy.zeros((ny,nx,nb,)) sys.stderr.write("adjusting luminances of all pixels ...\n") for iy in range(ny): for ix in range(nx): cpix = C[iy,ix,:] clum = numpy.mean(cpix) cchr = cpix - numpy.full(clum) slum = gfn.map_value_squashed(clum, clum_lo, clum_hi, mlum_lo, mlum_hi, mlum_min, mlum_max) spix = numpy.full(slum) + cchr S[iy,ix,:] = spix return S # ---------------------------------------------------------------------- def read_mask_analysis_data(page, clip, mask): # Reads the bands list {bands}, illumination types {illums}, wavelengths {wlens}, # and the analysis data {savg,smin,smax,rdev,tdev,P} for the given {page}, {clip}, # and each {mask}. # # Returns {bands,illums,wlens} as lists with length {nb}, # {savg,smin,smax,rdev,tdev} as arrays with shape {(nb,)}, # and {P} as an array with shape {nb,ne,)}}. # nb = None # For now, reset later. ne = None # For now, reset later. bands = [] illums = [] wlens = [] savg = []; smin = []; smax = []; rdev = []; tdev = []; P = []; afile = f"pages/{page}/clips/{clip}/masks/{mask}/analysis.txt" sys.stderr.write(f"reading analysis data from {afile} ...\n") with open(afile, "r") as rd: if nb == None: nb = afn.read_int_param(rd, "nb") ne = afn.read_int_param(rd, "ne") else: assert afn.read_int_param(rd, "nb") == nb, "inconsistent nb" assert afn.read_int_param(rd, "ne") == ne, "inconsistent ne" for ib in range(nb): line = rd.readline().strip() line = re.sub(r'[ ][ ]+', " ", line) flds = line.split(' ') assert len(flds) == 3 + 5 + ne, f"bad format: {len(flds)} fields" ifld = 0 band_ib = flds[0].strip(); ifld += 1; bands.append(band_ib) illum_ib = int(flds[ifld].strip()); illums.append(illum_ib); ifld += 1 wlen_ib = float(flds[ifld].strip()); wlens.append(wlen_ib); ifld += 1 savg_ib = float(flds[ifld].strip()); savg.append(savg_ib); ifld += 1 smin_ib = float(flds[ifld].strip()); smin.append(smin_ib); ifld += 1 smax_ib = float(flds[ifld].strip()); smax.append(smax_ib); ifld += 1 rdev_ib = float(flds[ifld].strip()); rdev.append(rdev_ib); ifld += 1 tdev_ib = float(flds[ifld].strip()); tdev.append(tdev_ib); ifld += 1 P_ib = [] for ie in range(ne): P_ib_ie = float(flds[ifld].strip()); P_ib.append(P_ib_ie); ifld += 1 P.append(P_ib) rd.close() savg = numpy.array(savg); smin = numpy.array(smin); smax = numpy.array(smax); rdev = numpy.array(rdev); tdev = numpy.array(tdev); P = numpy.array(P); return bands, illums, wlens, savg, smin, smax, rdev, tdev, P # ---------------------------------------------------------------------- def read_mask_list_analysis_data(page, clip, masks): # Reads the bands list {bands}, illumination types {illums}, wavelengths {wlens}, # and the analysis data {savg,smin,smax,rdev,tdev,P} for the given {page}, {clip}, # and each {mask} in {masks[0:nm]}. # # Returns {bands,illums,wlens} as lists with length {nb}, # {savg,smin,smax,rdev,tdev} as arrays with shape {(nb,nm,)}, # and {P} as an array with shape {nb,ne,nm,)}}. # nm = len(masks) nb = None # For now, reset later. ne = None # For now, reset later. bands = None; # Reset later illums = None; # Reset later wlens = None; # Reset later savg = None; smin = None; smax = None; rdev = None; tdev = None; P = None; for im in range (nm): mask = masks[im] bands_im, illums_im, wlens_im, \ savg_im, smin_im, smax_im, rdev_im, tdev_im, P_im = \ afn.read_mask_analysis_data(page, clip, mask) if nb == None: nb = len(bands_im) ne = numpy.size(P_im, 1) bands = bands_im; illums = illums_im; wlens = wlens_im savg = numpy.zeros((nb,nm,)); smin = numpy.zeros((nb,nm,)); smax = numpy.zeros((nb,nm,)); rdev = numpy.zeros((nb,nm,)); tdev = numpy.zeros((nb,nm,)); P = numpy.zeros((nb,ne,nm,)); else: assert bands == bands_im, "bug inconsistent bands" assert illums == illums_im, "bug inconsistent illums" assert wlens == wlens_im, "bug inconsistent wlens" savg[:,im] = savg_im; smin[:,im] = smin_im; smax[:,im] = smax_im; rdev[:,im] = rdev_im; tdev[:,im] = tdev_im; P[:,:,im] = P_im; return bands, illums, wlens, savg, smin, smax, rdev, tdev, P # ---------------------------------------------------------------------- def read_int_param(rd, name): # Reads one line from file {ird} and parses it as "{name} = {val}" # where {val} is an integer. Returns {val}. # line = rd.readline().strip() m = re.fullmatch((name + r" *[=] *([0-9]+) *"), line) assert m != None, f"bad format: {name} {line}" return int(m.group(1)) # ----------------------------------------------------------------------