# auditory_feature_helpers.py
#
# NOTE(review): this file was recovered from a corrupted capture of a GitHub
# diff view (viewer chrome, hunk headers, and old/new line pairs were
# interleaved, and the whole capture was duplicated once). The code below is
# the reconstructed post-change source. Regions the capture collapsed
# ("Loading ...") are marked with TODO(review) and must be restored from the
# repository history.

import numpy as np  # used throughout; its import fell in an uncaptured region
import pandas as pd
import joblib
import seaborn as sns
import dill
from copy import deepcopy
from sklearn.linear_model import LinearRegression

from coef_helper_functions import remove_BF_from_coefs, make_df_for_lineplot


def test_latent_space_reconstruction(feature, latent_activity, estimator=None,
                                     **kwargs):
    '''Returns the cross-validated explained variance (averaged across 8
    folds) for predicting feature from latent_activity.

    IN: feature         - target values, shape (samples,)
        latent_activity - predictors, shape (samples, n_latents)
        estimator       - sklearn estimator; defaults to RidgeCV over a
                          log-spaced alpha grid
        kwargs          - forwarded to sklearn.model_selection.cross_validate
                          (e.g. return_estimator=True)
    OUT: per-fold test scores; if cross_validate was asked to return the
         fitted estimators, a (scores, estimators) tuple instead
    '''
    from sklearn.model_selection import cross_validate
    from sklearn.linear_model import RidgeCV
    if estimator is None:
        estimator = RidgeCV(alphas=[1e-5, 1e-3, 1e-1, 1, 1e3, 1e5])
    cv_result = cross_validate(estimator, latent_activity, feature,
                               scoring='explained_variance', cv=8, **kwargs)
    # 'estimator' is only present when return_estimator=True was passed
    if 'estimator' in cv_result:
        return cv_result['test_score'], cv_result['estimator']
    else:
        return cv_result['test_score']


def get_feature_scores(feature_dict, latent_activity, ratings_idx,
                       estimator=None, **kwargs):
    '''Cross-validated reconstruction scores for each auditory feature.

    Noise ratings exist only for a subset of samples, so latent_activity is
    restricted to ratings_idx for that feature.
    '''
    scores_dict = dict()
    # NOTE(review): fixed to match get_feature_dict, which renamed the key
    # 'Sound level (db)' -> 'Sound level (decibel)'; the stale key captured
    # in the diff would raise KeyError here.
    feature_names = ['Time-Frequency Separability', 'Sound level (decibel)',
                     'Speech duration (s)']
    for label in feature_names:
        scores_dict[label] = test_latent_space_reconstruction(
            feature_dict[label], latent_activity, estimator=estimator,
            **kwargs)
    scores_dict['Noise rating'] = test_latent_space_reconstruction(
        feature_dict['Noise rating'], latent_activity[ratings_idx],
        estimator=estimator, **kwargs)
    return scores_dict


def get_average_estimator():
    '''TODO(review): body collapsed in the captured diff ("Loading ...");
    restore from the repository.'''
    raise NotImplementedError('body lost in corrupted capture')


def get_feature_dict():
    '''Loads the per-sample / per-basis-function auditory features from disk
    and returns them keyed by display name.'''
    # TODO(review): the opening statements of this function (including the
    # load that defines `bsc`) were collapsed in the captured diff; restore
    # them before use.
    separability = joblib.load('mean_sep.pkl')
    separability_pos = joblib.load('sep_of_pos_Ws_only.pkl')
    separability_pos[np.isnan(separability_pos)] = 0
    decibel = joblib.load('db_dict.pkl')
    # db_dict maps stringified sample indices to tuples; element [1] holds
    # the sound level. 3539 samples total — TODO confirm against the dataset.
    decibel = np.array([decibel[str(i)][1] for i in range(3539)])
    speech_overlap = joblib.load('speech_overlap.pkl')
    ratings_dict = joblib.load('ratings_dict.pkl')
    feature_dict = {'Time-Frequency Separability': separability,
                    'Sound level (decibel)': decibel,
                    'Positive separability': separability_pos,
                    'Speech duration (s)': speech_overlap,
                    'Noise rating': ratings_dict['ratings'],
                    'BSC': bsc}
    return feature_dict


def get_cluster_infos(means_file='cluster_means_reordered.pkl',
                      idx_file='compressed_cluster_identity_reordered.pkl'):
    '''Loads cluster means and cluster identity indices from the given pickle
    files and returns them as a dict with keys 'means' and 'index'.'''
    cluster_means = joblib.load(means_file)
    cluster_idx = joblib.load(idx_file)
    return {'means': cluster_means, 'index': cluster_idx}


