Commit b4e745a1 authored by mjboos's avatar mjboos

more stuff

parent 39f40c32
This source diff could not be displayed because it is too large. You can view the blob instead.
import numpy as np
import matplotlib.pyplot as plt
from nilearn import image as img
import pandas as pd
import joblib
import seaborn as sns
import dill
from sklearn.linear_model import LinearRegression
from copy import deepcopy
from coef_helper_functions import remove_BF_from_coefs, get_cluster_coefs_from_estimator, make_df_for_lineplot
def test_latent_space_reconstruction(feature, latent_activity,
                                     estimator=None, **kwargs):
    '''Cross-validate predicting `feature` from `latent_activity`.

    Runs an 8-fold CV scored by explained variance and returns the
    per-fold test scores.  If `return_estimator=True` is forwarded via
    **kwargs to sklearn's cross_validate, the fitted estimators are
    returned as a second value.
    '''
    from sklearn.model_selection import cross_validate
    from sklearn.linear_model import RidgeCV
    # Default to a ridge regression with a coarse alpha grid.
    model = estimator if estimator is not None else RidgeCV(
        alphas=[1e-5, 1e-3, 1e-1, 1, 1e3, 1e5])
    cv_result = cross_validate(model, latent_activity, feature,
                               scoring='explained_variance', cv=8, **kwargs)
    scores = cv_result['test_score']
    if 'estimator' not in cv_result:
        return scores
    return scores, cv_result['estimator']
def get_feature_scores(feature_dict, latent_activity, ratings_idx, estimator=None, **kwargs):
    """Score the reconstruction of each stimulus feature from the latent activity.

    Returns a dict mapping feature label to the result of
    test_latent_space_reconstruction for that feature.
    """
    labels = ('Time-Frequency Separability', 'Sound level (db)', 'Speech duration (s)')
    scores_dict = {label: test_latent_space_reconstruction(
                       feature_dict[label], latent_activity,
                       estimator=estimator, **kwargs)
                   for label in labels}
    # Ratings only exist for a subset of stimuli, hence the index restriction.
    scores_dict['Noise rating'] = test_latent_space_reconstruction(
        feature_dict['Noise rating'], latent_activity[ratings_idx],
        estimator=estimator, **kwargs)
    return scores_dict
def get_average_estimator():
    """Load and return the pickled average estimator from 'average_estimator.pkl'.

    Returns
    -------
    object
        The unpickled estimator object.
    """
    # BUGFIX: pickles are binary data; opening in text mode ('r') makes
    # dill.load fail under Python 3 (and can corrupt reads elsewhere).
    with open('average_estimator.pkl', 'rb') as fn:
        return dill.load(fn)
def get_feature_dict():
    """Load all stimulus features from disk and return them keyed by display label."""
    bsc = joblib.load('../semisupervised/logBSC_H200_stimuli.pkl')
    separability = joblib.load('mean_sep.pkl')
    separability_pos = joblib.load('sep_of_pos_Ws_only.pkl')
    # Undefined positive separabilities are treated as zero.
    separability_pos[np.isnan(separability_pos)] = 0
    db_dict = joblib.load('db_dict.pkl')
    # One sound-level value (second entry) per stimulus, for all 3539 stimuli.
    db = np.array([db_dict[str(i)][1] for i in range(3539)])
    speech_overlap = joblib.load('speech_overlap.pkl')
    ratings_dict = joblib.load('ratings_dict.pkl')
    return {'Time-Frequency Separability' : separability,
            'Sound level (db)' : db,
            'Positive separability' : separability_pos,
            'Speech duration (s)' : speech_overlap,
            'Noise rating' : ratings_dict['ratings'],
            'BSC' : bsc}
def get_cluster_infos(means_file='cluster_means_reordered.pkl', idx_file='compressed_cluster_identity_reordered.pkl'):
    """Load the cluster means and cluster identity arrays.

    Returns a dict with keys 'means' (loaded from `means_file`) and
    'index' (loaded from `idx_file`).
    """
    return {'means' : joblib.load(means_file),
            'index' : joblib.load(idx_file)}
