Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- import math
- from collections import OrderedDict, defaultdict
- from dataclasses import dataclass, field
- from typing import Optional, Tuple, Union, Any
- from concurrent.futures import ThreadPoolExecutor
- import asyncio
- import math
- from functools import wraps
- import time
- from itertools import repeat
- from concurrent.futures import *
- from collections import defaultdict
- import pandas as pd
- import numpy as np
- import joblib
- import numba
- from numba import jit
- from numba import types
- from numba.typed import Dict
def confusion_matrix(y_true, y_pred, labels, n_labels):
    """Build an n_labels x n_labels confusion matrix.

    Rows are true classes, columns are predicted classes: cell [t, p]
    counts samples whose true label is t and predicted label is p.

    Parameters
    ----------
    y_true : 1-D integer array of encoded true labels, values in [0, n_labels).
    y_pred : 1-D integer array of encoded predicted labels, same length.
    labels : class names; unused here, kept so callers that pass it by
        keyword (e.g. reporter()) keep working.
    n_labels : number of distinct classes, i.e. the matrix dimension.

    Returns
    -------
    np.ndarray of shape (n_labels, n_labels), dtype int32.
    """
    result = np.zeros((n_labels, n_labels), dtype=np.int32)
    # np.add.at accumulates correctly for repeated (true, pred) index
    # pairs — same result as the original per-element Python loop, but
    # vectorized in C.
    np.add.at(result, (y_true, y_pred), 1)
    return result
def metrics_from_confusion_matrix(confusion_matrix, classes):
    """Compute per-class precision and recall from a confusion matrix.

    Parameters
    ----------
    confusion_matrix : square ndarray where rows are true classes and
        columns are predicted classes (as produced by confusion_matrix()).
    classes : sequence of class names, index-aligned with the matrix.

    Returns
    -------
    dict mapping class name -> {"precision": float, "recall": float}.

    Notes
    -----
    The previous version declared a numba typed Dict with float64 values
    but then stored nested Python dicts in it, which cannot compile under
    @jit(nopython=True); this version returns a plain nested dict, which
    is the shape the caller (reporter()) actually consumes.
    A class never seen as true (recall) or never predicted (precision)
    in a bootstrap sample yields 0.0 instead of a 0/0 nan.
    """
    metrics = {}
    for i in range(len(classes)):
        tp = confusion_matrix[i, i]
        # FN: rest of row i — true class i, predicted as something else.
        fn = np.sum(confusion_matrix[i, :]) - tp
        # FP: rest of column i — predicted as i, true class is something else.
        fp = np.sum(confusion_matrix[:, i]) - tp
        # Guard zero denominators (class absent from a resample).
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        metrics[classes[i]] = {
            "precision": precision,
            "recall": recall
        }
    return metrics
def reporter(iteration):
    """Draw one bootstrap resample and return flat per-class metrics.

    The *iteration* argument is unused; it exists so the function can be
    fanned out with executor.map over a range.

    Reads module globals df_encoded_np, df_length, labels, n_labels.
    Returns a dict keyed "<class>_<metric>" -> float for the metrics
    "precision" and "recall".
    """
    # Resample rows of the encoded (true, predicted) array with replacement.
    sample_idx = np.random.choice(df_length, size=df_length, replace=True)
    df_local = df_encoded_np[sample_idx, :]
    matrix = confusion_matrix(df_local[:, 0], df_local[:, 1], labels=labels, n_labels=n_labels)
    report = metrics_from_confusion_matrix(matrix, labels)
    # Flatten {class: {metric: value}} into {"<class>_<metric>": value}.
    return {
        cls + '_' + metric: value
        for cls, per_class in report.items()
        for metric, value in per_class.items()
        if metric in ('precision', 'recall')
    }
def interval_calculator(key, values):
    """Return [key, lower, upper] for the empirical 95% interval of *values*.

    Lower and upper are the elements at the floor-indexed 2.5th and
    97.5th percentile positions of the sorted values.
    """
    ordered = sorted(values)
    count = len(ordered)
    lower = ordered[math.floor(count * 0.025)]
    upper = ordered[math.floor(count * 0.975)]
    return [key, lower, upper]
def run():
    """Run a single bootstrap report and print it.

    TODO: use numba parallel (or restore a process pool) instead of the
    single serial call below. Earlier process-pool draft, kept verbatim
    for when that work resumes:

        with ProcessPoolExecutor(8) as executor:
            results = list(executor.map(reporter, range(n)))
            all_results = defaultdict(list)
            for result in results:
                # each result is a dict
                for key, val in result.items():
                    all_results[key].append(val)
            results = list(executor.map(interval_calculator, *zip(*all_results.items())))
            print(results)
    """
    report = reporter(1)
    print(report)
# Script entry: build a toy labelled dataset, integer-encode the label and
# prediction columns, and run the bootstrap report.
# (Removed dead assignments: `estimates`, `intervals`, and `sample_weight`
# were never read anywhere in the file.)
df = pd.DataFrame({
    'id': list(range(10)),
    'label': ['A', 'B', 'C', 'D', 'C', 'A', 'B', 'B', 'A', 'D'],
    'predicted': ['B', 'A', 'D', 'A', 'B', 'C', 'C', 'B', 'D', 'A'],
})
df_length = len(df)
# Distinct class names (np.unique returns them sorted); their positions
# double as the integer codes used in the confusion matrix.
labels = np.unique(df['label'])
n_labels = len(labels)
df_subset = df[['id', 'label', 'predicted']]
mapping_df = pd.DataFrame({'class': labels, 'value': range(len(labels))})
# Encode true and predicted labels separately, then join on 'id' so each
# row carries both integer codes.
df_label_encoded = df_subset.merge(mapping_df, left_on='label', right_on='class')
df_predicted_encoded = df_subset.merge(mapping_df, left_on='predicted', right_on='class')
df_encoded = df_label_encoded.merge(df_predicted_encoded, on='id', suffixes=['_label', '_predicted'])
# Global consumed by reporter(): a (rows, 2) int array of (true, predicted) codes.
df_encoded_np = df_encoded[['value_label', 'value_predicted']].to_numpy()
n = 1000  # bootstrap iteration count for the parallel path sketched in run()'s docstring
run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement