Source code for tfcomb.analysis

from __future__ import print_function

import pandas as pd
import numpy as np
from scipy.stats import chisquare
import multiprocessing as mp
import seaborn as sns
import matplotlib.pyplot as plt

#Internal functions
from tfcomb.logging import TFcombLogger, InputError
from tfcomb.utils import check_columns, check_type, is_symmetric

#-------------------------------------------------------------------------------#
#---------------------------- Orientation analysis -----------------------------#
#-------------------------------------------------------------------------------#

def _get_scenario_keys(TF1, TF2, scenario):
			
		scenario_keys = {
						"same" :   [(TF1, "+", TF2, "+"),
									(TF2, "+", TF1, "+"),
									(TF1, "-", TF2, "-"),
									(TF2, "-", TF1, "-")],
						"opposite": [(TF1, "+", TF2, "-"),
									(TF1, "-", TF2, "+"),
									(TF2, "+", TF1, "-"),
									(TF2, "-", TF1, "+")],
						"TF1-TF2": [(TF1, "+", TF2, "+"), 
									(TF2, "-", TF1, "-")],
						"TF2-TF1": [(TF2, "+", TF1, "+"), 
									(TF1, "-", TF2, "-")],
						"convergent": [(TF1, "+", TF2, "-"), 
									   (TF2, "+", TF1, "-")],
						"divergent": [(TF1, "-", TF2, "+"), 
									  (TF2, "-", TF1, "+")]
						}

		return(scenario_keys[scenario])

def _get_unique_pairs(pairs):

	#Remove TF2-TF1 duplicates
	seen = {}
	for pair in pairs:
		if pair[::-1] not in seen: #the reverse TF2-TF1 pair has not been seen yet
			seen[pair] = ""
	unique = list(seen.keys())

	return(unique)

[docs]def orientation(rules, verbosity=1): """ Perform orientation analysis on the TF pairs in a directional / strand-specific table. The analysis counts different scenarios depending on the input. If the input matrix is symmetric, the analysis contains two scenarios:: 1. Same: ⟝(TF1+)⟶ ⟝(TF2+)⟶ = ⟝(TF2+)⟶ ⟝(TF1+)⟶ = ⟵(TF1-)⟞ ⟵(TF2-)⟞ = ⟵(TF2-)⟞ ⟵(TF1-)⟞ 2. Opposite: ⟝(TF1+)⟶ ⟵(TF2-)⟞ = ⟝(TF2+)⟶ ⟵(TF1-)⟞ = ⟵(TF1-)⟞ ⟝(TF2+)⟶ = ⟵(TF2-)⟞ ⟝(TF1+)⟶ If the input is directional, the analysis contains four different scenarios:: 1. TF1-TF2: ⟝(TF1+)⟶ ⟝(TF2+)⟶ = ⟵(TF2-)⟞ ⟵(TF1-)⟞ 2. TF2-TF1: ⟝(TF2+)⟶ ⟝(TF1+)⟶ = ⟵(TF1-)⟞ ⟵(TF2-)⟞ 3. convergent: ⟝(TF1+)⟶ ⟵(TF2-)⟞ = ⟝(TF2+)⟶ ⟵(TF1-)⟞ 4. divergent: ⟵(TF1-)⟞ ⟝(TF2+)⟶ = ⟵(TF2-)⟞ ⟝(TF1+)⟶ Parameters ---------- rules : pd.DataFrame The .rules output of a CombObj analysis. verbosity : int A value between 0-3 where 0 (only errors), 1 (info), 2 (debug), 3 (spam debug). Default: 1. Returns -------- An OrientationAnalysis object (subclass of pd.DataFrame). The table contains frequencies of pairs related to each scenario. The dataframe has the following columns: - TF1: name of the first TF in pair - TF2: name of the second TF in pair - TF1_TF2_count: The total count of TF1-TF2 co-occurring pairs - If symmetric: - Same - Opposite - If directional: - TF1_TF2 - TF2_TF1 - convergent - divergent - std: Standard deviation of scenario frequencies - pvalue: A chi-square test to test the hypothesis that the scenarios are equally distributed """ logger = TFcombLogger(verbosity) #Check that input is dataframe check_type(rules, pd.DataFrame, "rules") rules = rules.copy() #ensures that rules-table is not changed #Split TF names from strands try: rules[["TF1_name", "TF1_strand"]] = rules["TF1"].str.split("(", expand=True) except Exception as e: raise InputError("Failed to split TF name from strand. Please ensure that .count_within() was run with '--stranded=True' and/or '--directional=True'.") rules["TF1_strand"] = rules["TF1_strand"].str.replace(")", "", regex=False) rules[["TF2_name", "TF2_strand"]] = rules["TF2"].str.split("(", expand=True) rules["TF2_strand"] = rules["TF2_strand"].str.replace(")", "", regex=False) #Establish if rules are directional rules_pivot = rules.pivot(index='TF1', columns='TF2', values='TF1_TF2_count') symmetric_bool = is_symmetric(rules_pivot) if symmetric_bool == True: scenarios = ["same", "opposite"] #2 scenarios logger.info("Rules are symmetric - scenarios counted are: {0}".format(scenarios)) #Remove duplicated pairs from analysis (TF1-TF2 = TF2-TF1) rules.set_index(["TF1", "TF2"], inplace=True) pairs = rules.index.tolist() unique = _get_unique_pairs(pairs) rules = rules.loc[unique,:] else: scenarios = ["TF1-TF2", "TF2-TF1", "convergent", "divergent"] #4 scenarios logger.info("Rules are directional - scenarios counted are: {0}".format(scenarios)) #Setup count dictionary keys = [tuple(x) for x in rules[["TF1_name", "TF1_strand", "TF2_name", "TF2_strand"]].values] counts = rules["TF1_TF2_count"].tolist() count_dict = dict(zip(keys, counts)) #Get all possible TF1-TF2 pairs pairs = list(zip(rules["TF1_name"], rules["TF2_name"])) pairs = sorted(list(set(pairs))) pairs = _get_unique_pairs(pairs) #Get counts per scenario counts = {} for (TF1, TF2) in pairs: counts[(TF1, TF2)] = {} #initialize counts per pair for scenario in scenarios: keys = _get_scenario_keys(TF1, TF2, scenario) keys = set(keys) #remove duplicate keys in case of TF1==TF2 count = np.sum([count_dict.get(key, 0) for key in keys]) #sum of counts counts[(TF1, TF2)][scenario] = count #Create dataframe frame = pd.DataFrame().from_dict(counts, orient="index") frame.index.names = ["TF1", "TF2"] frame.reset_index(inplace=True) frame = frame[frame[scenarios].sum(axis=1) > 0] #remove any scenarios with sum of 0 #Calculate chisquare p-value (are the scenarios normally distributed?) unique = frame[scenarios].drop_duplicates() #only calculate chi-square once per seen count combination - speeds up calculation mat = unique.to_numpy() rows, cols = mat.shape pvalues = [0]*rows for row in range(rows): n = mat[row,:] #counts per scenario s, p = chisquare(n) pvalues[row] = p unique["pvalue"] = pvalues #Merge unique to frame frame = frame.merge(unique, left_on=scenarios, right_on=scenarios, how="left") frame.index = frame["TF1"] + "-" + frame["TF2"] #Normalize counts to sum of 1 frame["TF1_TF2_count"] = frame[scenarios].sum(axis=1) for scenario in scenarios: frame[scenario] = frame[scenario] / frame["TF1_TF2_count"] frame[scenario] = frame[scenario].replace(np.inf, 0) #Calculate standard deviation frame["std"] = frame[scenarios].std(axis=1) #Sort by pvalue/count and reorder colums in frame frame.sort_values(["pvalue", "TF1_TF2_count"], ascending=[True, False], inplace=True) frame = frame[["TF1", "TF2", "TF1_TF2_count"] + scenarios + ["std", "pvalue"]] frame = OrientationAnalysis(frame) return(frame)
[docs]class OrientationAnalysis(pd.DataFrame): """ Analysis of the orientation of TF co-ocurring pairs """ @property def _constructor(self): return OrientationAnalysis
[docs] def plot_heatmap(self, yticklabels=False, figsize=(6,6), save=None, **kwargs): """ Plot a heatmap of orientation scenarios for the output of the orientation analysis. Parameters ----------- yticklabels : bool, optional Show yticklabels (TF-pairs) in plot. Default: False. figsize : tuple The size of the output heatmap. Default: (6,6) save : str, optional Save the plot to the file given in 'save'. Default: None. kwargs : arguments Any additional arguments are passed to sns.clustermap. Returns -------- seaborn.matrix.ClusterGrid """ scenarios = [col for col in self.columns if col not in ["TF1", "TF2", "TF1_TF2_count", "std", "pvalue"]] g = sns.clustermap(self[scenarios], col_cluster=False, yticklabels=yticklabels, cbar_kws={'label': "Fraction"}, figsize=figsize, **kwargs) n_pairs = self.shape[0] g.ax_heatmap.set_ylabel("TF-TF pairs (n={0})".format(n_pairs)) if save is not None: plt.savefig(save, dpi=600) return(g)