def get_corr_df(joint_pcs, cluster_means, cluster_idx):
    """Correlate the first three PCs with each cluster's mean time course.

    Parameters
    ----------
    joint_pcs : array, shape (n_samples, >=3)
        PC time courses; only the first three columns are used.
    cluster_means : array, shape (7, n_samples)
        Mean time course per cluster, one row per unique cluster label.
    cluster_idx : array
        Cluster labels; must contain exactly 7 unique values.

    Returns
    -------
    pandas.DataFrame with columns ['Cluster', 'Correlation', 'PC'],
    one row per (cluster, PC) pair (21 rows).
    """
    corrs = [{cl: np.corrcoef(joint_pcs[:, pc], cluster_means[i])[0, 1]
              for i, cl in enumerate(np.unique(cluster_idx))} for pc in range(3)]
    # BUGFIX: on Python 3 dict.values() is a view object; np.array over a
    # list of views yields a useless object array. Materialize with list().
    correlations = np.array([list(corr.values()) for corr in corrs])
    corr_df = pd.concat([pd.DataFrame(correlations).melt(),
                         pd.Series(np.tile(['PC 1', 'PC 2', 'PC 3'], 7), name='PC')],
                        axis=1, ignore_index=True)
    corr_df.columns = ['Cluster', 'Correlation', 'PC']
    # Relabel 0-based cluster indices as 1-based display strings.
    corr_df.Cluster = corr_df.Cluster.map({0: '1', 1: '2', 2: '3', 3: '4',
                                           4: '5', 5: '6', 6: '7'})
    return corr_df
def cluster_reordering(cluster_means, cluster_idx):
    """Reorders cluster_means and cluster_idx and saves them as new_files"""
    # Mapping from the original cluster labels to their new, reordered labels.
    old_to_new = {1: 0, 0: 1, 6: 2, 2: 3, 8: 4, 9: 5, 5: 6}
    new_means_order = [1, 0, 4, 2, 5, 6, 3]
    reordered_means = cluster_means[new_means_order]
    relabeled_idx = deepcopy(cluster_idx)
    # Masks are computed against the ORIGINAL labels, so the relabeling
    # is independent of iteration order.
    for old_label, new_label in old_to_new.items():
        relabeled_idx[cluster_idx == old_label] = new_label
    joblib.dump(reordered_means, 'cluster_means_reordered.pkl')
    joblib.dump(relabeled_idx, 'compressed_cluster_identity_reordered.pkl')
def get_seps(features, separability, excl_idx=None):
    """Collect separability values of all active basis functions per sample.

    Parameters
    ----------
    features : array-like, reshapeable to (60, 200)
        Activation matrix; a nonzero entry marks an active basis function.
    separability : array-like, length 200
        Separability value per basis function.
    excl_idx : array-like of int, optional
        Basis-function indices to exclude from the result.

    Returns
    -------
    np.ndarray
        Concatenated separability values over all samples with at least
        one active basis function; empty array if none are active.
    """
    features = np.reshape(features, (60, 200))
    separability = np.asarray(separability)
    collected = []
    for ft in features:
        active = np.flatnonzero(ft)
        if active.size == 0:
            continue  # skip samples with no active basis function
        if excl_idx is not None:
            # Vectorized exclusion (original looped np.isin per element).
            active = active[~np.isin(active, excl_idx)]
        collected.append(separability[active])
    # Robustness: original raised on all-inactive input via np.concatenate([]).
    if not collected:
        return np.array([], dtype=separability.dtype)
    return np.concatenate(collected)
#TODO: write unit test
def compute_MPS_from_STFT(BF, n_fft=882, sr=44100, fmax=8000, n_mels=48):
    """Project mel-domain basis functions into STFT space and return their MPS.

    `BF` is multiplied with the mel filterbank for the given STFT
    parameters, and the modulation power spectrum of the result is
    returned via compute_MPS.
    """
    import librosa as lbr
    mel_fb = lbr.filters.mel(sr=sr, n_fft=n_fft, fmax=fmax, n_mels=n_mels)
    stft_domain_bf = np.dot(BF, mel_fb)
    return compute_MPS(stft_domain_bf)
