| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012 |
- import os
- import time
- import concurrent.futures
- from tqdm import tqdm
- import cv2
- from PIL import Image
- import numpy as np
- from scipy.ndimage import convolve, distance_transform_edt as bwdist
- from skimage.morphology import skeletonize
- from skimage.morphology import disk
- from skimage.measure import label
- _EPS = np.spacing(1)
- _TYPE = np.float64
- def load_single_image_pair(args):
- gt_path, pred_path, idx = args
-
- try:
- pred = pred_path[:-4] + '.png'
- valid_extensions = ['.png', '.jpg', '.PNG', '.JPG', '.JPEG']
- file_exists = False
- for ext in valid_extensions:
- if os.path.exists(pred[:-4] + ext):
- pred = pred[:-4] + ext
- file_exists = True
- break
- if not file_exists:
- print(f'Not exists: {pred}')
- return None
- gt_ary = cv2.imread(gt_path, cv2.IMREAD_GRAYSCALE)
- pred_ary = cv2.imread(pred, cv2.IMREAD_GRAYSCALE)
- if gt_ary is None or pred_ary is None:
- return None
- pred_ary = cv2.resize(pred_ary, (gt_ary.shape[1], gt_ary.shape[0]))
- return {
- 'idx': idx,
- 'gt': gt_ary,
- 'pred': pred_ary,
- 'gt_path': gt_path,
- 'pred_path': pred
- }
- except Exception as e:
- print(f"Error loading {gt_path}: {e}")
- return None
- def load_images_parallel(gt_paths, pred_paths, num_workers, verbose):
- args_list = [(gt_path, pred_path, idx)
- for idx, (gt_path, pred_path) in enumerate(zip(gt_paths, pred_paths))]
-
- with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
- if verbose:
- results = list(tqdm(executor.map(load_single_image_pair, args_list),
- total=len(args_list), desc="Loading images"))
- else:
- results = list(executor.map(load_single_image_pair, args_list))
-
- # 过滤掉加载失败的图像
- valid_results = [r for r in results if r is not None]
- return valid_results
- def process_metrics_single(data, measures, metrics):
- gt_ary = data['gt']
- pred_ary = data['pred']
- gt_path = data['gt_path']
- if 'E' in metrics:
- measures['E'].step(pred=pred_ary, gt=gt_ary)
- if 'S' in metrics:
- measures['S'].step(pred=pred_ary, gt=gt_ary)
- if 'F' in metrics:
- measures['F'].step(pred=pred_ary, gt=gt_ary)
- if 'MAE' in metrics:
- measures['MAE'].step(pred=pred_ary, gt=gt_ary)
- if 'MSE' in metrics:
- measures['MSE'].step(pred=pred_ary, gt=gt_ary)
- if 'WF' in metrics:
- measures['WF'].step(pred=pred_ary, gt=gt_ary)
- if 'HCE' in metrics:
- # HCE需要特殊处理
- ske_path = gt_path.replace('/gt/', '/ske/')
- if os.path.exists(ske_path):
- ske_ary = cv2.imread(ske_path, cv2.IMREAD_GRAYSCALE)
- ske_ary = ske_ary > 128
- else:
- ske_ary = skeletonize(gt_ary > 128)
- ske_save_dir = os.path.join(*ske_path.split(os.sep)[:-1])
- if ske_path[0] == os.sep:
- ske_save_dir = os.sep + ske_save_dir
- os.makedirs(ske_save_dir, exist_ok=True)
- cv2.imwrite(ske_path, ske_ary.astype(np.uint8) * 255)
- measures['HCE'].step(pred=pred_ary, gt=gt_ary, gt_ske=ske_ary)
- if 'MBA' in metrics:
- measures['MBA'].step(pred=pred_ary, gt=gt_ary)
- if 'BIoU' in metrics:
- measures['BIoU'].step(pred=pred_ary, gt=gt_ary)
- return measures
- def process_with_measures(args):
- """为并行处理创建独立的 measures 对象"""
- data, metrics = args
-
- # 创建临时的 measures 对象
- temp_measures = {}
- if 'E' in metrics:
- temp_measures['E'] = EMeasure()
- if 'S' in metrics:
- temp_measures['S'] = SMeasure()
- if 'F' in metrics:
- temp_measures['F'] = FMeasure()
- if 'MAE' in metrics:
- temp_measures['MAE'] = MAEMeasure()
- if 'MSE' in metrics:
- temp_measures['MSE'] = MSEMeasure()
- if 'WF' in metrics:
- temp_measures['WF'] = WeightedFMeasure()
- if 'HCE' in metrics:
- temp_measures['HCE'] = HCEMeasure()
- if 'MBA' in metrics:
- temp_measures['MBA'] = MBAMeasure()
- if 'BIoU' in metrics:
- temp_measures['BIoU'] = BIoUMeasure()
-
- # 处理单个图像
- process_metrics_single(data, temp_measures, metrics)
- return temp_measures
- def process_metrics_batch(image_data, measures, metrics, verbose, num_workers=10):
- if num_workers:
- # 并行处理
- num_workers = min(8, len(image_data))
- args_list = [(data, metrics) for data in image_data]
-
- with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
- if verbose:
- results = list(tqdm(executor.map(process_with_measures, args_list),
- total=len(args_list), desc="Computing metrics"))
- else:
- results = list(executor.map(process_with_measures, args_list))
-
- # 合并结果到主 measures
- for temp_measures in results:
- if temp_measures is not None:
- for metric_name in metrics:
- if metric_name in measures and metric_name in temp_measures:
- # 直接合并各个指标的列表
- if hasattr(measures[metric_name], 'sms'):
- measures[metric_name].sms.extend(temp_measures[metric_name].sms)
- if hasattr(measures[metric_name], 'maes'):
- measures[metric_name].maes.extend(temp_measures[metric_name].maes)
- if hasattr(measures[metric_name], 'mses'):
- measures[metric_name].mses.extend(temp_measures[metric_name].mses)
- if hasattr(measures[metric_name], 'adaptive_fms'):
- measures[metric_name].adaptive_fms.extend(temp_measures[metric_name].adaptive_fms)
- measures[metric_name].precisions.extend(temp_measures[metric_name].precisions)
- measures[metric_name].recalls.extend(temp_measures[metric_name].recalls)
- measures[metric_name].changeable_fms.extend(temp_measures[metric_name].changeable_fms)
- if hasattr(measures[metric_name], 'adaptive_ems'):
- measures[metric_name].adaptive_ems.extend(temp_measures[metric_name].adaptive_ems)
- measures[metric_name].changeable_ems.extend(temp_measures[metric_name].changeable_ems)
- if hasattr(measures[metric_name], 'weighted_fms'):
- measures[metric_name].weighted_fms.extend(temp_measures[metric_name].weighted_fms)
- if hasattr(measures[metric_name], 'hces'):
- measures[metric_name].hces.extend(temp_measures[metric_name].hces)
- if hasattr(measures[metric_name], 'bas'):
- measures[metric_name].bas.extend(temp_measures[metric_name].bas)
- # MBA 需要合并累积统计
- measures[metric_name].all_h += temp_measures[metric_name].all_h
- measures[metric_name].all_w += temp_measures[metric_name].all_w
- measures[metric_name].all_max += temp_measures[metric_name].all_max
- if hasattr(measures[metric_name], 'bious'):
- measures[metric_name].bious.extend(temp_measures[metric_name].bious)
- else:
- # 串行处理
- iterator = tqdm(image_data, desc="Computing metrics") if verbose else image_data
-
- for data in iterator:
- measures = process_metrics_single(data, measures, metrics)
-
- return measures
- def collect_results(measures, metrics):
- em = measures['E'].get_results()['em'] if 'E' in metrics else {'curve': np.array([np.float64(-1)]), 'adp': np.float64(-1)}
- sm = measures['S'].get_results()['sm'] if 'S' in metrics else np.float64(-1)
- fm = measures['F'].get_results()['fm'] if 'F' in metrics else {'curve': np.array([np.float64(-1)]), 'adp': np.float64(-1)}
- mae = measures['MAE'].get_results()['mae'] if 'MAE' in metrics else np.float64(-1)
- mse = measures['MSE'].get_results()['mse'] if 'MSE' in metrics else np.float64(-1)
- wfm = measures['WF'].get_results()['wfm'] if 'WF' in metrics else np.float64(-1)
- hce = measures['HCE'].get_results()['hce'] if 'HCE' in metrics else np.float64(-1)
- mba = measures['MBA'].get_results()['mba'] if 'MBA' in metrics else np.float64(-1)
- biou = measures['BIoU'].get_results()['biou'] if 'BIoU' in metrics else {'curve': np.array([np.float64(-1)])}
- return (em, sm, fm, mae, mse, wfm, hce, mba, biou)
- def evaluator(gt_paths, pred_paths, metrics=['S', 'MAE', 'E', 'F', 'WF', 'MBA', 'BIoU', 'MSE', 'HCE'], verbose=False, num_workers=8):
- start_time = time.time()
- measures = {}
- if 'E' in metrics:
- measures['E'] = EMeasure()
- if 'S' in metrics:
- measures['S'] = SMeasure()
- if 'F' in metrics:
- measures['F'] = FMeasure()
- if 'MAE' in metrics:
- measures['MAE'] = MAEMeasure()
- if 'MSE' in metrics:
- measures['MSE'] = MSEMeasure()
- if 'WF' in metrics:
- measures['WF'] = WeightedFMeasure()
- if 'HCE' in metrics:
- measures['HCE'] = HCEMeasure()
- if 'MBA' in metrics:
- measures['MBA'] = MBAMeasure()
- if 'BIoU' in metrics:
- measures['BIoU'] = BIoUMeasure()
- if isinstance(gt_paths, list) and isinstance(pred_paths, list):
- assert len(gt_paths) == len(pred_paths)
- if num_workers is None:
- num_workers = 1
- use_parallel = len(gt_paths) > 50 and num_workers > 1
- if use_parallel:
- if verbose:
- print(f"Loading {len(gt_paths)} images in parallel with {num_workers} workers...")
- image_data = load_images_parallel(gt_paths, pred_paths, num_workers, verbose)
- if verbose:
- print(f"Successfully loaded {len(image_data)}/{len(gt_paths)} images")
- measures = process_metrics_batch(image_data, measures, metrics, verbose, num_workers)
- else:
- iterator = tqdm(range(len(gt_paths)), total=len(gt_paths), desc="Loading images and computing metrics") if verbose else range(len(gt_paths))
- for idx_sample in iterator:
- gt = gt_paths[idx_sample]
- pred = pred_paths[idx_sample]
- pred = pred[:-4] + '.png'
- valid_extensions = ['.png', '.jpg', '.PNG', '.JPG', '.JPEG']
- file_exists = False
- for ext in valid_extensions:
- if os.path.exists(pred[:-4] + ext):
- pred = pred[:-4] + ext
- file_exists = True
- break
-
- if not file_exists:
- print('Not exists:', pred)
- continue
- gt_ary = cv2.imread(gt, cv2.IMREAD_GRAYSCALE)
- pred_ary = cv2.imread(pred, cv2.IMREAD_GRAYSCALE)
-
- if gt_ary is None or pred_ary is None:
- continue
-
- pred_ary = cv2.resize(pred_ary, (gt_ary.shape[1], gt_ary.shape[0]))
- if 'E' in metrics:
- measures['E'].step(pred=pred_ary, gt=gt_ary)
- if 'S' in metrics:
- measures['S'].step(pred=pred_ary, gt=gt_ary)
- if 'F' in metrics:
- measures['F'].step(pred=pred_ary, gt=gt_ary)
- if 'MAE' in metrics:
- measures['MAE'].step(pred=pred_ary, gt=gt_ary)
- if 'MSE' in metrics:
- measures['MSE'].step(pred=pred_ary, gt=gt_ary)
- if 'WF' in metrics:
- measures['WF'].step(pred=pred_ary, gt=gt_ary)
- if 'HCE' in metrics:
- ske_path = gt.replace('/gt/', '/ske/')
- if os.path.exists(ske_path):
- ske_ary = cv2.imread(ske_path, cv2.IMREAD_GRAYSCALE)
- ske_ary = ske_ary > 128
- else:
- ske_ary = skeletonize(gt_ary > 128)
- ske_save_dir = os.path.join(*ske_path.split(os.sep)[:-1])
- if ske_path[0] == os.sep:
- ske_save_dir = os.sep + ske_save_dir
- os.makedirs(ske_save_dir, exist_ok=True)
- cv2.imwrite(ske_path, ske_ary.astype(np.uint8) * 255)
- measures['HCE'].step(pred=pred_ary, gt=gt_ary, gt_ske=ske_ary)
- if 'MBA' in metrics:
- measures['MBA'].step(pred=pred_ary, gt=gt_ary)
- if 'BIoU' in metrics:
- measures['BIoU'].step(pred=pred_ary, gt=gt_ary)
- results = collect_results(measures, metrics)
-
- if verbose:
- total_time = time.time() - start_time
- processing_method = "parallel" if use_parallel else "serial"
- print(f"Evaluation completed in {total_time:.2f}s using {processing_method} processing")
-
- return results
- ## >>>>>>>>>>>> Definitions of metrics:
- def _prepare_data(pred: np.ndarray, gt: np.ndarray) -> tuple:
- gt = gt > 128
- pred = pred / 255
- if pred.max() != pred.min():
- pred = (pred - pred.min()) / (pred.max() - pred.min())
- return pred, gt
- def _get_adaptive_threshold(matrix: np.ndarray, max_value: float = 1) -> float:
- return min(2 * matrix.mean(), max_value)
- class FMeasure(object):
- def __init__(self, beta: float = 0.3):
- self.beta = beta
- self.precisions = []
- self.recalls = []
- self.adaptive_fms = []
- self.changeable_fms = []
- def step(self, pred: np.ndarray, gt: np.ndarray):
- pred, gt = _prepare_data(pred, gt)
- adaptive_fm = self.cal_adaptive_fm(pred=pred, gt=gt)
- self.adaptive_fms.append(adaptive_fm)
- precisions, recalls, changeable_fms = self.cal_pr(pred=pred, gt=gt)
- self.precisions.append(precisions)
- self.recalls.append(recalls)
- self.changeable_fms.append(changeable_fms)
- def cal_adaptive_fm(self, pred: np.ndarray, gt: np.ndarray) -> float:
- adaptive_threshold = _get_adaptive_threshold(pred, max_value=1)
- binary_predcition = pred >= adaptive_threshold
- area_intersection = binary_predcition[gt].sum()
- if area_intersection == 0:
- adaptive_fm = 0
- else:
- pre = area_intersection / np.count_nonzero(binary_predcition)
- rec = area_intersection / np.count_nonzero(gt)
- adaptive_fm = (1 + self.beta) * pre * rec / (self.beta * pre + rec)
- return adaptive_fm
- def cal_pr(self, pred: np.ndarray, gt: np.ndarray) -> tuple:
- pred = (pred * 255).astype(np.uint8)
- bins = np.linspace(0, 256, 257)
- fg_hist, _ = np.histogram(pred[gt], bins=bins)
- bg_hist, _ = np.histogram(pred[~gt], bins=bins)
- fg_w_thrs = np.cumsum(np.flip(fg_hist), axis=0)
- bg_w_thrs = np.cumsum(np.flip(bg_hist), axis=0)
- TPs = fg_w_thrs
- Ps = fg_w_thrs + bg_w_thrs
- Ps[Ps == 0] = 1
- T = max(np.count_nonzero(gt), 1)
- precisions = TPs / Ps
- recalls = TPs / T
- numerator = (1 + self.beta) * precisions * recalls
- denominator = np.where(numerator == 0, 1, self.beta * precisions + recalls)
- changeable_fms = numerator / denominator
- return precisions, recalls, changeable_fms
- def get_results(self) -> dict:
- adaptive_fm = np.mean(np.array(self.adaptive_fms, _TYPE))
- changeable_fm = np.mean(np.array(self.changeable_fms, dtype=_TYPE), axis=0)
- precision = np.mean(np.array(self.precisions, dtype=_TYPE), axis=0) # N, 256
- recall = np.mean(np.array(self.recalls, dtype=_TYPE), axis=0) # N, 256
- return dict(fm=dict(adp=adaptive_fm, curve=changeable_fm),
- pr=dict(p=precision, r=recall))
- class MAEMeasure(object):
- def __init__(self):
- self.maes = []
- def step(self, pred: np.ndarray, gt: np.ndarray):
- pred, gt = _prepare_data(pred, gt)
- mae = self.cal_mae(pred, gt)
- self.maes.append(mae)
- def cal_mae(self, pred: np.ndarray, gt: np.ndarray) -> float:
- mae = np.mean(np.abs(pred - gt))
- return mae
- def get_results(self) -> dict:
- mae = np.mean(np.array(self.maes, _TYPE))
- return dict(mae=mae)
- class MSEMeasure(object):
- def __init__(self):
- self.mses = []
- def step(self, pred: np.ndarray, gt: np.ndarray):
- pred, gt = _prepare_data(pred, gt)
- mse = self.cal_mse(pred, gt)
- self.mses.append(mse)
- def cal_mse(self, pred: np.ndarray, gt: np.ndarray) -> float:
- mse = np.mean((pred - gt) ** 2)
- return mse
- def get_results(self) -> dict:
- mse = np.mean(np.array(self.mses, _TYPE))
- return dict(mse=mse)
- class SMeasure(object):
- def __init__(self, alpha: float = 0.5):
- self.sms = []
- self.alpha = alpha
- def step(self, pred: np.ndarray, gt: np.ndarray):
- pred, gt = _prepare_data(pred=pred, gt=gt)
- sm = self.cal_sm(pred, gt)
- self.sms.append(sm)
- def cal_sm(self, pred: np.ndarray, gt: np.ndarray) -> float:
- y = np.mean(gt)
- if y == 0:
- sm = 1 - np.mean(pred)
- elif y == 1:
- sm = np.mean(pred)
- else:
- sm = self.alpha * self.object(pred, gt) + (1 - self.alpha) * self.region(pred, gt)
- sm = max(0, sm)
- return sm
- def object(self, pred: np.ndarray, gt: np.ndarray) -> float:
- fg = pred * gt
- bg = (1 - pred) * (1 - gt)
- u = np.mean(gt)
- object_score = u * self.s_object(fg, gt) + (1 - u) * self.s_object(bg, 1 - gt)
- return object_score
- def s_object(self, pred: np.ndarray, gt: np.ndarray) -> float:
- x = np.mean(pred[gt == 1])
- sigma_x = np.std(pred[gt == 1], ddof=1)
- score = 2 * x / (np.power(x, 2) + 1 + sigma_x + _EPS)
- return score
- def region(self, pred: np.ndarray, gt: np.ndarray) -> float:
- x, y = self.centroid(gt)
- part_info = self.divide_with_xy(pred, gt, x, y)
- w1, w2, w3, w4 = part_info['weight']
- pred1, pred2, pred3, pred4 = part_info['pred']
- gt1, gt2, gt3, gt4 = part_info['gt']
- score1 = self.ssim(pred1, gt1)
- score2 = self.ssim(pred2, gt2)
- score3 = self.ssim(pred3, gt3)
- score4 = self.ssim(pred4, gt4)
- return w1 * score1 + w2 * score2 + w3 * score3 + w4 * score4
- def centroid(self, matrix: np.ndarray) -> tuple:
- h, w = matrix.shape
- area_object = np.count_nonzero(matrix)
- if area_object == 0:
- x = np.round(w / 2)
- y = np.round(h / 2)
- else:
- # More details can be found at: https://www.yuque.com/lart/blog/gpbigm
- y, x = np.argwhere(matrix).mean(axis=0).round()
- return int(x) + 1, int(y) + 1
- def divide_with_xy(self, pred: np.ndarray, gt: np.ndarray, x, y) -> dict:
- h, w = gt.shape
- area = h * w
- gt_LT = gt[0:y, 0:x]
- gt_RT = gt[0:y, x:w]
- gt_LB = gt[y:h, 0:x]
- gt_RB = gt[y:h, x:w]
- pred_LT = pred[0:y, 0:x]
- pred_RT = pred[0:y, x:w]
- pred_LB = pred[y:h, 0:x]
- pred_RB = pred[y:h, x:w]
- w1 = x * y / area
- w2 = y * (w - x) / area
- w3 = (h - y) * x / area
- w4 = 1 - w1 - w2 - w3
- return dict(gt=(gt_LT, gt_RT, gt_LB, gt_RB),
- pred=(pred_LT, pred_RT, pred_LB, pred_RB),
- weight=(w1, w2, w3, w4))
- def ssim(self, pred: np.ndarray, gt: np.ndarray) -> float:
- h, w = pred.shape
- N = h * w
- x = np.mean(pred)
- y = np.mean(gt)
- sigma_x = np.sum((pred - x) ** 2) / (N - 1)
- sigma_y = np.sum((gt - y) ** 2) / (N - 1)
- sigma_xy = np.sum((pred - x) * (gt - y)) / (N - 1)
- alpha = 4 * x * y * sigma_xy
- beta = (x ** 2 + y ** 2) * (sigma_x + sigma_y)
- if alpha != 0:
- score = alpha / (beta + _EPS)
- elif alpha == 0 and beta == 0:
- score = 1
- else:
- score = 0
- return score
- def get_results(self) -> dict:
- sm = np.mean(np.array(self.sms, dtype=_TYPE))
- return dict(sm=sm)
- class EMeasure(object):
- def __init__(self):
- self.adaptive_ems = []
- self.changeable_ems = []
- def step(self, pred: np.ndarray, gt: np.ndarray):
- pred, gt = _prepare_data(pred=pred, gt=gt)
- self.gt_fg_numel = np.count_nonzero(gt)
- self.gt_size = gt.shape[0] * gt.shape[1]
- changeable_ems = self.cal_changeable_em(pred, gt)
- self.changeable_ems.append(changeable_ems)
- adaptive_em = self.cal_adaptive_em(pred, gt)
- self.adaptive_ems.append(adaptive_em)
- def cal_adaptive_em(self, pred: np.ndarray, gt: np.ndarray) -> float:
- adaptive_threshold = _get_adaptive_threshold(pred, max_value=1)
- adaptive_em = self.cal_em_with_threshold(pred, gt, threshold=adaptive_threshold)
- return adaptive_em
- def cal_changeable_em(self, pred: np.ndarray, gt: np.ndarray) -> np.ndarray:
- changeable_ems = self.cal_em_with_cumsumhistogram(pred, gt)
- return changeable_ems
- def cal_em_with_threshold(self, pred: np.ndarray, gt: np.ndarray, threshold: float) -> float:
- binarized_pred = pred >= threshold
- fg_fg_numel = np.count_nonzero(binarized_pred & gt)
- fg_bg_numel = np.count_nonzero(binarized_pred & ~gt)
- fg___numel = fg_fg_numel + fg_bg_numel
- bg___numel = self.gt_size - fg___numel
- if self.gt_fg_numel == 0:
- enhanced_matrix_sum = bg___numel
- elif self.gt_fg_numel == self.gt_size:
- enhanced_matrix_sum = fg___numel
- else:
- parts_numel, combinations = self.generate_parts_numel_combinations(
- fg_fg_numel=fg_fg_numel, fg_bg_numel=fg_bg_numel,
- pred_fg_numel=fg___numel, pred_bg_numel=bg___numel,
- )
- results_parts = []
- for i, (part_numel, combination) in enumerate(zip(parts_numel, combinations)):
- align_matrix_value = 2 * (combination[0] * combination[1]) / \
- (combination[0] ** 2 + combination[1] ** 2 + _EPS)
- enhanced_matrix_value = (align_matrix_value + 1) ** 2 / 4
- results_parts.append(enhanced_matrix_value * part_numel)
- enhanced_matrix_sum = sum(results_parts)
- em = enhanced_matrix_sum / (self.gt_size - 1 + _EPS)
- return em
- def cal_em_with_cumsumhistogram(self, pred: np.ndarray, gt: np.ndarray) -> np.ndarray:
- pred = (pred * 255).astype(np.uint8)
- bins = np.linspace(0, 256, 257)
- fg_fg_hist, _ = np.histogram(pred[gt], bins=bins)
- fg_bg_hist, _ = np.histogram(pred[~gt], bins=bins)
- fg_fg_numel_w_thrs = np.cumsum(np.flip(fg_fg_hist), axis=0)
- fg_bg_numel_w_thrs = np.cumsum(np.flip(fg_bg_hist), axis=0)
- fg___numel_w_thrs = fg_fg_numel_w_thrs + fg_bg_numel_w_thrs
- bg___numel_w_thrs = self.gt_size - fg___numel_w_thrs
- if self.gt_fg_numel == 0:
- enhanced_matrix_sum = bg___numel_w_thrs
- elif self.gt_fg_numel == self.gt_size:
- enhanced_matrix_sum = fg___numel_w_thrs
- else:
- parts_numel_w_thrs, combinations = self.generate_parts_numel_combinations(
- fg_fg_numel=fg_fg_numel_w_thrs, fg_bg_numel=fg_bg_numel_w_thrs,
- pred_fg_numel=fg___numel_w_thrs, pred_bg_numel=bg___numel_w_thrs,
- )
- results_parts = np.empty(shape=(4, 256), dtype=np.float64)
- for i, (part_numel, combination) in enumerate(zip(parts_numel_w_thrs, combinations)):
- align_matrix_value = 2 * (combination[0] * combination[1]) / \
- (combination[0] ** 2 + combination[1] ** 2 + _EPS)
- enhanced_matrix_value = (align_matrix_value + 1) ** 2 / 4
- results_parts[i] = enhanced_matrix_value * part_numel
- enhanced_matrix_sum = results_parts.sum(axis=0)
- em = enhanced_matrix_sum / (self.gt_size - 1 + _EPS)
- return em
- def generate_parts_numel_combinations(self, fg_fg_numel, fg_bg_numel, pred_fg_numel, pred_bg_numel):
- bg_fg_numel = self.gt_fg_numel - fg_fg_numel
- bg_bg_numel = pred_bg_numel - bg_fg_numel
- parts_numel = [fg_fg_numel, fg_bg_numel, bg_fg_numel, bg_bg_numel]
- mean_pred_value = pred_fg_numel / self.gt_size
- mean_gt_value = self.gt_fg_numel / self.gt_size
- demeaned_pred_fg_value = 1 - mean_pred_value
- demeaned_pred_bg_value = 0 - mean_pred_value
- demeaned_gt_fg_value = 1 - mean_gt_value
- demeaned_gt_bg_value = 0 - mean_gt_value
- combinations = [
- (demeaned_pred_fg_value, demeaned_gt_fg_value),
- (demeaned_pred_fg_value, demeaned_gt_bg_value),
- (demeaned_pred_bg_value, demeaned_gt_fg_value),
- (demeaned_pred_bg_value, demeaned_gt_bg_value)
- ]
- return parts_numel, combinations
- def get_results(self) -> dict:
- adaptive_em = np.mean(np.array(self.adaptive_ems, dtype=_TYPE))
- changeable_em = np.mean(np.array(self.changeable_ems, dtype=_TYPE), axis=0)
- return dict(em=dict(adp=adaptive_em, curve=changeable_em))
- class WeightedFMeasure(object):
- def __init__(self, beta: float = 1):
- self.beta = beta
- self.weighted_fms = []
- def step(self, pred: np.ndarray, gt: np.ndarray):
- pred, gt = _prepare_data(pred=pred, gt=gt)
- if np.all(~gt):
- wfm = 0
- else:
- wfm = self.cal_wfm(pred, gt)
- self.weighted_fms.append(wfm)
- def cal_wfm(self, pred: np.ndarray, gt: np.ndarray) -> float:
- # [Dst,IDXT] = bwdist(dGT);
- Dst, Idxt = bwdist(gt == 0, return_indices=True)
- # %Pixel dependency
- # E = abs(FG-dGT);
- E = np.abs(pred - gt)
- Et = np.copy(E)
- Et[gt == 0] = Et[Idxt[0][gt == 0], Idxt[1][gt == 0]]
- # K = fspecial('gaussian',7,5);
- # EA = imfilter(Et,K);
- K = self.matlab_style_gauss2D((7, 7), sigma=5)
- EA = convolve(Et, weights=K, mode="constant", cval=0)
- # MIN_E_EA = E;
- # MIN_E_EA(GT & EA<E) = EA(GT & EA<E);
- MIN_E_EA = np.where(gt & (EA < E), EA, E)
- # %Pixel importance
- B = np.where(gt == 0, 2 - np.exp(np.log(0.5) / 5 * Dst), np.ones_like(gt))
- Ew = MIN_E_EA * B
- TPw = np.sum(gt) - np.sum(Ew[gt == 1])
- FPw = np.sum(Ew[gt == 0])
- R = 1 - np.mean(Ew[gt == 1])
- P = TPw / (TPw + FPw + _EPS)
- # % Q = (1+Beta^2)*(R*P)./(eps+R+(Beta.*P));
- Q = (1 + self.beta) * R * P / (R + self.beta * P + _EPS)
- return Q
- def matlab_style_gauss2D(self, shape: tuple = (7, 7), sigma: int = 5) -> np.ndarray:
- """
- 2D gaussian mask - should give the same result as MATLAB's
- fspecial('gaussian',[shape],[sigma])
- """
- m, n = [(ss - 1) / 2 for ss in shape]
- y, x = np.ogrid[-m: m + 1, -n: n + 1]
- h = np.exp(-(x * x + y * y) / (2 * sigma * sigma))
- h[h < np.finfo(h.dtype).eps * h.max()] = 0
- sumh = h.sum()
- if sumh != 0:
- h /= sumh
- return h
- def get_results(self) -> dict:
- weighted_fm = np.mean(np.array(self.weighted_fms, dtype=_TYPE))
- return dict(wfm=weighted_fm)
- class HCEMeasure(object):
- def __init__(self):
- self.hces = []
- def step(self, pred: np.ndarray, gt: np.ndarray, gt_ske):
- # pred, gt = _prepare_data(pred, gt)
- hce = self.cal_hce(pred, gt, gt_ske)
- self.hces.append(hce)
- def get_results(self) -> dict:
- hce = np.mean(np.array(self.hces, _TYPE))
- return dict(hce=hce)
- def cal_hce(self, pred: np.ndarray, gt: np.ndarray, gt_ske: np.ndarray, relax=5, epsilon=2.0) -> float:
- # Binarize gt
- if(len(gt.shape)>2):
- gt = gt[:, :, 0]
- epsilon_gt = 128#(np.amin(gt)+np.amax(gt))/2.0
- gt = (gt>epsilon_gt).astype(np.uint8)
- # Binarize pred
- if(len(pred.shape)>2):
- pred = pred[:, :, 0]
- epsilon_pred = 128#(np.amin(pred)+np.amax(pred))/2.0
- pred = (pred>epsilon_pred).astype(np.uint8)
- Union = np.logical_or(gt, pred)
- TP = np.logical_and(gt, pred)
- FP = pred - TP
- FN = gt - TP
- # relax the Union of gt and pred
- Union_erode = Union.copy()
- Union_erode = cv2.erode(Union_erode.astype(np.uint8), disk(1), iterations=relax)
- # --- get the relaxed False Positive regions for computing the human efforts in correcting them ---
- FP_ = np.logical_and(FP, Union_erode) # get the relaxed FP
- for i in range(0, relax):
- FP_ = cv2.dilate(FP_.astype(np.uint8), disk(1))
- FP_ = np.logical_and(FP_, 1-np.logical_or(TP, FN))
- FP_ = np.logical_and(FP, FP_)
- # --- get the relaxed False Negative regions for computing the human efforts in correcting them ---
- FN_ = np.logical_and(FN, Union_erode) # preserve the structural components of FN
- ## recover the FN, where pixels are not close to the TP borders
- for i in range(0, relax):
- FN_ = cv2.dilate(FN_.astype(np.uint8), disk(1))
- FN_ = np.logical_and(FN_, 1-np.logical_or(TP, FP))
- FN_ = np.logical_and(FN, FN_)
- FN_ = np.logical_or(FN_, np.logical_xor(gt_ske, np.logical_and(TP, gt_ske))) # preserve the structural components of FN
- ## 2. =============Find exact polygon control points and independent regions==============
- ## find contours from FP_
- ctrs_FP, hier_FP = cv2.findContours(FP_.astype(np.uint8), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
- ## find control points and independent regions for human correction
- bdies_FP, indep_cnt_FP = self.filter_bdy_cond(ctrs_FP, FP_, np.logical_or(TP,FN_))
- ## find contours from FN_
- ctrs_FN, hier_FN = cv2.findContours(FN_.astype(np.uint8), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
- ## find control points and independent regions for human correction
- bdies_FN, indep_cnt_FN = self.filter_bdy_cond(ctrs_FN, FN_, 1-np.logical_or(np.logical_or(TP, FP_), FN_))
- poly_FP, poly_FP_len, poly_FP_point_cnt = self.approximate_RDP(bdies_FP, epsilon=epsilon)
- poly_FN, poly_FN_len, poly_FN_point_cnt = self.approximate_RDP(bdies_FN, epsilon=epsilon)
- # FP_points+FP_indep+FN_points+FN_indep
- return poly_FP_point_cnt+indep_cnt_FP+poly_FN_point_cnt+indep_cnt_FN
- def filter_bdy_cond(self, bdy_, mask, cond):
- cond = cv2.dilate(cond.astype(np.uint8), disk(1))
- labels = label(mask) # find the connected regions
- lbls = np.unique(labels) # the indices of the connected regions
- indep = np.ones(lbls.shape[0]) # the label of each connected regions
- indep[0] = 0 # 0 indicate the background region
- boundaries = []
- h,w = cond.shape[0:2]
- ind_map = np.zeros((h, w))
- indep_cnt = 0
- for i in range(0, len(bdy_)):
- tmp_bdies = []
- tmp_bdy = []
- for j in range(0, bdy_[i].shape[0]):
- r, c = bdy_[i][j,0,1],bdy_[i][j,0,0]
- if(np.sum(cond[r, c])==0 or ind_map[r, c]!=0):
- if(len(tmp_bdy)>0):
- tmp_bdies.append(tmp_bdy)
- tmp_bdy = []
- continue
- tmp_bdy.append([c, r])
- ind_map[r, c] = ind_map[r, c] + 1
- indep[labels[r, c]] = 0 # indicates part of the boundary of this region needs human correction
- if(len(tmp_bdy)>0):
- tmp_bdies.append(tmp_bdy)
- # check if the first and the last boundaries are connected
- # if yes, invert the first boundary and attach it after the last boundary
- if(len(tmp_bdies)>1):
- first_x, first_y = tmp_bdies[0][0]
- last_x, last_y = tmp_bdies[-1][-1]
- if((abs(first_x-last_x)==1 and first_y==last_y) or
- (first_x==last_x and abs(first_y-last_y)==1) or
- (abs(first_x-last_x)==1 and abs(first_y-last_y)==1)
- ):
- tmp_bdies[-1].extend(tmp_bdies[0][::-1])
- del tmp_bdies[0]
- for k in range(0, len(tmp_bdies)):
- tmp_bdies[k] = np.array(tmp_bdies[k])[:, np.newaxis, :]
- if(len(tmp_bdies)>0):
- boundaries.extend(tmp_bdies)
- return boundaries, np.sum(indep)
- # this function approximate each boundary by DP algorithm
- # https://en.wikipedia.org/wiki/Ramer%E2%80%93Douglas%E2%80%93Peucker_algorithm
- def approximate_RDP(self, boundaries, epsilon=1.0):
- boundaries_ = []
- boundaries_len_ = []
- pixel_cnt_ = 0
- # polygon approximate of each boundary
- for i in range(0, len(boundaries)):
- boundaries_.append(cv2.approxPolyDP(boundaries[i], epsilon, False))
- # count the control points number of each boundary and the total control points number of all the boundaries
- for i in range(0, len(boundaries_)):
- boundaries_len_.append(len(boundaries_[i]))
- pixel_cnt_ = pixel_cnt_ + len(boundaries_[i])
- return boundaries_, boundaries_len_, pixel_cnt_
- class MBAMeasure(object):
- def __init__(self):
- self.bas = []
- self.all_h = 0
- self.all_w = 0
- self.all_max = 0
- def step(self, pred: np.ndarray, gt: np.ndarray):
- # pred, gt = _prepare_data(pred, gt)
- refined = gt.copy()
- rmin = cmin = 0
- rmax, cmax = gt.shape
- self.all_h += rmax
- self.all_w += cmax
- self.all_max += max(rmax, cmax)
- refined_h, refined_w = refined.shape
- if refined_h != cmax:
- refined = np.array(Image.fromarray(pred).resize((cmax, rmax), Image.BILINEAR))
- if not(gt.sum() < 32*32):
- if not((cmax==cmin) or (rmax==rmin)):
- class_refined_prob = np.array(Image.fromarray(pred).resize((cmax-cmin, rmax-rmin), Image.BILINEAR))
- refined[rmin:rmax, cmin:cmax] = class_refined_prob
- pred = pred > 128
- gt = gt > 128
- ba = self.cal_ba(pred, gt)
- self.bas.append(ba)
- def get_disk_kernel(self, radius):
- return cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (radius*2+1, radius*2+1))
- def cal_ba(self, pred: np.ndarray, gt: np.ndarray) -> np.ndarray:
- """
- Calculate the mean absolute error.
- :return: ba
- """
-
- gt = gt.astype(np.uint8)
- pred = pred.astype(np.uint8)
- h, w = gt.shape
- min_radius = 1
- max_radius = (w+h)/300
- num_steps = 5
- pred_acc = [None] * num_steps
- for i in range(num_steps):
- curr_radius = min_radius + int((max_radius-min_radius)/num_steps*i)
- kernel = self.get_disk_kernel(curr_radius)
- boundary_region = cv2.morphologyEx(gt, cv2.MORPH_GRADIENT, kernel) > 0
- gt_in_bound = gt[boundary_region]
- pred_in_bound = pred[boundary_region]
- num_edge_pixels = (boundary_region).sum()
- num_pred_gd_pix = ((gt_in_bound) * (pred_in_bound) + (1-gt_in_bound) * (1-pred_in_bound)).sum()
- pred_acc[i] = num_pred_gd_pix / num_edge_pixels
- ba = sum(pred_acc)/num_steps
- return ba
- def get_results(self) -> dict:
- mba = np.mean(np.array(self.bas, _TYPE))
- return dict(mba=mba)
- class BIoUMeasure(object):
- def __init__(self, dilation_ratio=0.02):
- self.bious = []
- self.dilation_ratio = dilation_ratio
-
- def mask_to_boundary(self, mask):
- h, w = mask.shape
- img_diag = np.sqrt(h ** 2 + w ** 2)
- dilation = int(round(self.dilation_ratio * img_diag))
- if dilation < 1:
- dilation = 1
- # Pad image so mask truncated by the image border is also considered as boundary.
- new_mask = cv2.copyMakeBorder(mask, 1, 1, 1, 1, cv2.BORDER_CONSTANT, value=0)
- kernel = np.ones((3, 3), dtype=np.uint8)
- new_mask_erode = cv2.erode(new_mask, kernel, iterations=dilation)
- mask_erode = new_mask_erode[1 : h + 1, 1 : w + 1]
- # G_d intersects G in the paper.
- return mask - mask_erode
- def step(self, pred: np.ndarray, gt: np.ndarray):
- pred, gt = _prepare_data(pred, gt)
- bious = self.cal_biou(pred=pred, gt=gt)
- self.bious.append(bious)
- def cal_biou(self, pred, gt):
- pred = (pred * 255).astype(np.uint8)
- pred = self.mask_to_boundary(pred)
- gt = (gt * 255).astype(np.uint8)
- gt = self.mask_to_boundary(gt)
- gt = gt > 128
-
- bins = np.linspace(0, 256, 257)
- fg_hist, _ = np.histogram(pred[gt], bins=bins) # ture positive
- bg_hist, _ = np.histogram(pred[~gt], bins=bins) # false positive
- fg_w_thrs = np.cumsum(np.flip(fg_hist), axis=0)
- bg_w_thrs = np.cumsum(np.flip(bg_hist), axis=0)
- TPs = fg_w_thrs
- Ps = fg_w_thrs + bg_w_thrs # positives
- Ps[Ps == 0] = 1
- T = max(np.count_nonzero(gt), 1)
- ious = TPs / (T + bg_w_thrs)
- return ious
- def get_results(self) -> dict:
- biou = np.mean(np.array(self.bious, dtype=_TYPE), axis=0)
- return dict(biou=dict(curve=biou))
- def sort_and_round_scores(task, scores, r=3):
- em, sm, fm, mae, mse, wfm, hce, mba, biou = scores
- if task == 'DIS5K':
- scores = [fm['curve'].max().round(r), wfm.round(r), mae.round(r), sm.round(r), em['curve'].mean().round(r), int(hce.round()),
- em['curve'].max().round(r), fm['curve'].mean().round(r), em['adp'].round(r), fm['adp'].round(r),
- mba.round(r), biou['curve'].max().round(r), biou['curve'].mean().round(r),]
- elif task == 'COD':
- scores = [sm.round(r), wfm.round(r), fm['curve'].mean().round(r), em['curve'].mean().round(r), em['curve'].max().round(r), mae.round(r),
- fm['curve'].max().round(r), em['adp'].round(r), fm['adp'].round(r), int(hce.round()),
- mba.round(r), biou['curve'].max().round(r), biou['curve'].mean().round(r),]
- elif task == 'HRSOD':
- scores = [sm.round(r), fm['curve'].max().round(r), em['curve'].mean().round(r), mae.round(r),
- em['curve'].max().round(r), fm['curve'].mean().round(r), wfm.round(r), em['adp'].round(r), fm['adp'].round(r), int(hce.round()),
- mba.round(r), biou['curve'].max().round(r), biou['curve'].mean().round(r),]
- elif task == 'General':
- scores = [fm['curve'].max().round(r), wfm.round(r), mae.round(r), sm.round(r), em['curve'].mean().round(r), int(hce.round()),
- em['curve'].max().round(r), fm['curve'].mean().round(r), em['adp'].round(r), fm['adp'].round(r),
- mba.round(r), biou['curve'].max().round(r), biou['curve'].mean().round(r),]
- elif task == 'General-2K':
- scores = [fm['curve'].max().round(r), wfm.round(r), mae.round(r), sm.round(r), em['curve'].mean().round(r), int(hce.round()),
- em['curve'].max().round(r), fm['curve'].mean().round(r), em['adp'].round(r), fm['adp'].round(r),
- mba.round(r), biou['curve'].max().round(r), biou['curve'].mean().round(r),]
- elif task == 'Matting':
- scores = [sm.round(r), fm['curve'].max().round(r), em['curve'].mean().round(r), mse.round(5),
- em['curve'].max().round(r), fm['curve'].mean().round(r), wfm.round(r), em['adp'].round(r), fm['adp'].round(r), int(hce.round()),
- mba.round(r), biou['curve'].max().round(r), biou['curve'].mean().round(r),]
- else:
- scores = [sm.round(r), mae.round(r), em['curve'].max().round(r), em['curve'].mean().round(r),
- fm['curve'].max().round(r), fm['curve'].mean().round(r), wfm.round(r),
- em['adp'].round(r), fm['adp'].round(r), int(hce.round()),
- mba.round(r), biou['curve'].max().round(r), biou['curve'].mean().round(r),]
- return scores
|