def get_seps(features, separability, excl_idx=None):
    '''Collects the separability values of the basis functions active in each
    row of features, optionally skipping indices listed in excl_idx.'''
    # NOTE(review): statements preceding this loop were collapsed in the
    # captured diff; the accumulator initialisation below is the minimal
    # reconstruction the visible code requires — confirm against history.
    separabilities_sample = []
    for ft in features:
        if ft.any():
            if excl_idx is not None:
                separabilities_sample.append(
                    np.array([separability[loc] for loc in np.where(ft)[0]
                              if not np.isin(loc, excl_idx)]))
            else:
                separabilities_sample.append(separability[np.where(ft)[0]])
    return np.concatenate(separabilities_sample)


def compute_mps_time_and_freq_labels(n_fft=882, sr=16000, fmax=8000,
                                     n_mels=48):
    '''Computes the time and frequency axis labels of the modulation power
    spectrum.

    TODO(review): most of this body was collapsed in the captured diff; only
    the trailing statements survived:
        mps_freqs = np.fft.fftshift(np.fft.fftfreq(fft_freq.shape[0],
                                                   freq_step))
        return mps_times/10, mps_freqs
    Restore the full body from the repository.'''
    raise NotImplementedError('body lost in corrupted capture')


def bin_component_indices(component, n_bins=5):
    '''Computes n_bins bins of component and returns the bin edges and the
    per-sample bin indices.'''
    assert len(component.shape) == 1
    _, edges = np.histogram(component, bins=n_bins)
    indices = np.digitize(component, edges)
    return edges, indices


def get_features_in_sample(bsc, feature):
    '''Melts the occurrences of feature in each row of bsc into a list of
    arrays.
    IN: bsc     - ndarray of shape (samples, 12000)
        feature - ndarray of shape (200,)
    OUT: feature_list - list (one entry per sample) of 1-d ndarrays
    '''
    feature_list = []
    # each sample unrolls into 60 time steps x 200 basis functions
    bsc = np.reshape(bsc, (-1, 60, 200))
    for bsc_sample in bsc:
        temp_list = []
        for bsc_ts in bsc_sample:
            active_BFs = np.where(bsc_ts)[0]
            if active_BFs.size > 0:
                temp_list.append(feature[active_BFs])
        # NOTE(review): a sample with no active basis functions at any time
        # step would make np.concatenate raise on an empty list — presumably
        # such samples never occur; confirm upstream.
        feature_list.append(np.concatenate(temp_list))
    return feature_list


def feature_list_to_df(feature_list, indices_samples, feature_name='value'):
    '''Converts a feature list to a melted dataframe annotated by the bins
    from indices_samples.'''
    list_of_indices = [[idx]*len(feature_list[i])
                       for i, idx in enumerate(indices_samples)]
    return pd.DataFrame({'bin': np.concatenate(list_of_indices),
                         feature_name: np.concatenate(feature_list)})


def make_df_for_feature_sensitivity(bf_feature_dict, bsc, component,
                                    n_bins=5):
    '''Creates a melted pandas DataFrame for each feature in bf_feature_dict.
    IN: bf_feature_dict - dict mapping auditory feature names to shape (200,)
                          ndarrays quantifying the feature for each BSC basis
                          function
        bsc             - ndarray of the Binary Sparse Coding basis function
                          activations
        component       - ndarray of component activation in each sample
        n_bins          - number of bins for each principal component
    '''
    from functools import reduce
    _, indices = bin_component_indices(component, n_bins=n_bins)
    list_of_dfs = [feature_list_to_df(get_features_in_sample(bsc, feature),
                                      indices, feature_name=feature_name)
                   for feature_name, feature in bf_feature_dict.items()]
    # keep a single 'bin' column: drop it from every frame after the first
    joint_df = reduce(lambda x, y: pd.concat([x, y.drop('bin', axis=1)],
                                             axis=1), list_of_dfs)
    return joint_df


def annotate_df_with_pc_number(df, pc_number):
    '''Adds a 'PC' column to df filled with pc_number.'''
    return pd.concat([df, pd.Series([pc_number]*df.shape[0], name='PC')],
                     axis=1)


#TODO: think about how to do a unittest
def make_feature_pc_df(bf_feature_dict, bsc, pcs, n_bins=5):
    '''Creates a melted pandas DataFrame for each feature in bf_feature_dict
    and each component in pcs.shape[1].
    IN: bf_feature_dict - dict mapping auditory feature names to shape (200,)
                          ndarrays quantifying the feature for each BSC basis
                          function
        bsc             - ndarray of the Binary Sparse Coding basis function
                          activations
        pcs             - principal component values for each sample
        n_bins          - number of bins for each principal component
    '''
    # test that the number of samples is the same
    assert pcs.shape[0] == bsc.shape[0]
    pc_df_list = [annotate_df_with_pc_number(
        make_df_for_feature_sensitivity(bf_feature_dict, bsc, component,
                                        n_bins=n_bins), i+1)
        for i, component in enumerate(pcs.T)]
    return pd.concat(pc_df_list, axis=0, ignore_index=True)