def compute_MPS(specgram):
    """Return the modulation power spectrum: magnitude of the centered 2-D FFT."""
    spectrum_2d = np.fft.fft2(specgram)
    return np.abs(np.fft.fftshift(spectrum_2d))
def compute_mps_time_and_freq_labels(n_fft=882, sr=16000, fmax=8000, n_mels=48):
    '''Returns the labels for time and frequency modulation for the input parameters

    Parameters
    ----------
    n_fft : int
        FFT window length used to derive the frequency axis.
    sr : int
        Sampling rate passed to librosa's fft_frequencies.
    fmax, n_mels : int
        Kept for interface compatibility; not used in the computation.

    Returns
    -------
    (mps_times, mps_freqs) : tuple of np.ndarray
        Temporal and spectral modulation axis labels.
    '''
    import librosa as lbr
    fft_freq = lbr.core.fft_frequencies(sr=sr, n_fft=n_fft)
    # Step of the log-frequency axis, taken from the low end of the axis.
    # NOTE(review): fft_frequencies is linearly spaced, so this log-step is
    # only exact near the bottom of the axis — confirm intended.
    freq_step = np.log(fft_freq[2]) - np.log(fft_freq[1])
    # Removed: an unused local `time_step = sr/(n_fft/2)` (dead code).
    # Temporal modulation axis: 10 frames at an assumed 100 Hz frame rate.
    mps_times = np.fft.fftshift(np.fft.fftfreq(10, 1. / 100.))
    mps_freqs = np.fft.fftshift(np.fft.fftfreq(fft_freq.shape[0], freq_step))
    return mps_times / 10, mps_freqs
\ No newline at end of file
......@@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"metadata": {},
"outputs": [
{
......@@ -106,7 +106,18 @@
"speech_overlap = joblib.load('speech_overlap.pkl')\n",
"mps_orig = joblib.load('MPS_orig.pkl')\n",
"mps_orig_reshaped = np.reshape(mps_orig, (200, -1))\n",
"coefs = np.reshape(np.reshape(joblib.load('coefs_bla.pkl').T, (-1,5,200))[:,:3], (-1, 200)).T"
"coefs = np.reshape(np.reshape(joblib.load('coefs_bla.pkl').T, (-1,5,200))[:,:3], (-1, 200)).T\n",
"\n",
"cluster_means = joblib.load('cluster_means.pkl')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster_means = joblib.load('cluster_means.pkl')"
]
},
{
......@@ -3,64 +3,40 @@ import matplotlib.pyplot as plt
from nilearn import image as img
import pandas as pd
import joblib
import seaborn as sns
import dill
from sklearn.linear_model import LinearRegression
from copy import deepcopy
def test_latent_space_reconstruction(feature, latent_activity,
estimator=None, **kwargs):
'''Returns the cross-validated explained variance (averaged across 8 folds) for predicting feature from latent_activity'''
from sklearn.model_selection import cross_validate
from sklearn.linear_model import RidgeCV
if estimator is None:
estimator = RidgeCV(alphas=[1e-5,1e-3,1e-1,1,1e3,1e5])
cv_result = cross_validate(estimator, latent_activity, feature, scoring='explained_variance', cv=8, **kwargs)
mean_test_score = cv_result['test_score'].mean()
if 'estimator' in cv_result:
return mean_test_score, cv_result['estimator']
else:
return mean_test_score
def get_feature_scores(feature_dict, latent_activity, ratings_idx, estimator=None, **kwargs):
from coef_helper_functions import remove_BF_from_coefs, get_cluster_coefs_from_estimator, make_df_for_lineplot
from auditory_feature_helpers import *
if __name__=='__main__':
feature_dict = get_feature_dict()
bsc = feature_dict.pop('BSC')
estimator = get_average_estimator()
joint_pcs = estimator.predict(bsc)
ratings_dict = joblib.load('ratings_dict.pkl')
joint_scores = get_feature_scores(feature_dict, joint_pcs, ratings_dict['ratings_idx'], estimator=LinearRegression())
cluster_dict = get_cluster_infos()
cluster_idx = cluster_dict.pop('index')
scores_dict = dict()
for label in ['Time-Frequency Separability', 'Sound level (db)', 'Speech duration (s)', 'Positive separability']:
scores_dict[label] = test_latent_space_reconstruction(feature_dict[label], latent_activity, estimator=estimator, **kwargs)
scores_dict['Noise rating'] = test_latent_space_reconstruction(feature_dict['Noise rating'], latent_activity[ratings_idx], estimator=estimator, **kwargs)
return scores_dict
def remove_BF_from_coefs(estimator, remove_bf):
'''Returns estimator with all coefficients set to zero that correspond to basis functions in remove bool'''
remove_coef = np.tile(remove_bf, 60)
estimator_new = deepcopy(estimator)
estimator_new.transformedcomp[remove_coef,:] = 0
return estimator_new
with open('average_estimator.pkl','r') as fn:
estimator = dill.load(fn)
bsc = joblib.load('../semisupervised/logBSC_H200_stimuli.pkl')
joint_pcs = estimator.predict(bsc)[:,:3]
separability = joblib.load('mean_sep.pkl')
separability_pos = joblib.load('sep_of_pos_Ws_only.pkl')
separability_pos[np.isnan(separability_pos)] = 0
db = joblib.load('db_dict.pkl')
db = np.array([db[str(i)][1] for i in range(3539)])
speech_overlap = joblib.load('speech_overlap.pkl')
#pcs = joblib.load('testtest.pkl')[..., :3]
#average_pcs = pcs.mean(axis=0)
ratings_dict = joblib.load('ratings_dict.pkl')
cluster_idx = joblib.load('cluster_identity.pkl')
feature_dict = {'Time-Frequency Separability' : separability, 'Sound level (db)' : db, 'Positive separability' : separability_pos,
'Speech duration (s)' : speech_overlap, 'Noise rating' : ratings_dict['ratings']}
joint_scores = get_feature_scores(feature_dict, joint_pcs, ratings_dict['ratings_idx'], estimator=LinearRegression())
scores_dict = dict()
for i in np.unique(cluster_idx):
pc_predictions_wo_cluster = remove_BF_from_coefs(estimator, cluster_idx==i).predict(bsc)[:,:3]
scores_dict[i] = get_feature_scores(feature_dict, pc_predictions_wo_cluster, ratings_dict['ratings_idx'], estimator=LinearRegression())
#individual_scores = [get_feature_scores(feature_dict, individual_pcs, ratings_dict['ratings_idx'], estimator=LinearRegression(), return_estimator=True) for individual_pcs in pcs]
for i in np.unique(cluster_idx):
pc_predictions_wo_cluster = remove_BF_from_coefs(estimator, cluster_idx==i).predict(bsc)[:,:3]
scores_dict[i] = get_feature_scores(feature_dict, pc_predictions_wo_cluster, ratings_dict['ratings_idx'], estimator=LinearRegression())
#individual_scores = [get_feature_scores(feature_dict, individual_pcs, ratings_dict['ratings_idx'], estimator=LinearRegression(), return_estimator=True) for individual_pcs in pcs]
joint_scores_mean = {feature : feature_arr.mean() for feature, feature_arr in joint_scores.iteritems()}
cluster_joint_diff = {cluster : {feature : joint_scores[feature]-scores_ft
for feature, scores_ft in cluster_scores.iteritems()}
for cluster, cluster_scores in scores_dict.iteritems()}
reshaped_dict = {(feature,"cluster {}".format(cluster+1)) : cluster_scores[feature] for feature in joint_scores for cluster, cluster_scores in cluster_joint_diff.iteritems()}
feature_cluster_df = pd.melt(pd.DataFrame(reshaped_dict))
feature_cluster_df.columns = ['Feature', 'Cluster', 'Difference in explained variance']
g = sns.catplot(data=feature_cluster_df, col='Feature', kind='strip', x='Difference in explained variance',
y='Cluster', col_wrap=3)
g.savefig('Differences_explained_variance_per_cluster_compressed_new.svg')
#fig, axes = plt.subplots(4,3,figsize=(15,20), constrained_layout=True)
#flat_axes = axes.flatten()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment