Source code for tfcomb.objects

#objects.py: Contains CombObj, DiffCombObj and DistObj classes
#
#@author: Mette Bentsen and Vanessa Heger
#@contact: mette.bentsen (at) mpi-bn.mpg.de
#@license: MIT


import os 
import pandas as pd
import itertools
import multiprocessing as mp
import numpy as np
import copy
import glob
import fnmatch
import dill
import collections
import math
import re

#Statistics
import qnorm #quantile normalization
from scipy.stats import linregress
from kneed import KneeLocator
import statsmodels.api as sm
from sklearn.mixture import GaussianMixture
from scipy import stats

#Modules for plotting
import matplotlib.pyplot as plt
import seaborn as sns
from tfcomb import utils

#Utilities from TOBIAS
import tobias
from tobias.utils.motifs import MotifList
from tobias.utils.regions import OneRegion, RegionList
#from tobias.utils.signals import fast_rolling_math

#TF-comb modules
import tfcomb
import tfcomb.plotting
import tfcomb.network
import tfcomb.analysis
from tfcomb.counting import count_co_occurrence
from tfcomb.logging import TFcombLogger, InputError
from tfcomb.utils import OneTFBS, Progress, check_type, check_value, check_string, unique_region_names, check_columns, check_writeability
from tfcomb.utils import *


#pd.options.mode.chained_assignment = 'raise' # for debugging
pd.options.mode.chained_assignment = None

[docs]class CombObj(): """ The main class for collecting and working with co-occurring TFs. Examples ---------- >>> C = tfcomb.objects.CombObj() # Verbosity of the output log can be set using the 'verbosity' parameter: \n >>> C = tfcomb.objects.CombObj(verbosity=2) """ #-------------------------------------------------------------------------------# #------------------------------- Getting started -------------------------------# #-------------------------------------------------------------------------------# def __init__(self, verbosity=1): #set verbosity #Function and run parameters self.verbosity = verbosity #0: error, 1:info, 2:debug, 3:spam-debug self.logger = TFcombLogger(self.verbosity) #Variables for storing input data self.prefix = None #is used when objects are added to a DiffCombObj self.TFBS = None #None or filled with list of TFBS self.TF_names = [] #list of TF names self.count_names = [] #list of counted TF names (might be different than TF_names due to strand-specific counting) #Variables for counts self.TF_counts = None #numpy array of size n_TFs self.pair_counts = None #numpy matrix of size n_TFs x n_TFs self.bg_counts_mean = None #numpy matrix of size n_TFs x n_TFs self.bg_counts_std = None #numpy matrix of size n_TFs x n_TFs #Market basket and network analysis self.rules = None #filled in by .market_basket() self.network = None #filled by .build_network() #Default parameters self.min_dist = 0 self.max_dist = 100 self.directional = False self.min_overlap = 0 self.max_overlap = 0 self.anchor = "inner" self.anchor_to_int = {"inner": 0, "outer": 1, "center": 2} #Variable for storing DistObj for distance analysis self.distObj = None def __str__(self): """ Returns a string representation of the CombObj depending on what variables are already stored """ s = "<CombObj" if self.TFBS is not None: s += ": {0} TFBS ({1} unique names)".format(len(self.TFBS), len(self.TF_names)) if self.rules is not None: s += " | Market basket analysis: {0} rules".format(self.rules.shape[0]) s += ">" return(s) def __repr__(self): return(self.__str__()) def __getstate__(self): """ Get state for pickling""" return {k:v for (k, v) in self.__dict__.items() if k != "_sites"} #do not pickle the internal _sites array def __add__(self, obj): """ Internal method to add two CombObj together using: `CombObj1 + CombObj2 = new_CombObj`. This merges the .TFBS of both objects under the hood. Parameters: ---------- obj : CombObj A CombObj to add. Returns: ---------- A merged CombObj. """ combined = CombObj(self.verbosity) #initialize empty combobj #Merge TFBS combined.TFBS = RegionList(self.TFBS + obj.TFBS) combined.TFBS.loc_sort() #sort TFBS by coordinates #Set .TF_names of the new list counts = {r.name: "" for r in combined.TFBS} combined.TF_names = sorted(list(set(counts.keys()))) #ensures that the same TF order is used across cores/subsets return(combined)
[docs] def copy(self): """ Returns a deep copy of the CombObj """ copied = copy.deepcopy(self) copied.logger = TFcombLogger(self.verbosity) #receives its own logger return(copied)
[docs] def set_verbosity(self, level): """ Set the verbosity level for logging after creating the CombObj. Parameters ---------- level : int A value between 0-3 where 0 (only errors), 1 (info), 2 (debug), 3 (spam debug). Default: 1. """ self.verbosity = level self.logger = TFcombLogger(self.verbosity) #restart logger with new verbosity
[docs] def set_prefix(self, prefix): """ Sets the .prefix variable of the object. Useful when comparing two objects in a DiffCombObj. Parameters ----------- prefix : str A string to add as .prefix for this object, e.g. 'control', 'treatment' or 'analysis1'. """ check_type(prefix, str, "prefix") self.prefix = prefix
#-------------------------------------------------------------------------------# #----------------------------- Checks for the object----------------------------# #-------------------------------------------------------------------------------# def _check_TFBS(self): """ Internal check whether the .TFBS was already filled. Raises InputError when .TFBS is not available.""" #Check that TFBS exist and that it is RegionList if self.TFBS is None or (not isinstance(self.TFBS, list) and not isinstance(self.TFBS, RegionList)): raise InputError("No TFBS available in '.TFBS'. The TFBS are set either using .TFBS_from_motifs, .TFBS_from_bed or TFBS_from_TOBIAS.") def _check_counts(self): """ Internal check whether .count_within was already run. Raises InputError when counts are not available or if counts have the wrong dimensions.""" #Check if counts were set attributes = ["TF_counts", "pair_counts"] for att in attributes: val = getattr(self, att) if val is None: raise InputError(f"Internal counts for '{att}' were not set. Please run .count_within() to obtain TF-TF co-occurrence counts.") else: n_TFs = len(self.count_names) #can be different than self.TF_names if stranded == True tfcomb.utils.check_type(val, np.ndarray, val) #raises inputerror if val is not None, but also not array size = val.shape invalid = 0 if len(size) == 1: if size != (n_TFs,): invalid = 1 elif len(size) == 2: if size != (n_TFs,n_TFs): invalid = 1 #If the sizes of arrays do not fit number of TFs if invalid == 1: err = f"Internal counts for '{att}' had shape {size} which does not fit with length of TF names ({n_TFs})." err += "Please check that .count_within() was run on the same TFs as currently set within the object" raise InputError(err) def _check_rules(self): """ Internal check whether .rules were filled. Raises InputError when .rules are not available. """ if self.rules is None or not isinstance(self.rules, pd.DataFrame): raise InputError("No market basket rules found in .rules. The rules are found by running .market_basket().")
[docs] def check_pair(self, pair): """ Checks if a pair is valid and present. Parameters ---------- pair : tuple(str,str) TF names for which the test should be performed. e.g. ("NFYA","NFYB") Raises ---------- """ #check member size if len(pair) != 2: raise InputError(f'{pair} is not valid. It should contain exactly two TF names per pair. e.g. ("NFYA","NFYB")') # check tf names are string tf1,tf2 = pair tfcomb.utils.check_type(tf1, str, "TF1 from pair") tfcomb.utils.check_type(tf2, str, "TF2 from pair") # check rules are filled if type(self) == DistObj and self.rules is None: raise InputError(".rules not filled. Please run .fill_rules() first.") # check tf1 is present within object if tf1 not in self.TF_names and tf1 not in self.count_names: #if the tf is stranded (e.g. TFNAME(+)), it could be in count_names raise InputError(f"{tf1} (TF1) is not valid as it is not present in the current object.") # check tf1 is present within object if tf2 not in self.TF_names and tf2 not in self.count_names: raise InputError(f"{tf2} (TF2) is not valid as it is not present in the current object.") if type(self) == DistObj: if len(self.rules.loc[((self.rules["TF1"] == tf1) & (self.rules["TF2"] == tf2))]) == 0: raise InputError(f"No rules for pair {tf1} - {tf2} found.")
#-------------------------------------------------------------------------------# #----------------------------- Save / import object ----------------------------# #-------------------------------------------------------------------------------#
[docs] def to_pickle(self, path): """ Save the CombObj to a pickle file. Parameters ---------- path : str Path to the output pickle file e.g. 'my_combobj.pkl'. See also --------- from_pickle """ f_out = open(path, 'wb') dill.dump(self, f_out)
[docs] def from_pickle(self, path): """ Import a CombObj from a pickle file. Parameters ----------- path : str Path to an existing pickle file to read. Raises ------- InputError If read object is not an instance of CombObj. See also ---------- to_pickle """ filehandler = open(path, 'rb') try: obj = dill.load(filehandler) except AttributeError as e: if "new_block" in str(e): s = f"It looks like the CombObj was built with pandas 1.3.x, but the current pandas version is {pd.__version__}." s += " Please rebuild the CombObj with pandas 1.2.x or upgrade pandas to 1.3.x to load the object." raise InputError(s) else: #another error during reading raise e #Check if object is CombObj if not isinstance(obj, CombObj): raise InputError("Object from '{0}' is not a CombObj".format(path)) #Overwrite self with CombObj self = obj self.set_verbosity(self.verbosity) #restart logger return(self)
#-------------------------------------------------------------------------------# #-------------------------- Setting up the .TFBS list --------------------------# #-------------------------------------------------------------------------------#
[docs] def TFBS_from_motifs(self, regions, motifs, genome, motif_pvalue=1e-05, motif_naming="name", gc=0.5, resolve_overlapping="merge", extend_bp=0, threads=1, overwrite=False, _suffix=""): #suffix to add to output motif names """ Function to calculate TFBS from motifs and genome fasta within the given genomic regions. Parameters ----------- regions : str or tobias.utils.regions.RegionList Path to a .bed-file containing regions or a tobias-format RegionList object. motifs : str or tobias.utils.motifs.MotifList Path to a file containing JASPAR/MEME-style motifs or a tobias-format MotifList object. genome : str Path to the genome fasta-file to use for scan. motif_pvalue : float, optional The pvalue threshold for the motif search. Default: 1e-05. motif_naming : str, optional How to name TFs based on input motifs. Must be one of: 'name', 'id', 'name_id' or 'id_name'. Default: "name". gc : float between 0-1, optional Set GC-content for the motif background model. Default: 0.5. resolve_overlapping : str, optional Control how to treat overlapping occurrences of the same TF. Must be one of "merge", "highest_score" or "off". If "highest_score", the highest scoring overlapping site is kept. If "merge", the sites are merged, keeping the information of the first site. If "off", overlapping TFBS are kept. Default: "merge". extend_bp : int, optional Extend input regions with 'extend_bp' before scanning. Default: 0. threads : int, optional How many threads to use for multiprocessing. Default: 1. overwrite : boolean, optional Whether to overwrite existing sites within .TFBS. Default: False (sites are appended to .TFBS). Returns ----------- None .TFBS_from_motifs fills the objects' .TFBS variable """ #Check input validity allowed_motif_naming = ["name", "id", "name_id", "id_name"] check_type(regions, [str, tobias.utils.regions.RegionList], "regions") check_type(motifs, [str, tobias.utils.motifs.MotifList], "motifs") check_type(genome, str, "genome") check_value(motif_pvalue, vmin=0, vmax=1, name="motif_pvalue") check_string(motif_naming, allowed_motif_naming, "motif_naming") check_value(gc, vmin=0, vmax=1, name="gc") check_string(resolve_overlapping, ["off", "merge", "highest_score"], "resolve_overlapping") check_value(threads, vmin=1, name="threads") check_type(overwrite, bool, "overwrite") #If previous TFBS should be overwritten or TFBS should be initialized initialized = 0 if overwrite == True or self.TFBS is None: initialized = 1 self.TFBS = RegionList() self.TF_names = [] motifs = copy.deepcopy(motifs) #ensures that original motifs are not altered #Setup regions if isinstance(regions, str): regions_f = regions regions = RegionList().from_bed(regions) self.logger.debug("Read {0} regions from {1}".format(len(regions), regions_f)) #Extend input regions if extend_bp > 0: for region in regions: region.extend_reg(extend_bp) #Setup motifs if isinstance(motifs, str): motifs_f = motifs motifs = tfcomb.utils.prepare_motifs(motifs_f, motif_pvalue, motif_naming) self.logger.debug("Read {0} motifs from '{1}'".format(len(motifs), motifs_f)) else: #set gc for motifs #bg = _ = [motif.get_threshold(motif_pvalue) for motif in motifs] _ = [motif.set_prefix(motif_naming) for motif in motifs] #Set suffix to motif names for motif in motifs: motif.prefix = motif.prefix + _suffix #Check that regions are within the genome bounds genome_obj = tfcomb.utils.open_genome(genome) check_boundaries(regions, genome_obj) genome_obj.close() #Scan for TFBS (either single-thread or with multiprocessing) self.logger.info("Scanning for TFBS with {0} thread(s)...".format(threads)) if threads == 1: n_regions = len(regions) chunks = regions.chunks(100) genome_obj = tfcomb.utils.open_genome(genome) #open pysam fasta obj n_regions_processed = 0 TFBS = RegionList([]) #initialize empty list for region_chunk in chunks: region_TFBS = tfcomb.utils.calculate_TFBS(region_chunk, motifs, genome_obj, resolve_overlapping) TFBS += region_TFBS #Update progress n_regions_processed += len(region_chunk) self.logger.debug("{0:.1f}% ({1} / {2})".format(n_regions_processed/n_regions*100, n_regions_processed, n_regions)) genome_obj.close() else: chunks = regions.chunks(100) #creates chunks of regions for multiprocessing #Setup pool pool = mp.Pool(threads) jobs = [] for region_chunk in chunks: self.logger.spam("Starting job for region_chunk of length: {0}".format(len(region_chunk))) jobs.append(pool.apply_async(tfcomb.utils.calculate_TFBS, (region_chunk, motifs, genome, resolve_overlapping))) pool.close() log_progress(jobs, self.logger) #only exits when the jobs are complete results = [job.get() for job in jobs] pool.join() #Join all TFBS to one list TFBS = RegionList(sum(results, [])) self.logger.info("Processing scanned TFBS") #Join and process TFBS self.TFBS += TFBS self.TFBS.loc_sort() TF_names = unique_region_names(TFBS) self.TF_names = sorted(list(set(self.TF_names + TF_names))) #Resolve any leftover overlaps between jobs if resolve_overlapping != "off": TFBS = tfcomb.utils.resolve_overlaps(TFBS, how=resolve_overlapping) #Final info log self.logger.info("Identified {0} TFBS ({1} unique names) within given regions".format(len(TFBS), len(TF_names))) if initialized == 0: #if sites were added, log the updated self.logger.info("The attribute .TFBS now contains {0} TFBS ({1} unique names)".format(len(self.TFBS), len(self.TF_names)))
[docs] def TFBS_from_bed(self, bed_file, overwrite=False): """ Fills the .TFBS attribute using a precalculated set of binding sites e.g. from ChIP-seq. Parameters ------------- bed_file : str A path to a .bed-file with precalculated binding sites. The 4th column of the file should contain the name of the TF in question. overwrite : boolean Whether to overwrite existing sites within .TFBS. Default: False (sites are appended to .TFBS). Returns ----------- None The .TFBS variable is filled in place """ #Check input parameters check_type(bed_file, str, "bed_file") check_type(overwrite, bool, "overwrite") #If previous TFBS should be overwritten or TFBS should be initialized initialized = 0 if overwrite == True or self.TFBS is None: self.TFBS = RegionList() self.TF_names = [] initialized = 1 #Read sites from file self.logger.info("Reading sites from '{0}'...".format(bed_file)) read_TFBS = RegionList([OneTFBS(region) for region in RegionList().from_bed(bed_file)]) #Add TFBS to internal .TFBS list and process self.logger.info("Processing sites") self.TFBS += read_TFBS self.TFBS.loc_sort() read_TF_names = unique_region_names(read_TFBS) self.TF_names = sorted(list(set(self.TF_names + read_TF_names))) #Final info log self.logger.info("Read {0} sites ({1} unique names)".format(len(read_TFBS), len(read_TF_names))) if initialized == 0: #if sites were added, log the updated self.logger.info("The attribute .TFBS now contains {0} TFBS ({1} unique names)".format(len(self.TFBS), len(self.TF_names)))
[docs] def TFBS_from_TOBIAS(self, bindetect_path, condition, overwrite=False): """ Fills the .TFBS variable with pre-calculated bound binding sites from TOBIAS BINDetect. Parameters ----------- bindetect_path : str Path to the BINDetect-output folder containing <TF1>, <TF2>, <TF3> (...) folders. condition : str Name of condition to use for fetching bound sites. overwrite : boolean Whether to overwrite existing sites within .TFBS. Default: False (sites are appended to .TFBS). Returns ----------- None The .TFBS variable is filled in place Raises ------- InputError If no files are found in path or if condition is not one of the avaiable conditions. """ #Check input check_type(bindetect_path, str, "bindetect_path") check_type(condition, str, "condition") check_type(overwrite, bool, "overwrite") #If previous TFBS should be overwritten or TFBS should be initialized initialized = 0 if overwrite == True or self.TFBS is None: self.TFBS = RegionList() self.TF_names = [] initialized = 1 #Grep for files within given path pattern = os.path.join(bindetect_path, "*", "beds", "*_bound.bed") files = glob.glob(pattern) if len(files) == 0: raise InputError("No '_bound'-files were found in path. Please ensure that the given path is the output of TOBIAS BINDetect.") #Check if condition given is within available_conditions available_conditions = set([os.path.basename(f).split("_")[-2] for f in files]) if condition not in available_conditions: raise InputError("Condition must be one of: {0}".format(list(available_conditions))) #Read sites from files condition_pattern = os.path.join(bindetect_path, "*", "beds", "*" + condition + "_bound.bed") condition_files = fnmatch.filter(files, condition_pattern) TFBS = RegionList() for f in condition_files: TFBS += RegionList([OneTFBS(region) for region in RegionList().from_bed(f)]) #Add TFBS to internal .TFBS list and process self.TFBS += TFBS self.TFBS.loc_sort() read_TF_names = unique_region_names(TFBS) self.TF_names = sorted(list(set(self.TF_names + read_TF_names))) self.logger.info("Read {0} sites ({1} unique names) from condition '{2}'".format(len(TFBS), len(read_TF_names), condition)) if initialized == 0: self.logger.info("The attribute .TFBS now contains {0} TFBS ({1} unique names)".format(len(self.TFBS), len(self.TF_names)))
#-------------------------------------------------------------------------------------------------------------# #----------------------------------- Filtering and processing of TFBS ----------------------------------------# #-------------------------------------------------------------------------------------------------------------#
[docs] def cluster_TFBS(self, threshold=0.5, merge_overlapping=True): """ Cluster TFBS based on overlap of individual binding sites. This can be used to pre-process motif-derived TFBS into TF "families" of TFs with similar motifs. This changes the .name attribute of each site within .TFBS to represent the cluster (or the original TF name if no cluster was found). Parameters ------------ threshold : float from 0-1, optional The threshold to set when clustering binding sites. Default: 0.5. merge_overlapping : bool, optional Whether to merge overlapping sites following clustering. If True, overlapping sites from the same cluster will be merged to one site (spanning site1-start -> site2-end). If False, the original sites (but with updated names) will be kept in .TFBS. Default: True. Returns -------- None The .TFBS names are updated in place. """ #Check given input self._check_TFBS() tfcomb.utils.check_value(threshold, vmin=0, vmax=1, name="threshold") tfcomb.utils.check_type(merge_overlapping, bool, "merge_overlapping") #Calculate overlap of TFBS overlap_dict = self.TFBS.count_overlaps() #Setup cluster object clust = tobias.utils.regions.RegionCluster(overlap_dict) clust.cluster(threshold=threshold) #Create name -> new name dict conversion = {} for cluster_idx in clust.clusters: members = clust.clusters[cluster_idx]["member_names"] if len(members) > 1: new_name = "/".join(sorted(members)) for member in members: conversion[member] = new_name else: conversion[members[0]] = members[0] #Convert TFBS names for site in self.TFBS: site.name = conversion[site.name] #Handle overlapping sites within the same clusters if merge_overlapping: n_sites_orig = len(self.TFBS) self.TFBS = tfcomb.utils.merge_self_overlaps(self.TFBS) self.logger.info("merge_overlapping == True: Merged {0} .TFBS to {1} non-overlapping sites".format(n_sites_orig, len(self.TFBS))) #Update names of TFBS n_names_orig = len(self.TF_names) #number of unique names before clustering self.TF_names = unique_region_names(self.TFBS) #unique names after clustering n_names_clustered = len(self.TF_names) self.logger.info("TFBS were clustered from {0} to {1} unique names. The new TF names can be seen in <CombObj>.TFBS and <CombObj>.TF_names.",format(n_names_orig, n_names_clustered))
[docs] def subset_TFBS(self, names=None, regions=None): """ Subset .TFBS in object to specific regions or TF names. Can be used to select only a subset of TFBS (e.g. only in promoters) to run analysis on. Note: Either 'names' or 'regions' must be given - not both. Parameters ----------- names : list of strings, optional A list of names to keep. Default: None. regions : str or RegionList, optional Path to a .bed-file containing regions or a tobias-format RegionList object. Default: None. Returns ------- None The .TFBS attribute is updated in place. """ #Check given input self._check_TFBS() if (names is None and regions is None) or (names is not None and regions is not None): raise InputError("You must give either 'names' or 'regions' to .subset_TFBS.") #Subset TFBS based on input if regions is not None: tfcomb.utils.check_type(regions, [str, tobias.utils.regions.RegionList], "regions") #If regions are string, read to internal format if isinstance(regions, str): regions = RegionList().from_bed(regions) #Get indices of overlapping sites n_TFBS = len(self.TFBS) self.logger.info("Overlapping {0} TFBS with {1} regions".format(n_TFBS, len(regions))) TFBS_overlap_labeled = tfcomb.utils.add_region_overlap(self.TFBS, regions, att="overlap") #adds overlap boolean to TFBS idx = [i for i, site in enumerate(TFBS_overlap_labeled) if site.overlap == True] self.TFBS = RegionList([self.TFBS[i] for i in idx]) elif names is not None: tfcomb.utils.check_type(names, [list, set, tuple], "names") #Check that strings overlap with TFBS names = set(names) #input names to overlap TF_names = set(self.TF_names) #names from object not_in_TFBS = names - TF_names if len(not_in_TFBS) > 0: self.logger.warning("{0} names from 'names' were not found in <CombObj> names and could therefore not be selected. These names are: {1}".format(len(not_in_TFBS), not_in_TFBS)) in_TFBS = TF_names.intersection(names) if len(in_TFBS) == 0: raise InputError("No overlap found between 'names' and names from <CombObj>.TFBS. Please select names of TFs within the data.") self.TFBS = RegionList([site for site in self.TFBS if site.name in in_TFBS]) self.TF_names = unique_region_names(self.TFBS) #unique names after clustering self.logger.info("Subset finished! The attribute .TFBS now contains {0} sites.".format(len(self.TFBS)))
[docs] def TFBS_to_bed(self, path): """ Writes out the .TFBS regions to a .bed-file. This is a wrapper for the tobias.utils.regions.RegionList().write_bed() utility. Parameters ---------- path : str File path to write .bed-file to. """ #Check input self._check_TFBS() check_writeability(path) #Call the .write_bed utility from tobias f = open(path, "w") s = "\n".join([str(site) for site in self.TFBS]) + "\n" f.write(s) f.close()
def _prepare_TFBS(self, force=False): """ Prepare the TFBS for internal counting within count_within. Sets the internal ._sites attribute. Checks the existence and correct length of _sites, and only creates it if not already saved. Set 'force' to True to force recalculation. """ #Find out if _sites should be set create = 1 if hasattr(self, "_sites"): if len(self.TFBS) == len(self._sites): create = 0 else: self.logger.debug("Length of .TFBS ({0}) is different than existing ._sites ({1}) attribute. Recreating ._sites.".format(len(self.TFBS), len(self._sites))) if create == 1 or force == True or not hasattr(self, "name_to_idx"): self.logger.info("Setting up binding sites for counting") chromosomes = {site.chrom:"" for site in self.TFBS}.keys() chrom_to_idx = {chrom: idx for idx, chrom in enumerate(chromosomes)} self.name_to_idx = {name: idx for idx, name in enumerate(self.TF_names)} self._sites = np.array([(chrom_to_idx[site.chrom], site.start, site.end, self.name_to_idx[site.name]) for site in self.TFBS]) #numpy integer array @staticmethod def _get_sort_idx(sites, anchor="center"): """ Get indices for sorting sites (from _prepare_TFBS) depending on anchor. Parameters ----------- sites : np.array The ._sites array prepared by _prepare_TFBS. anchor : str The anchor for calculating distances. One of "inner", "outer" or "center". Default: "center". """ if anchor == "center": sort_idx = [i for i, _ in sorted(enumerate(sites), key=lambda tup: (tup[1][0], int((tup[1][1] + tup[1][2]) / 2)))] elif anchor == "inner" or anchor == "outer": sort_idx = [i for i, _ in sorted(enumerate(sites), key=lambda tup: (tup[1][0], tup[1][1], tup[1][2]))] return sort_idx #-------------------------------------------------------------------------------------------------------------# #----------------------------------------- Counting co-occurrences -------------------------------------------# #-------------------------------------------------------------------------------------------------------------#
[docs] def count_within(self, min_dist=0, max_dist=100, min_overlap=0, max_overlap=0, stranded=False, directional=False, binarize=False, anchor="inner", n_background=50, threads=1): """ Count co-occurrences between TFBS. This function requires .TFBS to be filled by either `TFBS_from_motifs`, `TFBS_from_bed` or `TFBS_from_tobias`. This function can be followed by .market_basket to calculate association rules. Parameters ----------- min_dist : int Minimum distance between two TFBS to be counted as co-occurring. Distances are calculated depending on the 'anchor' given. Default: 0. max_dist : int Maximum distance between two TFBS to be counted as co-occurring. Distances are calculated depending on the 'anchor' given. Default: 100. min_overlap : float between 0-1, optional Minimum overlap fraction needed between sites, e.g. 0 = no overlap needed, 1 = full overlap needed. Default: 0. max_overlap : float between 0-1, optional Controls how much overlap is allowed for individual sites. A value of 0 indicates that overlapping TFBS will not be saved as co-occurring. Float values between 0-1 indicate the fraction of overlap allowed (the overlap is always calculated as a fraction of the smallest TFBS). A value of 1 allows all overlaps. Default: 0 (no overlap allowed). stranded : bool Whether to take strand of TFBSs into account. Default: False. directional : bool Decide if direction of found pairs should be taken into account, e.g. whether "<---TF1---> <---TF2--->" is only counted as TF1-TF2 (directional=True) or also as TF2-TF1 (directional=False). Default: False. binarize : bool, optional Whether to count a TF1-TF2 more than once per window (e.g. in the case of "<TF1> <TF2> <TF2> (...)"). Default: False. anchor : str, optional The anchor to use for calculating distance. Must be one of ["inner", "outer", "center"] n_background : int, optional Number of random co-occurrence backgrounds to obtain. This number effects the runtime of .count_within, but 'threads' can be used to speed up background calculation. Default: 50. threads : int, optional Number of threads to use. Default: 1. Returns ---------- None Fills the object variables .TF_counts and .pair_counts. Raises -------- ValueError If .TFBS has not been filled. """ anchor_str_to_int = {"inner": 0, "outer": 1, "center": 2} #Check input parameters self._check_TFBS() tfcomb.utils.check_value(min_dist, vmin=0, integer=True, name="min_dist") tfcomb.utils.check_value(max_dist, vmin=min_dist, integer=True, name="max_dist") tfcomb.utils.check_value(min_overlap, vmin=0, vmax=1, name="min_overlap") tfcomb.utils.check_value(max_overlap, vmin=0, vmax=1, name="max_overlap") tfcomb.utils.check_type(stranded, bool, "stranded") tfcomb.utils.check_type(directional, bool, "directional") tfcomb.utils.check_type(binarize, bool, "binarize") tfcomb.utils.check_string(anchor, list(anchor_str_to_int.keys()), "anchor") #Update object variables self.rules = None #Remove .rules if market_basket() was previously run self.n_TFBS = len(self.TFBS) #number of TFBS counted self.min_dist = min_dist self.max_dist = max_dist self.min_overlap = min_overlap self.max_overlap = max_overlap self.stranded = stranded self.directional = directional self.binarize = binarize self.anchor = anchor #Prepare TFBS in the correct format self._prepare_TFBS() sites = self._sites #sites still points to ._sites #Should strand be taken into account? if stranded == True: sites = self._sites.copy() #don't change self._sites name_to_idx = {} TF_names = [] #names in order of idx current_idx = 0 #initialize index at 0 for i, site in enumerate(self.TFBS): name = "{0}({1})".format(site.name, site.strand) if name not in name_to_idx: TF_names.append(name) name_to_idx[name] = current_idx current_idx += 1 #increment for next name sites[i][-1] = name_to_idx[name] #set new idx based on stranded name else: TF_names = self.TF_names self.logger.spam("TF_names: {0}".format(TF_names)) #Sort sites by mid if anchor is center: if anchor == "center": sort_idx = self._get_sort_idx(sites, anchor="center") sites = sites[sort_idx, :] #---------- Count co-occurrences within TFBS ---------# self.logger.info("Counting co-occurrences within sites") n_names = len(TF_names) anchor_int = anchor_str_to_int[anchor] TF_counts, pair_counts = count_co_occurrence(sites, min_dist=min_dist, max_dist=max_dist, min_overlap=min_overlap, max_overlap=max_overlap, binarize=binarize, anchor=anchor_int, n_names=n_names) pair_counts = tfcomb.utils.make_symmetric(pair_counts) if directional == False else pair_counts #Deal with directionality self.count_names = TF_names #this can be different from TF_names if stranded == True self.TF_counts = TF_counts self.pair_counts = pair_counts #---------- Count co-occurrences within shuffled background ---------# if n_background > 0: self.logger.info("Counting co-occurrence within background") parameters = ["min_dist", "max_dist", "min_overlap", "max_overlap", "binarize", "directional"] kwargs = {param: getattr(self, param) for param in parameters} kwargs["anchor"] = anchor_int kwargs["n_names"] = n_names #this is not necessarily the length of self.TF_names! #setup multiprocessing if threads == 1: l = [] #list of background pair_counts self.logger.info("Running with multiprocessing threads == 1. To change this, give 'threads' in the parameter of the function.") p = Progress(n_background, 10, self.logger) #Setup progress object for i in range(n_background): l.append(tfcomb.utils.calculate_background(sites, i, **kwargs)) p.write_progress(i) else: #Setup pool pool = mp.Pool(threads) #Add job per background iteration jobs = [] for i in range(n_background): self.logger.spam("Adding job for i = {0}".format(i)) args = (sites, i) #sites, seed job = pool.apply_async(tfcomb.utils.calculate_background, args, kwargs) jobs.append(job) pool.close() log_progress(jobs, self.logger) #only exits when the jobs are complete self.logger.debug("Fetching jobs from pool") l = [job.get() for job in jobs] pool.join() #Calculate z-score per pair stacked = np.stack(l) #3-dimensional array of stacked pair_counts from background stds = np.std(stacked, axis=0) stds[stds == 0] = np.nan #possible divide by zero in zscore calculation means = np.mean(stacked, axis=0) z = (pair_counts - means)/stds #pair_counts are the true osberved co-occurrences #Handle NaNs introduced in zscore calculation z[np.isnan(z) & (pair_counts == means)] = 0 z[np.isnan(z) & (pair_counts > means)] = np.inf z[np.isnan(z) & (pair_counts < means)] = -np.inf self.zscore = z else: self.logger.info("n_background is set to 0; z-score calculation will be skipped") self.logger.info("Done finding co-occurrences! Run .market_basket() to estimate significant pairs")
[docs] def get_pair_locations(self, pair, TF1_strand = None, TF2_strand = None, **kwargs): """ Get genomic locations of a particular TF pair. Requires .TFBS to be filled. If 'count_within' was run, the parameters used within the latest 'count_within' run are used. Else, the default values of tfcomb.utils.get_pair_locations() are used. Both options can be overwritten by setting kwargs. Parameters ---------- pair : tuple Name of TF1, TF2 in pair. TF1_strand : str, optional Strand of TF1 in pair. Default: None (strand is not taken into account). TF2_strand : str, optional Strand of TF2 in pair. Default: None (strand is not taken into account). kwargs : arguments Any additional arguments are passed to tfcomb.utils.get_pair_locations. Returns ------- tfcomb.utils.TFBSPairList """ #Check input parameters self.check_pair(pair) self._check_TFBS() self._prepare_TFBS() #prepare TFBS sites if not already existing self.logger.debug("kwargs given in function: {0}".format(kwargs)) #If not set, fill in kwargs with internal arguments set by count_within() attributes = ["min_dist", "max_dist", "directional", "min_overlap", "max_overlap", "anchor"] for att in attributes: if hasattr(self, att): if not att in kwargs: kwargs[att] = getattr(self, att) self.logger.debug("kwargs for get_pair_locations: {0}".format(kwargs)) TF1, TF2 = pair TF1_int = self.name_to_idx[TF1] TF2_int = self.name_to_idx[TF2] anchor_string = kwargs["anchor"] self.logger.debug(f"TF1: {TF1}={TF1_int}, TF2: {TF2}={TF2_int}") # Sort sites based on the anchor position self.logger.debug("Sorting sites based on anchor '{0}'".format(anchor_string)) sites = self._sites if anchor_string == "center": sort_idx = self._get_sort_idx(sites, anchor=anchor_string) idx_to_original = {idx: original_idx for idx, original_idx in enumerate(sort_idx)} sites = sites[sort_idx, :] # Get locations via counting function self.logger.debug("Getting locations for {0}-{1} with anchor '{2}'".format(TF1, TF2, anchor_string)) kwargs["anchor"] = self.anchor_to_int[anchor_string] # convert anchor string to int kwargs["n_names"] = max(self.name_to_idx.values()) + 1 # need at least the number of names as the highest index (for intializing the matrix) self.logger.spam(f"kwargs for count_co_occurrence: {kwargs}") idx_mat = tfcomb.counting.count_co_occurrence(sites, task=3, rules=[(TF1_int, TF2_int)], **kwargs) n_locations = idx_mat.shape[0] #Fetch locations from TFBS list self.logger.debug("Fetching locations from .TFBS") locations = tfcomb.utils.TFBSPairList([None]*n_locations) if anchor_string == "center": for i in range(n_locations): site1_idx = idx_mat[i, 0] #location in sorted sites site1_idx = idx_to_original[site1_idx] #original idx in self.TFBS (before sorting) site2_idx = idx_mat[i, 1] site2_idx = idx_to_original[site2_idx] #Fetch locations in .TFBS site1 = self.TFBS[site1_idx] site2 = self.TFBS[site2_idx] locations[i] = TFBSPair(TFBS1=site1, TFBS2=site2, anchor=anchor_string) else: for i in range(n_locations): site1_idx = idx_mat[i, 0] #no need to convert idx back to original, since sites were not sorted site2_idx = idx_mat[i, 1] #Fetch locations in .TFBS site1 = self.TFBS[site1_idx] site2 = self.TFBS[site2_idx] locations[i] = TFBSPair(TFBS1=site1, TFBS2=site2, anchor=anchor_string) #Check strandedness if TF1_strand != None or TF2_strand != None: drop = [] #collect indices to drop from locations for i, pair in enumerate(locations): if TF1_strand != None and ((pair[0].name == TF1 and pair[0].strand != TF1_strand) or (pair[1].name == TF1 and pair[1].strand != TF1_strand)): drop.append(i) if TF2_strand != None and ((pair[0].name == TF2 and pair[0].strand != TF2_strand) or (pair[1].name == TF2 and pair[1].strand != TF2_strand)): drop.append(i) drop = set(drop) #remove any duplicate indices locations = tfcomb.utils.TFBSPairList([pair for i, pair in enumerate(locations) if i not in drop]) return(locations)
#-----------------------------------------------------------------------------------------# #-------------------------------- Market basket analysis ---------------------------------# #-----------------------------------------------------------------------------------------#
[docs] def market_basket(self, measure="cosine", threads=1, keep_zero=False, n_baskets=1e6, _show_columns=["TF1_TF2_count", "TF1_count", "TF2_count"]): """ Runs market basket analysis on the TF1-TF2 counts. Requires prior run of .count_within(). Parameters ----------- measure : str or list of strings, optional The measure(s) to use for market basket analysis. Can be any of: ["cosine", "confidence", "lift", "jaccard"]. Default: 'cosine'. threads : int, optional Threads to use for multiprocessing. This is passed to .count_within() in case the <CombObj> does not contain any counts yet. Default: 1. keep_zero : bool, optional Whether to keep rules with 0 occurrences in .rules table. Default: False (remove 0-rules). n_baskets : int, optional The number of baskets used for calculating market basket measures. Default: 1e6. Raises ------- InputError If the measure given is not within available measures. """ #Check given input check_value(threads, vmin=1, name="threads") available_measures = ["cosine", "confidence", "lift", "jaccard"] if isinstance(measure, str): measure = [measure] for m in measure: tfcomb.utils.check_string(m, available_measures) #Check that TF counts are available; otherwise calculate counts try: self._check_counts() except InputError as e: print(e) self.logger.warning("No counts found in <CombObj>. Running <CombObj>.count_within() with standard parameters.") self.count_within(threads=threads) #Check show columns; these are the columns which will be shown in .rules output available = ["TF1_TF2_count", "TF1_count", "TF2_count", "n_baskets", "TF1_TF2_support", "TF1_support", "TF2_support"] for col in _show_columns: tfcomb.utils.check_string(col, available) ##### Calculate market basket analysis ##### #Convert pair counts to table and convert to long format pair_counts_table = pd.DataFrame(self.pair_counts, index=self.count_names, columns=self.count_names) #size n x n TFs pair_counts_table["TF1"] = pair_counts_table.index table = pd.melt(pair_counts_table, id_vars=["TF1"], var_name=["TF2"], value_name="TF1_TF2_count") #long format (TF1, TF2, value) #Add TF single counts to table vals = zip(self.count_names, self.TF_counts) single_counts = pd.DataFrame(vals, columns=["TF", "count"]) tf1_counts = single_counts.rename(columns={"TF": "TF1", "count":"TF1_count"}) tf2_counts = single_counts.rename(columns={"TF": "TF2", "count":"TF2_count"}) table = table.merge(tf1_counts).merge(tf2_counts) #Calculate support table["n_baskets"] = n_baskets table["TF1_TF2_support"] = table["TF1_TF2_count"] / table["n_baskets"] table["TF1_support"] = table["TF1_count"] / table["n_baskets"] table["TF2_support"] = table["TF2_count"] / table["n_baskets"] #Calculate association metric: if isinstance(measure, str): measure = [measure] for metric in measure: if metric == "cosine": table["cosine"] = table["TF1_TF2_support"] / np.sqrt(table["TF1_support"] * table["TF2_support"]) elif metric == "confidence": table["confidence"] = table["TF1_TF2_support"] / table["TF1_support"] elif metric == "lift": table["lift"] = table["confidence"] / table["TF2_support"] elif metric == "jaccard": table["jaccard"] = table["TF1_TF2_support"] / (table["TF1_support"] + table["TF2_support"] - table["TF1_TF2_support"]) else: raise InputError("Measure '{0}' is invalid. The measure must be one of: {1}".format(metric, available_measures)) #Remove rows with TF1_TF2_count == 0 if keep_zero == False: table = table[table["TF1_TF2_count"] != 0] #Sort for highest measure pairs table.sort_values([measure[0], "TF1"], ascending=[False, True], inplace=True) #if two pairs have equal measure, sort by TF1 name table.reset_index(inplace=True, drop=True) #Add z-score per pair if hasattr(self, "zscore"): zscore_table = pd.DataFrame(self.zscore, index=self.count_names, columns=self.count_names) #size n x n TFs zscore_table["TF1"] = zscore_table.index ztable_table_long = pd.melt(zscore_table, id_vars=["TF1"], var_name=["TF2"], value_name="zscore") #long format (TF1, TF2, value) table = table.merge(ztable_table_long) measure += ["zscore"] #show zscore in output #Create internal node table for future network analysis TF1_table = table[["TF1", "TF1_count"]].set_index("TF1", drop=False).drop_duplicates() TF2_table = table[["TF2", "TF2_count"]].set_index("TF2", drop=False).drop_duplicates() self.TF_table = TF1_table.merge(TF2_table, left_index=True, right_index=True) #Set name of index for table table.index = table["TF1"] + "-" + table["TF2"] #Subset to _show_columns table = table[["TF1", "TF2"] + _show_columns + measure] #Market basket is done; save to .rules self.rules = table self.logger.info("Market basket analysis is done! Results are found in <CombObj>.rules")
#-----------------------------------------------------------------------------------------# #------------------------------ Selecting significant rules ------------------------------# #-----------------------------------------------------------------------------------------#
[docs] def reduce_TFBS(self): """ Reduce TFBS to the TFs present in .rules. Returns -------- None - changes .TFBS in place""" if hasattr(self, "TFBS"): #This is false for DiffCombObj, which is also using the function #Get names from rules self.logger.debug("Getting names") selected_names = list(set(self.rules["TF1"].tolist() + self.rules["TF2"].tolist())) #Do the TFs contain strand information? Collect this for subsetting name_strands = {} clean_names = [] for name in selected_names: m = re.match("(.+)\(([+-.])\)$", name) #check if name is "NAME(+/./-)" if m is not None: name_clean = m.group(1) #name without strand clean_names.append(name_clean) name_strands[name_clean] = name_strands.get(name_clean, set()) | set([m.group(2)]) #add strand to set for this name else: clean_names.append(name) #name is already clean of strand info #Subset TFBS using name and strand information self.logger.debug("Setting TFBS in new object") clean_names = set(clean_names) #check against set is faster than list self.TFBS = RegionList([site for site in self.TFBS if (site.name in clean_names) and (site.strand in name_strands.get(site.name, {site.strand}))]) #comparison for strand is a set #if site is not in name_strands, site.strand is compared to itself #Update TF names as well (names without strands) self.TF_names = [name for name in self.TF_names if name in clean_names]
[docs] def simplify_rules(self): """ Simplify rules so that TF1-TF2 and TF2-TF1 pairs only occur once within .rules. This is useful for association metrics such as 'cosine', where the association of TF1->TF2 equals TF2->TF1. This function keeps the first unique pair occurring within the rules table. """ self._check_rules() #Go through pairs and check which to keep tuples = self.rules[["TF1", "TF2"]].to_records(index=False).tolist() seen = {} idx_to_keep = {} for i, tup in enumerate(tuples): reverse = (tup[1],tup[0]) if reverse in seen: #the reverse was already found previously pass #do not keep else: seen[tup] = True idx_to_keep[i] = True #Create simplified table using idx_to_keep sub_rules = self.rules.iloc[list(idx_to_keep.keys())] self.rules = sub_rules #overwrite .rules with simplified rules
[docs] def select_TF_rules(self, TF_list, TF1=True, TF2=True, reduce_TFBS=True, inplace=False, how="inner"): """ Select rules based on a list of TF names. The parameters TF1/TF2 can be used to select for which TF to create the selection on (by default: both TF1 and TF2). Parameters ------------ TF_list : list List of TF names fitting to TF1/TF2 within .rules. TF1 : bool, optional Whether to subset the rules containing 'TF_list' TFs within "TF1". Default: True. TF2 : bool, optional Whether to subset the rules containing 'TF_list' TFs within "TF2". Default: True. reduce_TFBS : bool, optional Whether to reduce the .TFBS of the new object to the TFs remaining in `.rules` after selection. Setting this to 'False' will improve speed, but also increase memory consumption. Default: True. inplace : bool, optional Whether to make selection on current CombObj. If False, how: string, optional How to join TF1 and TF2 subset. Default: inner Raises -------- InputError If both TF1 and TF2 are False or if no rules were selected based on input. Returns -------- If inplace == False; tfcomb.objects.CombObj() An object containing a subset of <Combobj>.rules. if inplace == True; Returns None """ #Check input self._check_rules() check_type(TF_list, list, name="TF_list") check_type(TF1, bool, "TF1") check_type(TF2, bool, "TF2") check_string(how, ["left", "right", "outer", "inner", "cross"]) #Create selected subset selected = self.rules if TF1 == False and TF2 == False: raise InputError("Either TF1 or TF2 must be True in order to create a selection.") #Create selections for TF1/TF2 selections = [] for (TF_bool, TF_col) in zip([TF1, TF2], ["TF1", "TF2"]): if TF_bool == True: #Write out any TFs from TF_list not in TF_col names not_found = set(TF_list) - set(self.rules[TF_col]) if len(not_found) > 0: self.logger.warning("{0}/{1} names in 'TF_list' were not found within '{2}' names: {3}".format(len(not_found), len(TF_list), TF_col, list(not_found))) #Create selection selected_bool = self.rules[TF_col].isin(TF_list) selected = self.rules[selected_bool] selections.append(selected) #Join selections from TF1 and TF2 if len(selections) > 1: selected = selections[0].merge(selections[1], how=how) else: selected = selections[0] #Set index of selected selected.index = selected["TF1"] + "-" + selected["TF2"] #Stop if no rules were able to be selected if len(selected) == 0: raise InputError("No rules could be selected - please adjust TF_list and/or TF1/TF2 parameters.") self.logger.info("Selected {0} rules".format(len(selected))) #Create new object with selected rules (or filter current object) if inplace == True: self.rules = selected self.network = None #Reduce the TFBS and TF_names if reduce_TFBS == True: self.reduce_TFBS() return(None) else: #create copy of object self.logger.info("Creating subset of object") new_obj = self.copy() new_obj.rules = selected new_obj.network = None #Reduce the TFBS and TF_names if reduce_TFBS == True: new_obj.reduce_TFBS() return(new_obj)
[docs] def select_custom_rules(self, custom_list, reduce_TFBS=True): """ Select rules based on a custom list of TF pairs. Parameters ------------ custom_list : list of strings List of TF pairs (e.g. a string "TF1-TF2") fitting to TF1/TF2 combination within .rules. reduce_TFBS : bool, optional Whether to reduce the .TFBS of the new object to the TFs remaining in `.rules` after selection. Setting this to 'False' will improve speed, but also increase memory consumption. Default: True. Returns -------- tfcomb.objects.CombObj() An object containing a subset of <Combobj>.rules """ #Check input self._check_rules() check_type(custom_list, list, name="TF_list") #check_type(custom_list[0], [tuple, list], name="Pairs") #Create selected subset selected = self.rules.copy() selected = selected.loc[custom_list] #Create new object with selected rules new_obj = self.copy() new_obj.rules = selected new_obj.network = None if reduce_TFBS == True: new_obj.reduce_TFBS() return(new_obj)
[docs] def select_top_rules(self, n, reduce_TFBS=True): """ Select the top 'n' rules within .rules. By default, the .rules are sorted for the measure value, so n=100 will select the top 100 highest values for the measure (e.g. cosine). Parameters ----------- n : int The number of rules to select. reduce_TFBS : bool, optional Whether to reduce the .TFBS of the new object to the TFs remaining in `.rules` after selection. Setting this to 'False' will improve speed, but also increase memory consumption. Default: True. Returns -------- tfcomb.objects.CombObj() An object containing a subset of <Combobj>.rules """ #Check input types self._check_rules() tfcomb.utils.check_type(n, int, "n") #Select top n_rules from .rules selected = self.rules.copy() selected = selected[:n] #Create new object with selected rules new_obj = self.copy() new_obj.rules = selected new_obj.network = None if reduce_TFBS == True: new_obj.reduce_TFBS() return(new_obj)
[docs] def select_significant_rules(self, x="cosine", y="zscore", x_threshold=None, x_threshold_percent=0.05, y_threshold=None, y_threshold_percent=0.05, reduce_TFBS=True, plot=True, **kwargs): """ Make selection of rules based on distribution of x/y-measures Parameters ----------- x: str, optional The name of the column within .rules containing the measure to be selected on. Default: 'cosine'. y : str, optional The name of the column within .rules containing the pvalue to be selected on. Default: 'zscore' x_threshold : float, optional A minimum threshold for the x-axis measure to be selected. If None, the threshold will be estimated from the data. Default: None. x_threshold_percent : float between 0-1, optional If x_threshold is not given, x_threshold_percent controls the strictness of the automatic threshold selection. Default: 0.05. y_threshold : float, optional A minimum threshold for the y-axis measure to be selected. If None, the threshold will be estimated from the data. Default: None. y_threshold_percent : float between 0-1, optional If y_threshold is not given, y_threshold_percent controls the strictness of the automatic threshold selection. Default: 0.05. reduce_TFBS : bool, optional Whether to reduce the .TFBS of the new object to the TFs remaining in `.rules` after selection. Setting this to 'False' will improve speed, but also increase memory consumption. Default: True. plot : bool, optional Whether to show the 'measure vs. pvalue'-plot or not. Default: True. kwargs : arguments Additional arguments are forwarded to tfcomb.plotting.scatter Returns -------- tfcomb.objects.CombObj() An object containing a subset of <obj>.rules See also --------- tfcomb.plotting.scatter """ #Check given input self._check_rules() if x_threshold is not None: check_value(x_threshold) if y_threshold is not None: check_value(y_threshold) tfcomb.utils.check_value(y_threshold_percent, vmin=0, vmax=1, name="y_threshold_percent") tfcomb.utils.check_value(x_threshold_percent, vmin=0, vmax=1, name="x_threshold_percent") #Check if measure are in columns if x not in self.rules.columns: raise KeyError("Column given for x ('{0}') is not in .rules".format(x)) #Warn if y aka zscore contains inf if self.rules[y].isin([np.inf, -np.inf]).any(): self.logger.warning(f"{y} contains infinite value. This could be the result of 'n_background' from .count_within being set too low, which will lead to spurious results of .select_significant_rules. Please check and adjust parameters if necessary.") #If measure_threshold is None; try to calculate optimal threshold via knee-plot if x_threshold is None: self.logger.info("x_threshold is None; trying to calculate optimal threshold") x_threshold = tfcomb.utils.get_threshold(self.rules[x], percent=x_threshold_percent) if y_threshold is None: self.logger.info("y_threshold is None; trying to calculate optimal threshold") y_threshold = tfcomb.utils.get_threshold(self.rules[y], percent=y_threshold_percent) #Set threshold on table selected = self.rules.copy() selected = selected[(selected[x] >= x_threshold) & (selected[y] >= y_threshold)] if plot == True: tfcomb.plotting.scatter(self.rules, x=x, y=y, x_threshold=x_threshold, y_threshold=y_threshold, **kwargs) #Create a CombObj with the subset of TFBS and rules self.logger.info("Creating subset of TFBS and rules using thresholds") self.logger.debug("Copying old to new object") new_obj = self.copy() new_obj.rules = selected new_obj.network = None if reduce_TFBS == True: new_obj.reduce_TFBS() return(new_obj)
#-----------------------------------------------------------------------------------------# #------------------------------ Integration of external data -----------------------------# #-----------------------------------------------------------------------------------------#
[docs] def integrate_data(self, table, merge="pair", TF1_col="TF1", TF2_col="TF2", prefix=None): """ Function to add external data to object rules. Parameters ------------ table : str or pandas.DataFrame A table containing data to add to .rules. If table is a string, 'table' is assumed to be the path to a tab-separated table containing a header line and rows of data. merge : str Which information to merge - must be one of "pair", "TF1" or "TF2". The option "pair" is used to merge infromation about TF-TF pairs such as protein-protein-interactions. The 'TF1' and 'TF2' can be used to include TF-specific information such as expression levels. TF1_col : str, optional The column in table corresponding to "TF1" name. If merge == "TF2", 'TF1' is ignored. Default: "TF1". TF2_col : str, optional The column in table corresponding to "TF2" name. If merge == "TF1", 'TF2' is ignored. Default: "TF2". prefix : str, optional A prefix to add to the columns. Can be useful for adding the same information to both TF1 and TF2 (e.g. by using "TF1" and "TF2" prefixes), or adding same-name columns from different tables. Default: None (no prefix). """ self._check_rules() check_type(table, [str, pd.DataFrame], "table") check_string(merge, ["pair", "TF1", "TF2"], "merge") #Read table if string (path) was given if isinstance(table, str): table = pd.read_csv(table, sep="\t") self.logger.info("Read table of shape {0} with columns: {1}".format(table.shape, table.columns.tolist())) table = table.drop_duplicates() #Add prefix to columns if prefix is not None: check_type(prefix, str) table.columns = [prefix + str(col) if col not in [TF1_col, TF2_col] else col for col in table.columns] #Check if columns in table were already existing current_columns = [col for col in self.rules.columns if col not in ["TF1", "TF2"]] adding_columns = [col for col in table.columns if col not in [TF1_col, TF2_col]] duplicates = list(set(current_columns) & set(adding_columns)) if len(duplicates) > 1: self.logger.warning("Column(s) '{0}' from input table are already present in .rules, and could not be integrated.".format(duplicates)) self.logger.warning("Please set 'prefix' in order to make the column names unique.") table.drop(columns=duplicates, inplace=True) #Merge table to object if merge == "TF1": check_columns(table, [TF1_col]) self.rules = self.rules.merge(table, left_on="TF1", right_on=TF1_col, how="left") self.rules = self.rules.drop(columns=[TF1_col]) elif merge == "TF2": check_columns(table, [TF2_col]) self.rules = self.rules.merge(table, left_on="TF2", right_on=TF2_col, how="left") self.rules = self.rules.drop(columns=[TF2_col]) elif merge == "pair": check_columns(table, [TF1_col, TF2_col]) self.rules = self.rules.merge(table, left_on=["TF1", "TF2"], right_on=[TF1_col, TF2_col], how="left") #Set name of index for table self.rules.index = self.rules["TF1"] + "-" + self.rules["TF2"] #If data was integrated, .network must be recalculated self.network = None
#-----------------------------------------------------------------------------------------# #-------------------------------- Plotting functionality --------------------------------# #-----------------------------------------------------------------------------------------#
[docs] def plot_TFBS(self, **kwargs): """ This is a wrapper for the plotting function `tfcomb.plotting.genome_view` Parameters ------------ kwargs : arguments All arguments are passed to `tfcomb.plotting.genome_view`. Please see the documentation for input parameters. """ self._check_TFBS() #Requires TFBS #Plot TFBS via genome view tfcomb.plotting.genome_view(self.TFBS, **kwargs)
[docs] def plot_heatmap(self, n_rules=20, color_by="cosine", sort_by=None, **kwargs): """ Plot a heatmap of rules and their attribute values. This is a wrapper for the plotting function `tfcomb.plotting.heatmap`. Parameters ----------- n_rules : int, optional The number of rules to show. The first `n_rules` rules of .rules are taken. Default: 20. color_by : str, optional A column within .rules to color the heatmap by. Note: Can be different than sort_by. Default: "cosine". sort_by : str, optional A column within .rules to sort by before choosing n_rules. Default: None (rules are not sorted before selection). kwargs : arguments Any additional arguments are passed to tfcomb.plotting.heatmap. See also --------- tfcomb.plotting.heatmap """ #Check types tfcomb.utils.check_type(n_rules, [int]) #Check that columns are available in self.rules tfcomb.utils.check_columns(self.rules, [color_by, sort_by]) #Sort table by another column than currently associations = self.rules.copy() if sort_by is not None: associations = associations.sort_values(sort_by, ascending=False) #Choose n number of rules tf1_list = associations["TF1"][:n_rules] tf2_list = associations["TF2"][:n_rules] # Fill all combinations for the TFs selected from top rules (to fill white spaces) chosen_associations = associations[(associations["TF1"].isin(tf1_list) & associations["TF2"].isin(tf2_list))] #Plot h = tfcomb.plotting.heatmap(chosen_associations, color_by=color_by, **kwargs)
[docs] def plot_bubble(self, n_rules=20, yaxis="cosine", color_by="TF1_TF2_count", size_by=None, sort_by=None, **kwargs): """ Plot a bubble-style scatterplot of the object rules. This is a wrapper for the plotting function `tfcomb.plotting.bubble`. Parameters ----------- n_rules : int, optional The number of rules to show. The first `n_rules` rules of .rules are taken. Default: 20. yaxis : str, optional A column within .rules to depict on the y-axis of the plot. Default: "cosine". color_by : str, optional A column within .rules to color points in the plot by. Default: "TF1_TF2_count". size_by : str, optional A column within .rules to size points in the plot by. Default: None. sort_by : str, optional A column within .rules to sort by before choosing n_rules. Default: None (rules are not sorted before selection). unique : bool, optional Only show unique pairs in plot, e.g. only the first occurrence of TF1-TF2 / TF2-TF1. Default: True. kwargs : arguments Any additional arguments are passed to tfcomb.plotting.bubble. See also ----------- tfcomb.plotting.bubble """ self._check_rules() #Sort rules table = self.rules.copy() if sort_by is not None: table = table.sort_values(sort_by, ascending=False) #Select n top rules top_rules = table.head(n_rules) top_rules.index = top_rules["TF1"].values + " + " + top_rules["TF2"].values #Plot ax = tfcomb.plotting.bubble(top_rules, yaxis=yaxis, color_by=color_by, size_by=size_by, **kwargs)
[docs] def plot_scatter(self, x, y, hue=None, **kwargs): """ Plot a scatterplot of information from .rules. Parameters ------------ x : str The name of the column in .rules containing values to plot on x-axis. y : str The name of the column in .rules containing values to plot on y-axis. """ #Check that market basket was run self._check_rules() tfcomb.plotting.scatter(self.rules, x, y)
#-------------------------------------------------------------------------------------------# #----------------------------------- In-depth analysis -------------------------------------# #-------------------------------------------------------------------------------------------#
[docs] def create_distObj(self): """ Creates a distObject, useful for manual analysis. Fills self.distObj. """ self._check_rules() self.distObj = DistObj() self.distObj.fill_rules(self) self.distObj.logger.info("DistObject successfully created! It can be accessed via <CombObj>.distObj")
[docs] def analyze_distances(self, parent_directory=None, threads=4, correction=True, scale=True, **kwargs): """ Standard distance analysis workflow. Use create_distObj for own workflow steps and more options! """ self.create_distObj() self.distObj.set_verbosity(self.verbosity) self.distObj.count_distances(directional=self.directional) #Ensure that parent directories exist before trying to plot tfcomb.utils.check_type(parent_directory,[type(None),str]) if parent_directory is not None: tfcomb.utils.check_dir(parent_directory) subfolder_linres = os.path.join(parent_directory, "linres") tfcomb.utils.check_dir(subfolder_linres) subfolder_corrected = os.path.join(parent_directory, "corrected") tfcomb.utils.check_dir(subfolder_corrected) subfolder_peaks = os.path.join(parent_directory, "peaks","peaks.tsv") tfcomb.utils.check_dir(subfolder_peaks) else: subfolder_linres = parent_directory subfolder_corrected = parent_directory subfolder_peaks = parent_directory #Perform steps in standard workflow if scale: self.distObj.scale() self.distObj.smooth(window_size=3) if correction: self.distObj.correct_background(threads=threads) self.distObj.analyze_signal_all(threads=threads, save=subfolder_peaks, **kwargs) if parent_directory is not None: for tf1,tf2 in list(zip(self.distances.TF1, self.distances.TF2)): self.plot_analyzed_signal((tf1, tf2), only_peaking=True, save=os.path.join(parent_directory, "peaks", f"{tf1}_{tf2}.png"))
[docs] def analyze_orientation(self): """ Analyze preferred orientation of sites in .TFBS. This is a wrapper for tfcomb.analysis.orientation(). Returns ----------- pd.DataFrame See also ---------- tfcomb.analysis.orientation """ self._check_rules() #market basket must be run. table = tfcomb.analysis.orientation(self.rules) return(table)
#-------------------------------------------------------------------------------------------# #------------------------------------ Network analysis -------------------------------------# #-------------------------------------------------------------------------------------------#
[docs] def build_network(self, **kwargs): """ Builds a TF-TF co-occurrence network for the rules within object. This is a wrapper for the tfcomb.network.build_nx_network() function, which uses the python networkx package. Parameters ------------ kwargs : arguments Any additional arguments are passed to tfcomb.network.build_nx_network(). Returns ------- None - fills the .network attribute of the `CombObj` with a networkx.Graph object """ #Build network self.logger.debug("Building network using tfcomb.network.build_network") self.network = tfcomb.network.build_network(self.rules, node_table=self.TF_table, verbosity=self.verbosity, **kwargs) self.logger.info("Finished! The network is found within <CombObj>.network.")
[docs] def cluster_network(self, method="louvain", weight=None): """ Creates a clustering of nodes within network and add a new node attribute "cluster" to the network. Parameters ----------- method : str, one of ["louvain", "blockmodel"] The method Default: "louvain". weight : str, optional The name of the edge attribute to use as weight. Default: None (not weighted). """ #Fetch network from object if self.network is None: self.logger.info("The .network attribute is not available - running .build_network()") self.build_network() #Decide method of partitioning if method == "louvain": tfcomb.network.cluster_louvain(self.network, weight=weight, logger=self.logger) #this adds "partition" to the network self.logger.info("Added 'cluster' attribute to the network attributes") node_table = tfcomb.network.get_node_table(self.network) elif method == "blockmodel": #Create gt network self._gt_network = tfcomb.network.build_network(self.rules, node_table=self.TF_table, tool="graph-tool", verbosity=self.verbosity) #Partition network tfcomb.network.cluster_blockmodel(self._gt_network) node_table = tfcomb.network.get_node_table(self._gt_network) node_table.set_index("TF1", drop=False, inplace=True) else: raise ValueError("Method must be one of: ['louvain', 'blockmodel']") #Update TF_table self.logger.debug("TF_table: {0}".format(node_table.head(5))) self.TF_table = node_table #Update network attribute for plotting if method == "blockmodel": self.network = tfcomb.network.build_network(self.rules, node_table=self.TF_table, verbosity=self.verbosity)
#no return - networks were changed in place
[docs] def plot_network(self, color_node_by="TF1_count", color_edge_by="cosine", size_edge_by="TF1_TF2_count", **kwargs): """ Plot the rules in .rules as a network using Graphviz for python. This function is a wrapper for building the network (using tfcomb.network.build_network) and subsequently plotting the network (using tfcomb.plotting.network). Parameters ----------- color_node_by : str, optional A column in .rules or .TF_table to color nodes by. Default: 'TF1_count'. color_edge_by : str, optional A column in .rules to color edges by. Default: 'cosine'. size_edge_by : str, optional A column in rules to size edge width by. Default: 'TF1_TF2_count'. kwargs : arguments All other arguments are passed to tfcomb.plotting.network. See also -------- tfcomb.network.build_network and tfcomb.plotting.network """ #Fetch network from object or build network if self.network is None: self.logger.warning("The .network attribute is not set yet - running build_network().") self.build_network() #running build network() #Plot network G = self.network dot = tfcomb.plotting.network(G, color_node_by=color_node_by, color_edge_by=color_edge_by, size_edge_by=size_edge_by, verbosity=self.verbosity, **kwargs) return(dot)
#-----------------------------------------------------------------------------------------# #------------------------------ Comparison to other objects ------------------------------# #-----------------------------------------------------------------------------------------#
[docs] def compare(self, obj_to_compare, measure="cosine", join="inner", normalize=True): """ Utility function to create a DiffCombObj directly from a comparison between this CombObj and another CombObj. Requires .market_basket() run on both objects. Runs DiffCombObj.normalize (if chosen) and DiffCombObj.calculate_foldchanges() under the hood. Note ------ Set .prefix for each object to get proper naming of output log2fc columns. Parameters --------- obj_to_compare : tfcomb.objects.CombObj Another CombObj to compare to the current CombObj. measure : str, optional The measure to compare between objects. Default: 'cosine'. join : string How to join the TF names of the two objects. Must be one of "inner" or "outer". If "inner", only TFs present in both objects are retained. If "outer", TFs from both objects are used, and any missing counts are set to 0. Default: "inner". normalize : bool, optional Whether to normalize values between objects. Default: True. Return ------- DiffCombObj """ #Create object diff = DiffCombObj([self, obj_to_compare], measure=measure, join=join, verbosity=self.verbosity) if normalize == True: diff.normalize() diff.calculate_foldchanges() return(diff)
################################################################################### ############################## Differential analysis ############################## ###################################################################################
[docs]class DiffCombObj(): def __init__(self, objects=[], measure='cosine', join="inner", fillna=True, verbosity=1): """ Initializes a DiffCombObj object for doing differential analysis between CombObj's. Parameters ------------ objects : list, optional A list of CombObj instances. If list is empty, an empty DiffCombObj will be created. Default: []. measure : str, optional The measure to compare between objects. Must be a column within .rules for each object. Default: 'cosine'. join : string How to join the TF names of the two objects. Must be one of "inner" or "outer". If "inner", only TFs present in both objects are retained. If "outer", TFs from both objects are used, and any missing counts are set to 0. Default: "inner". fillna : True If "join" == "outer", there can be missing counts for individual rules. If fillna == True, these counts are set to 0. Else, the counts are NA. Default: True. verbosity : int, optional The verbosity of the output logging. Default: 1. See also --------- add_object for adding objects one-by-one """ #Initialize object variables self.n_objects = 0 self.prefixes = [] #filled by ".add_object" self.measure = measure #the measure #Setup logger self.verbosity = verbosity self.logger = TFcombLogger(self.verbosity) #Add objects one-by-one for obj in objects: self.add_object(obj, join=join, fillna=fillna) #Use functions from CombObj self._set_combobj_functions() def _set_combobj_functions(self): """ Reuse CombObj functions""" self.copy = lambda : CombObj.copy(self) self.set_verbosity = lambda *args, **kwargs: CombObj.set_verbosity(self, *args, **kwargs) self.build_network = lambda : CombObj.build_network(self) self._check_rules = lambda : CombObj._check_rules(self) self.simplify_rules = lambda : CombObj.simplify_rules(self) self.select_TF_rules = lambda *args, **kwargs: CombObj.select_TF_rules(self, *args, **kwargs) self.select_custom_rules = lambda *args, **kwargs: CombObj.select_custom_rules(self, *args, **kwargs) self.reduce_TFBS = lambda : CombObj.reduce_TFBS(self) #to use in selecting rules self.integrate_data = lambda *args, **kwargs: CombObj.integrate_data(self, *args, **kwargs) def __str__(self): pass
[docs] def add_object(self, obj, join="inner", fillna=True ): """ Add one CombObj to the DiffCombObj. Parameters ----------- obj : CombObj An instance of CombObj join : string How to join the TF names of the two objects. Must be one of "inner" or "outer". If "inner", only TFs present in both objects are retained. If "outer", TFs from both objects are used, and any missing counts are set to 0. Default: "inner". fillna : True If "join" == "outer", there can be missing counts for individual rules. If fillna == True, these counts are set to 0. Else, the counts are NA. Default: True. Returns -------- None Object is added in place """ #Check that object is an instance of CombObj check_type(obj, [CombObj]) check_string(join, ["inner", "outer"], "join") #Check that market basket was run on the object try: obj._check_rules() except InputError as e: raise InputError("Object is missing .rules. Please check that .market_basket() was run on the CombObj.") #Check if prefix is set - otherwise, set to obj<int> if obj.prefix is not None: prefix = obj.prefix else: prefix = "Obj" + str(self.n_objects + 1) self.logger.warning("CombObj has no prefix set, so the prefix in the DiffCombObj was set to '{0}'. Use <CombObj>.set_prefix() to set a specific prefix for the CombObj.".format(prefix)) self.prefixes.append(prefix) #Check that prefixes are unique (e.g. if one prefix was set to "Obj1") duplicates = set([p for p in self.prefixes if self.prefixes.count(p) > 1]) if len(duplicates) > 0: raise InputError("Prefix {0} is not unique within DiffCombObj prefixes. Please use <CombObj>.set_prefix() to set another prefix for the object. The current prefixes are: {1}".format(prefix, self.prefixes)) #check that object contains self.measure if self.measure not in obj.rules.columns: raise InputError("Measure '{0}' is not available in <CombObj>.rules. Please rerun .market_basket() for this measure or select another measure for the DiffCombObj".format(self.measure)) #Format table from obj to contain TF1/TF2 + measures with prefix columns_to_keep = ["TF1", "TF2"] + [self.measure] obj_table = obj.rules[columns_to_keep] #only keep necessary columns obj_table.columns = [str(prefix) + "_" + col if col not in ["TF1", "TF2"] else col for col in obj_table.columns] #add prefix to all columns besides TF1/TF2 obj_TF_table = obj.TF_table.copy() obj_TF_table.columns = [str(prefix) + "_" + col for col in obj_TF_table.columns] #Initialize tables if this is the first object if self.n_objects == 0: self.rules = obj_table self.TF_table = obj_TF_table #Or add object to this DiffCombObj else: #if join is inner, remove any TFs not present in both objects if join == "inner": left_TFs = set(list(set(self.rules["TF1"])) + list(set(self.rules["TF2"]))) right_TFs = set(list(set(obj_table["TF1"])) + list(set(obj_table["TF2"]))) common = left_TFs.intersection(right_TFs) not_common = left_TFs.union(right_TFs) - common if len(not_common) > 0: self.logger.warning("{0} TFs were not common between objects and were excluded from .rules. Set 'join' to 'outer' in order to use all TFs across objects. The TFs excluded were: {1}".format(len(not_common), list(not_common))) #Subset both tables to common TFs A = self.rules.loc[self.rules["TF1"].isin(common) & self.rules["TF2"].isin(common)] B = obj_table.loc[obj_table["TF1"].isin(common) & obj_table["TF2"].isin(common)] self.rules = A.merge(B, left_on=["TF1", "TF2"], right_on=["TF1", "TF2"]) else: #join is outer self.rules = self.rules.merge(obj_table, left_on=["TF1", "TF2"], right_on=["TF1", "TF2"], how="outer") if fillna == True: self.rules = self.rules.fillna(0) #Fill NA with null (happens if TF1/TF2 pairs are different between objects) #Merge TF tables self.TF_table = self.TF_table.merge(obj_TF_table, left_index=True, right_index=True) self.n_objects += 1 #current number of objects +1 for the one just added #Set name of index for table self.rules.index = self.rules["TF1"] + "-" + self.rules["TF2"]
#-----------------------------------------------------------------------------------------# #--------------------------- Calculate differential measures -----------------------------# #-----------------------------------------------------------------------------------------#
[docs] def normalize(self): """ Normalize the values for the DiffCombObj given measure (.measure) using quantile normalization. Overwrites the <prefix>_<measure> columns in .rules with the normalized values. """ #Establish input/output columns measure_columns = [prefix + "_" + self.measure for prefix in self.prefixes] zero_bool = self.rules[measure_columns] == 0 nan_bool = self.rules[measure_columns].isnull() #Fill na with 0 data = self.rules[measure_columns] data[nan_bool] = 0 #Normalize values self.rules[measure_columns] = qnorm.quantile_normalize(data, axis=1) #Ensure that original 0 values are kept at 0, and original nan kept at nan self.rules[zero_bool] = np.nan self.rules.fillna(0, inplace=True) self.rules[nan_bool] = np.nan
[docs] def calculate_foldchanges(self, pseudo=0.01): """ Calculate measure foldchanges between objects in DiffCombObj. The measure is chosen at the creation of the DiffCombObj and defaults to 'cosine'. Parameters ---------- pseudo : float, optional Set the pseudocount to add to all values before log2-foldchange transformation. Default: 0.01. See also -------- tfcomb.DiffCombObj.normalize """ measure = self.measure #Find all possible combinations of objects combinations = itertools.combinations(self.prefixes, 2) self.contrasts = list(combinations) self.logger.debug("Contrasts: {0}".format(self.contrasts)) columns = [] #collect the log2fc columns per contrast for (p1, p2) in self.contrasts: self.logger.info("Calculating foldchange for contrast: {0} / {1}".format(p1, p2)) log2_col = "{0}/{1}_{2}_log2fc".format(p1, p2, measure) columns.append(log2_col) p1_values = self.rules[p1 + "_" + measure] p2_values = self.rules[p2 + "_" + measure] ratio = (p1_values + pseudo) / (p2_values + pseudo) ratio[ratio <= 0] = np.nan self.rules[log2_col] = np.log2(ratio) #Sort by first contrast log2fc self.logger.debug("columns: {0}".format(columns)) self.rules.sort_values(columns[0], inplace=True) self.logger.info("The calculated log2fc's are found in the rules table (<DiffCombObj>.rules)")
#-----------------------------------------------------------------------------------------# #------------------------------ Selecting significant rules ------------------------------# #-----------------------------------------------------------------------------------------#
[docs] def select_rules(self, contrast=None, measure="cosine", measure_threshold=None, measure_threshold_percent=0.05, mean_threshold=None, mean_threshold_percent=0.05, plot = True, **kwargs): """ Select differentially regulated rules using a MA-plot on the basis of measure and mean of measures per contrast. Parameters ----------- contrast : tuple Name of the contrast to use in tuple format e.g. (<prefix1>,<prefix2>). Default: None (the first contrast is shown). measure : str, optional The measure to use for selecting rules. Default: "cosine" (internally converted to <prefix1>/<prefix2>_<measure>_log2fc). measure_threshold : tuple, optional Threshold for 'measure' for selecting rules. Default: None (the threshold is estimated automatically) measure_threshold_percent : float between 0-1 If measure_threshold is not set, measure_threshold_percent controls the strictness of the automatic threshold. If you increase this value, more differential rules will be found and vice versa. Default: 0.05. mean_threshold : float, optional Threshold for 'mean' for selecting rules. Default: None (the threshold is estimated automatically) mean_threshold_percent : float between 0-1 if mean_threshold is not set, mean_threshold_percent controls the strictness of the automatic threshold. If you increase this value, more differential rules will be found and vice versa. Default: 0.05. plot : boolean, optional Whether to plot the volcano plot. Default: True. kwargs : arguments, optional Additional arguments are passed to tfcomb.plotting.scatter. Returns ---------- tfcomb.objects.DiffCombObj() An object containing a subset of <DiffCombobj>.rules See also ---------- tfcomb.plotting.volcano """ table = self.rules.copy() #make sure not to change self.rules if self.contrasts is None: self.logger.warning(".select_rules requires foldchanges to run. Running .calculate_foldchanges() now.") #Check input if measure_threshold is not None: tfcomb.utils.check_type(measure_threshold, [tuple, list], name="measure_threshold") measure_threshold = tuple(sorted(measure_threshold)) #ensure that lower threshold is first in tuple if mean_threshold is not None: tfcomb.utils.check_value(mean_threshold, name="mean_threshold") tfcomb.utils.check_value(measure_threshold_percent, vmin=0, vmax=1, name="measure_threshold_percent") tfcomb.utils.check_value(mean_threshold_percent, vmin=0, vmax=1, name="mean_threshold_percent") #Identify measure to use based on contrast if contrast == None: contrast = self.contrasts[0] else: #check if contrast is valid if contrast not in self.contrasts: raise InputError("Given contrast {0} is not valid. The contrast must be a tuple and be any of: {1}".format(contrast, self.contrasts)) self.logger.info("Selecting rules for contrast: {0}".format(contrast)) measure_col = "{0}/{1}_{2}_log2fc".format(contrast[0], contrast[1], measure) self.logger.debug("Measure column is: {0}".format(measure_col)) #Calculate mean of measures measure_cols = [c + "_" + measure for c in contrast] mean_col = "Mean of '{0}' for {1} and {2}".format(measure, contrast[0], contrast[1]) table[mean_col] = table[measure_cols].mean(axis=1) to_keep = table[mean_col] > 0 #only keep values with mean measure > 0 #Find optimal measure threshold if measure_threshold is None: self.logger.info("measure_threshold is None; trying to calculate optimal threshold") vals = table[measure_col][to_keep] measure_threshold = tfcomb.utils.get_threshold(vals, "both", percent=measure_threshold_percent, verbosity=self.verbosity) self.logger.debug("Measure threshold is: {0}".format(measure_threshold)) #Find optimal mean threshold if mean_threshold is None: self.logger.info("mean_threshold is None; trying to calculate optimal threshold") vals = table[mean_col][to_keep] #remove large influence of 0-means mean_threshold = tfcomb.utils.get_threshold(vals, "upper", percent=mean_threshold_percent, verbosity=self.verbosity) self.logger.debug("Mean threshold is: {0}".format(mean_threshold)) #Plot the MA-plot if chosen if plot == True: tfcomb.plotting.scatter(table, x=mean_col, y=measure_col, x_threshold=mean_threshold, y_threshold=measure_threshold, **kwargs) #Set threshold on rules selected = self.rules.copy() selected = selected[((selected[measure_col] <= measure_threshold[0]) | (selected[measure_col] >= measure_threshold[1])) & (table[mean_col] >= mean_threshold)] #Create a DiffCombObj with the subset of rules self.logger.info("Creating subset of rules using thresholds") new_obj = self.copy() new_obj._set_combobj_functions() #set combobj functions for new object; else they point to self new_obj.rules = selected new_obj.network = None return(new_obj)
#-------------------------------------------------------------------------------------------# #----------------------------- Plots for differential analysis -----------------------------# #-------------------------------------------------------------------------------------------#
[docs] def plot_correlation(self, method="pearson", save=None, **kwargs): """ Plot correlation of 'measure' between rules across objects. Parameters ----------- method : str, optional Either 'pearson' or 'spearman'. Default: 'pearson'. save : str, optional Save the plot to the file given in 'save'. Default: None. kwargs : arguments, optional Additional arguments are passed to sns.clustermap. """ #Define columns cols = [prefix + "_" + self.measure for prefix in self.prefixes] #Calculate matrix and plot matrix = self.rules[cols].corr(method=method) g = sns.clustermap(matrix, cbar_kws={'label': method.capitalize() + " corr."}, **kwargs) #rotate x-axis labels _ = plt.setp(g.ax_heatmap.get_yticklabels(), rotation=0, size=15) # For y axis _ = plt.setp(g.ax_heatmap.get_xticklabels(), rotation=45, ha="right", size=15) # For x axis if save is not None: plt.savefig(save, dpi=600, bbox_inches="tight") return(g)
[docs] def plot_rules_heatmap(self, **kwargs): """ Plot a heatmap of size n_rules x n_objects """ cols = [prefix + "_" + self.measure for prefix in self.prefixes] data = self.rules[cols] g = sns.clustermap(data, xticklabels=True, **kwargs) #Rotate labels _ = plt.setp(g.ax_heatmap.get_xticklabels(), rotation=45, ha="right", size=15) # For x axis return(g)
#todo: plot_contrast heatmap
[docs] def plot_heatmap(self, contrast=None, n_rules=10, color_by="cosine_log2fc", sort_by=None, **kwargs): """ Functionality to plot a heatmap of differentially co-occurring TF pairs for a certain contrast. Parameters ------------ contrast : tuple, optional Name of the contrast to use in tuple format e.g. (<prefix1>,<prefix2>). Default: None (the first contrast is shown). n_rules : int, optional Number of rules to show from each contrast (default: 10). Note: This is the number of rules either up/down, meaning that the rules shown are n_rules * 2. color_by : str, optional Default: "cosine" (converted to "<prefix1>/<prefix2>_<color_by>") sort_by : str, optional Column in .rules to sort rules by. Default: None (keep sort) kwargs : arguments, optional Additional arguments are passed to tfcomb.plotting.heatmap. See also ---------- tfcomb.plotting.heatmap """ #todo: requires log2fcs to be calculated contrast = tfcomb.utils.set_contrast(contrast, self.contrasts) #Decide columns based on color_by / sort_by color_by = "{0}/{1}_{2}".format(contrast[0], contrast[1], color_by) if sort_by is not None: sort_by = "{0}/{1}_{2}".format(contrast[0], contrast[1], sort_by) #Check if columns are found in table check_columns(self.rules, [sort_by, color_by]) #Sort by measure associations = self.rules.copy() if sort_by is not None: associations.sort_values(sort_by, ascending=False, inplace=True) #Choose n number of rules tf1_list = list(set(associations["TF1"][:n_rules].tolist() + associations["TF1"][-n_rules:].tolist())) tf2_list = list(set(associations["TF2"][:n_rules].tolist() + associations["TF2"][-n_rules:].tolist())) # Fill all combinations for the TFs selected from top rules (to fill white spaces) chosen_associations = associations[(associations["TF1"].isin(tf1_list) & associations["TF2"].isin(tf2_list))] #Plot heatmap tfcomb.plotting.heatmap(chosen_associations, color_by=color_by)
[docs] def plot_bubble(self, contrast=None, n_rules=20, yaxis="cosine_log2fc", color_by=None, size_by=None, **kwargs): """ Plot bubble scatterplot of information within .rules. Parameters ----------- contrast : tuple, optional Name of the contrast to use in tuple format e.g. (<prefix1>,<prefix2>). Default: None (the first contrast is shown). n_rules : int, optional Number of rules to show (in each direction). Default: 20. yaxis : str, optional Measure to show on the y-axis. Default: "cosine_log2fc". color_by : str, optional If column is not in rules, the string is supposed to be in the form "prefix1/prefix2_<color_by>". Default: None. size_by : str, optional Column to size bubbles by. Default: None. kwargs : arguments Any additional arguments are passed to tfcomb.plotting.bubble. See also ---------- tfcomb.plotting.bubble """ contrast = tfcomb.utils.set_contrast(contrast, self.contrasts) #Decide column names based on yaxis / color_by / sort_by columns = [yaxis, color_by, size_by] for i, name in enumerate(columns): if name is not None: if name not in self.rules.columns: #Assume that this is the contrast suffix columns[i] = "{0}/{1}_{2}".format(*contrast, name) yaxis, color_by, size_by = columns #Select top/bottom n rules sorted_table = self.rules.sort_values(yaxis, ascending=False) sorted_table.index = sorted_table["TF1"] + " + " + sorted_table["TF2"] top_rules = sorted_table.head(n_rules) bottom_rules = sorted_table.tail(n_rules) # Draw each cell as a scatter point with varying size and color fig, (ax1, ax2) = plt.subplots(2, constrained_layout=True) tfcomb.plotting.bubble(data=top_rules, yaxis=yaxis, color_by=color_by, size_by=size_by, ax=ax1, **kwargs) tfcomb.plotting.bubble(data=bottom_rules, yaxis=yaxis, color_by=color_by, size_by=size_by, ax=ax2, **kwargs)
#Remove x-axis label for upper plot #return(fig)
[docs] def plot_network(self, contrast=None, color_node_by=None, size_node_by=None, color_edge_by="cosine_log2fc", size_edge_by=None, **kwargs): """ Plot the network of differential co-occurring TFs. Parameters ----------- contrast : tuple Name of the contrast to use in tuple format e.g. (<prefix1>,<prefix2>). Default: None (the first contrast is shown). color_node_by : str, optional Name of measure to color node by. If column is not in .rules, the name will be internally converted to "prefix1/prefix2_<color_edge_by>". Default: None. size_node_by : str, optional Column in .rules to size_node_by. If column is not in .rules, the name will be internally converted to "prefix1/prefix2_<size_node_by>" Default: None. color_edge_by : str, optional The name of measure or column to color edge by (will be internally converted to "prefix1/prefix2_<color_edge_by>"). Default: "cosine_log2fc". size_edge_by : str, optional The name of measure or column to size edge by. kwargs : arguments Any additional arguments are passed to tfcomb.plotting.network. Returns -------- dot network object See also --------- tfcomb.plotting.network """ contrast = tfcomb.utils.set_contrast(contrast, self.contrasts) #Adjust column names self.logger.debug("Adjusting column names") columns = [color_node_by, size_node_by, color_edge_by, size_edge_by] for i, name in enumerate(columns): if name is not None: if name not in self.rules.columns: #Assume that this is the contrast suffix columns[i] = "{0}/{1}_{2}".format(*contrast, name) color_node_by, size_node_by, color_edge_by, size_edge_by = columns self.logger.debug("color_node_by: {0} | size_node_by: {1} | color_edge_by: {2} | size_edge_by: {3}".format(color_node_by, size_node_by, color_edge_by, size_edge_by, )) #Create subset of rules selected = self.rules #Build network self.logger.debug("Building network using 'tfcomb.network.build_network'") self.build_network() #adds .network to self #G = tfcomb.network.build_nx_network(selected) #self.network = G #Plot network self.logger.debug("Plotting network using 'tfcomb.plotting.network'") dot = tfcomb.plotting.network(self.network, color_node_by=color_node_by, size_node_by=size_node_by, color_edge_by=color_edge_by, size_edge_by=size_edge_by, verbosity=self.verbosity, **kwargs) return(dot)
# -------------------------------------------------------------------------------# # ----------------------------- Save / import object ----------------------------# # -------------------------------------------------------------------------------#
[docs] def to_pickle(self, path : str): """ Save the DiffCombObj to a pickle file. Parameters ---------- path : str Path to the output pickle file e.g. 'my_diff_comb_obj.pkl'. See also --------- from_pickle """ f_out = open(path, 'wb') dill.dump(self, f_out)
[docs] def from_pickle(self, path : str): """ Import a DiffCombObj from a pickle file. Parameters ----------- path : str Path to an existing pickle file to read. Raises ------- InputError If read object is not an instance of DiffCombObj. See also ---------- to_pickle """ filehandler = open(path, 'rb') obj = dill.load(filehandler) # Check if object is CombObj if not isinstance(obj, DiffCombObj): raise InputError("Object from '{0}' is not a DiffCombObj".format(path)) # Overwrite self with DiffCombObj self = obj self.set_verbosity(self.verbosity) # restart logger return self
################################################################################### ############################## Distance analysis ################################## ###################################################################################
[docs]class DistObj(): """ The main class for analyzing preferred binding distances for co-occurring TFs. Examples ---------- >>> D = tfcomb.distances.DistObj() # Verbosity of the output log can be set using the 'verbosity' parameter: >>> D = tfcomb.distances.DistObj(verbosity=2) """ #-------------------------------------------------------------------------------------------# #-------------------------------- Setup and sanity checks ----------------------------------# #-------------------------------------------------------------------------------------------# def __init__(self, verbosity=1): #set verbosity #Function and run parameters self.verbosity = verbosity #0: error, 1:info, 2:debug, 3:spam-debug self.logger = TFcombLogger(self.verbosity) #Variables for storing data self.rules = None # Filled in by .fill_rules() self.TF_names = [] # List of TF names self.TF_table = None # List of all TF counts self._raw = None # Raw distance data [Pandas DataFrame of size n_pairs x maxDist] self.network = None # Network obj self.distances = None # Pandas DataFrame of size n_pairs x maxDist self.uncorrected = None # Pandas DataFrame of size n_pairs x maxDist self.corrected = None # Pandas DataFrame of size n_pairs x maxDist self.linres = None # Pandas DataFrame of size n_pairs x 3 self.normalized = None # Pandas DataFrame of size n_pairs x maxDist self.smoothed = None # Pandas DataFrame of size n_pairs x maxDist self.peaks = None # Pandas DataFrame of size n_pairs x n_preferredDistance self.classified = None # Pandas DataFrame of size n_pairs x 3 self.datasource = None # Holds the datastructure self.peaking_count = None # Number of pairs with at least one peak self.zscores = None # calculated zscores self.stringency = None # stringency param self.prominence = None # zscore or array of flat values self._noise_method = None # private storage noise_method self._height_multiplier = None # private storage height_mulitplier self._collapsed = None #stores the negative values to be able to expand negative section again self._collapsed_peaks = None #stores the negative peaks to be able to expand negative section again self.n_bp = 0 # Predicted number of baskets self.TFBS = RegionList() # None RegionList() of TFBS self.smooth_window = 1 # Smoothing window size, 1 = no smoothing # str <-> int encoding dicts self.name_to_idx = None # Mapping TF-names: string <-> int self.pair_to_idx = None # Mapping Pairs: tuple(string) <-> int self.anchor_modes = {"inner": 0, "outer": 1, "center": 2} #str -> integer anchor mode # Default analysis parameters self.min_dist = 0 # Minimum distance. Default: 0 self.max_dist = 100 # Maximum distance. Default 100. self.max_overlap = 0 # Maximum overlap. Default 0. self.directional = False # True if direction is taken into account, false otherwise self.anchor = "inner" # How to count distances: inner, outer or center. self.percentage = False # Whether to count distances as bp or percentages # private constants self._XLBL_ROTATION = 90 # label rotation degree for plotting x labels self._XLBL_FONTSIZE = 10 # label fontsize adjustment for plotting x labels #Use functions from CombObj self._set_combobj_functions() def _set_combobj_functions(self): """ Reuse CombObj functions""" self.copy = lambda : CombObj.copy(self) self.set_verbosity = lambda *args, **kwargs: CombObj.set_verbosity(self, *args, **kwargs) self._prepare_TFBS = lambda *args, **kwargs: CombObj._prepare_TFBS(self, *args, **kwargs) self._get_sort_idx = lambda *args, **kwargs: CombObj._get_sort_idx(*args, **kwargs) self.check_pair = lambda *args: CombObj.check_pair(self, *args) def __str__(self): """ Returns a string representation of the DistObj depending on what variables are already stored """ s = "<DistObj" if self.TFBS is not None: s += ": {0} TFBS ({1} unique names)".format(len(self.TFBS), len(self.TF_names)) if self.rules is not None: s += " | Market basket analysis: {0} rules".format(self.rules.shape[0]) if self.peaks is not None: s += " | Found peaks: {0}".format(self.peaks.shape[0]) s += " | from {0} pairs.".format(self.peaking_count) s += ">" return(s)
[docs] def set_verbosity(self, level): """ Set the verbosity level for logging after creating the CombObj. Parameters ---------- level : int A value between 0-3 where 0 (only errors), 1 (info), 2 (debug), 3 (spam debug). Returns ------- None Sets the verbosity level for the Logger inplace """ self.verbosity = level self.logger = TFcombLogger(self.verbosity) #restart logger with new verbosity
[docs] def fill_rules(self, comb_obj): """ Fill DistanceObject according to reference object with all needed Values and parameters to perform standard prefered distance analysis Parameters ---------- comb_obj : tfcomb.objects (or any other object contain all necessary rules) Object from which the rules and parameters should be copied from Returns ------- None Copies values and parameters from a combObj or diffCombObj. """ # Check for mandatory attributes missing = [] for attr in ["rules", "TF_names", "TFBS", "TF_table"]: try: getattr(comb_obj, attr) except AttributeError: missing.append(attr) if len(missing) > 0: raise InputError(f"Mandatory attributes {missing} missing for object {comb_obj}") #Copy internal _sites (integer representation of TFBS) and name_to_idx for name translation for attr in ["name_to_idx", "_sites"]: if hasattr(comb_obj, attr): setattr(self, attr, getattr(comb_obj, attr)) # copy required attributes self.rules = comb_obj.rules self.TF_names = comb_obj.TF_names self.count_names = comb_obj.count_names #different from TF_names if counting is stranded self.TFBS = comb_obj.TFBS self.TF_table = comb_obj.TF_table # Overwrite default parameters with values from CombObj variables = ["min_dist", "max_dist", "min_overlap", "max_overlap", "directional", "stranded", "anchor"] for variable in variables: if hasattr(comb_obj, variable): setattr(self, variable, getattr(comb_obj, variable))
[docs] def reset_signal(self): """ Resets the signals to their original state. Returns -------- None Resets the object datasource variable to the original raw distances """ self.logger.info("Resetting signals") self.datasource = self.distances
#-------------------------------------------------------------------------------------------# #---------------------------------- Checks on variables-------------------------------------# #-------------------------------------------------------------------------------------------#
[docs] def check_datasource(self, att): """ Utility function to check if distances in .<att> were set. If not, InputError is raised. Parameters ---------- att : str Attribute name for a dataframe in self. """ df = getattr(self, att) #fetched dataframe for this att if df is None: if att == "distances" or att == "datasource": raise InputError("No distances evaluated yet. Please run .count_distances() first.") elif att == "corrected" or att == "uncorrected": raise InputError("Distances are not corrected yet. Please run .correct_background() first.") elif att == "scaled": raise InputError("Distance are not yet scaled. Please run .scale() first.") elif att == "smoothed": raise InputError("Distances are not yet smoothed. Please run .smooth() first.") elif att == "zscores": raise InputError("Distances were not analyzed yet. Please run .analyze_signal_all() first.") #If self.<att> is present, check if it is a Dataframe tfcomb.utils.check_type(df, pd.DataFrame, att)
[docs] def check_peaks(self): """ Utility function to check if peaks were called. If not, InputError is raised. """ if self.peaks is None: raise InputError("Peaks not evaluated yet. Please run .analyze_signal_all() first.") #If self.peaks is present, check if it is a Dataframe tfcomb.utils.check_type(self.peaks, pd.DataFrame, ".corrected")
[docs] def check_min_max_dist(self): """ Utility function to check if min and max distance are valid. """ if self.min_dist is None: raise InputError(".min_dist is not set") if self.max_dist is None: raise InputError(".max_dist is not set") tfcomb.utils.check_value(self.min_dist, integer=True, name=".min_dist") tfcomb.utils.check_value(self.max_dist, integer=True, name=".max_dist") if self.min_dist > self.max_dist: raise InputError(".min_dist must be lesser or equal .max_dist")
#-------------------------------------------------------------------------------------------# #------------------------ Running different functions in chunks ----------------------------# #-------------------------------------------------------------------------------------------#
[docs] @staticmethod def chunk_table(table, n): """ Split a pandas dataframe row-wise into n chunks. Parameters ------------ n : int A positive number of chunks to split table into. Returns -------- list of pd.DataFrames """ n_rows = table.shape[0] chunk_size = math.ceil(n_rows/n) # last chunk will be chunks - (threads - 1) smaller chunks = [table.iloc[i:i+chunk_size,:] for i in range(0, n_rows, chunk_size)] #list of dataframes return chunks
def _multiprocess_chunks(self, threads, func, datatable): """ Split Data in chunks to multiprocess it. Because of the rather short but numerous calls mp.Pool() creates to much overhead. So instead utilize chunks. Following functions can be multiprocessed: ["analyze_signal", "evaluate_noise"] Parameters ---------- threads : int Number of threads used func : tfcomb.utils.*_chunks function Related chunk process function from tfcomb.utils datatable: pd.DataFrame Datatable to operate on (e.g. .distances) See also ------- tfcomb.utils.analyze_signal_chunks tfcomb.utils.evaluate_noise_chunks """ # check function is supported if not callable(func): raise InputError(f"Input {func} not callable. Please provide a function") check_string(func.__name__,["analyze_signal_chunks", "evaluate_noise_chunks"], name="function") self.logger.debug(f"Multiprocessing chunks for {func}") # open Pool for multiprocessing pool = mp.Pool(threads) #Chunk table into threads. multiprocess every function call will result in mp overhead, therefore chunk it chunks = self.chunk_table(datatable, threads) # start one chunk per thread jobs = [] for chunk in chunks: #range(threads): #Apply function with parameters and add to pool if func == tfcomb.utils.analyze_signal_chunks: job = pool.apply_async(func, args=(chunk, self.threshold, )) # apply function with params elif func == tfcomb.utils.evaluate_noise_chunks: # subset peaks to those in chunk peaks_sub = self.peaks.loc[chunk.index.tolist()] job = pool.apply_async(func, args=(chunk, peaks_sub, self._noise_method, self._height_multiplier, )) # apply function with params jobs.append(job) # accept no new jobs pool.close() # log_progress(jobs, self.logger) # doesn't work with chunks (TODO) # get results from jobs results = [] for job in jobs: results += job.get() # wait for all jobs to be finished and tidy up pools pool.join() # convert to numpy results = np.array(results, dtype=object) # prevent convert float to str return results #-------------------------------------------------------------------------------------------# #---------------------------- Counting distances between TFs -------------------------------# #-------------------------------------------------------------------------------------------#
[docs] def count_distances(self, directional=None, stranded=None, percentage=False, percentage_bins=100): """ Count distances for co_occurring TFs, can be followed by analyze_distances to determine preferred binding distances Parameters ---------- directional : bool or None, optional Decide if direction of found pairs should be taken into account, e.g. whether "<---TF1---> <---TF2--->" is only counted as TF1-TF2 (directional=True) or also as TF2-TF1 (directional=False). If directional is None, self.directional will be used. Default: None. stranded : bool or None, optional Whether to take strand of TFBS into account when counting distances. If stranded is None, self.stranded will be used. Default: None percentage : bool, optional Whether to count distances as bp or percentage of longest TF1/TF2 region. If True, output will be collected in 1-percent increments from 0-1. If False, output depends on the min/max distance values given in the DistObj. Default: False. Returns -------- None Fills the object variable .distances. """ #todo; reset any previous counts #Replace directional/stranded with internal values directional = self.directional if directional is None else directional stranded = self.stranded if stranded is None else stranded #Check input types tfcomb.utils.check_string(self.anchor, list(self.anchor_modes.keys()), "self.anchor") tfcomb.utils.check_type(directional, [bool], "directional") tfcomb.utils.check_type(stranded, bool, "stranded") tfcomb.utils.check_type(percentage, bool, "percentage") tfcomb.utils.check_value(percentage_bins, vmin=1, integer=True, name="percentage_bins") self.check_min_max_dist() #Check if overlapping is allowed (when anchor == 0 (inner))): if self.anchor == "inner" and self.min_dist < 0 and self.max_overlap == 0: self.logger.warning("'min_dist' is below 0, but max_overlap is set to 0. Please set max_overlap > 0 in order to count overlapping pairs with negative distances.") self.logger.info("Preparing to count distances.") # encode chromosome,pairs and name to int representation self._prepare_TFBS() #prepare _sites and name_to_idx dict sites = self._sites #Check if rules look stranded rule_names = list(set(self.rules["TF1"].tolist() + self.rules["TF2"].tolist())) strand_matching = [re.match("(.+)\(([+-.])\)$", name) for name in rule_names] #check if name is "NAME(+/./-)" n_strand_names = sum([match is not None for match in strand_matching]) if n_strand_names > 0 and stranded == False: raise InputError("TF names in .rules contain strand information, but stranded is set to False. Please set stranded to True to count distances for rules.") #Should strand be taken into account? if stranded == True: sites = self._sites.copy() #don't change self._sites name_to_idx = {} TF_names = [] #names in order of idx current_idx = -1 for i, site in enumerate(self.TFBS): name = "{0}({1})".format(site.name, site.strand) if name not in name_to_idx: TF_names.append(name) current_idx = current_idx + 1 name_to_idx[name] = current_idx sites[i][-1] = name_to_idx[name] #set new idx based on stranded name else: TF_names = self.TF_names name_to_idx = self.name_to_idx self.pairs = [(name_to_idx[TF1], name_to_idx[TF2]) for (TF1, TF2) in self.rules[["TF1", "TF2"]].values] #list of TF1/TF rules to count distances for self.logger.spam("TF_names: {0}".format(TF_names)) #Sort sites by mid if anchor == center: if self.anchor == "center": sort_idx = self._get_sort_idx(sites, anchor="center") sites = sites[sort_idx, :] self.logger.info("Calculating distances") anchor_mode = self.anchor_modes[self.anchor] self._raw = count_co_occurrence(sites, min_dist=self.min_dist, max_dist=self.max_dist, min_overlap=self.min_overlap, max_overlap=self.max_overlap, binarize=False, #does not affect distance counting anchor=anchor_mode, #integer representation of anchor n_names=len(TF_names), directional=directional, task=2, #task = count distances rules=self.pairs, #rules to count distances for percentage=percentage, percentage_bins=percentage_bins, ) self.percentage = percentage self.percentage_bins = percentage_bins # convert raw counts (numpy array with int encoded pair names) to better readable format (pandas DataFrame with TF names) self._raw_to_human_readable(name_to_idx) #fills in .distances self.logger.info("Done finding distances! Results are found in .distances") self.logger.info("You can now run .smooth() and/or .correct_background() to preprocess sites before finding peaks.") self.logger.info("Or you can find peaks directly using .analyze_signal_all()")
def _raw_to_human_readable(self, name_to_idx): """ Get the raw distance in human readable format. Sets the variable '.distances' which is a pd.Dataframe with the columns: TF1 name, TF2 name, count min_dist, count min_dist +1, ...., count max_dist). Note: Normalization method is min_max normalization: (x[i] - min(x))/(max(x)-min(x)) Parameters ----------- name_to_idx : dict Dictionary encoding TF names to integers """ #Converting to pandas format self.logger.debug("Converting raw count data to pretty dataframe") if self.percentage == False: columns = ['TF1', 'TF2'] + list(range(self.min_dist, self.max_dist + 1)) else: columns = ['TF1', 'TF2'] + list(range(self.percentage_bins+1)) self.distances = pd.DataFrame(self._raw, columns=columns) #Convert integers to TF names # get names from int encoding idx_to_name = {} for name, idx in name_to_idx.items(): idx_to_name[idx] = name self.distances["TF1"] = [idx_to_name[idx] for idx in self.distances["TF1"]] self.distances["TF2"] = [idx_to_name[idx] for idx in self.distances["TF2"]] self.distances.index = self.distances["TF1"] + "-" + self.distances["TF2"] #Set datasource for future normalization/correction/analysis self.datasource = self.distances.copy() #-------------------------------------------------------------------------------------------# #------------------- Process counted distances (scale/smooth/correct)-----------------------# #-------------------------------------------------------------------------------------------# #Scale signal
[docs] def scale(self, how="min-max"): """ Scale the counted distances per pair. Saves the scaled counts into .scaled and updates .datasource. Parameters ----------- how : str, optional How to scale the counts. Must be one of: ["min-max", "fraction"]. If "min-max", all counts are scaled between 0 and 1. If "fraction", the sum of all counts are scled between 0 and 1. Default: "min-max". """ check_string(how, ["min-max", "fraction"], "how") distances_mat = self.datasource.iloc[:,2:].to_numpy().astype(float) #float for nan replacement #Calculate scaling depending on "how" if how == "min-max": min_count = np.array([distances_mat.min(axis=1)]).T #convert to column vectors max_count = np.array([distances_mat.max(axis=1)]).T #convert to column vectors ranges = max_count - min_count #min-max range per pair ranges[ranges==0] = np.nan #Perform scaling and save to self.scaled scaled_mat = (distances_mat - min_count) / ranges scaled_mat = np.nan_to_num(scaled_mat) elif how == "fraction": sum_count = distances_mat.sum(axis=1) #sum of each row sum_count[sum_count==0] = np.nan scaled_mat = distances_mat / sum_count.reshape(-1,1) scaled_mat = np.nan_to_num(scaled_mat) #rescale back to 0 #Set .scaled variable self.scaled = self.datasource.copy() self.scaled.iloc[:,2:] = scaled_mat #Update datasource self.datasource = self.scaled
#Smooth signals
[docs] def smooth(self, window_size=3, reduce=True): """ Helper function for smoothing all rules with a given window size. Parameters ---------- window_size : int, optional Window size for the rolling smoothing window. A bigger window produces larger flanking ranks at the sides. Default: 3. reduce : bool, optional Reduce the distances to the positions with a full window, i.e. if the window size is 3, the first and last distances are removed. This prevents flawed enrichment of peaks at the borders of the distances. Default: True. Returns -------- None Fills the object variable .smoothed and updates .datasource """ tfcomb.utils.check_value(window_size, vmin=0, integer=True, name="window size") if self.is_smoothed() == True: self.logger.warning("Data was already smoothed - beware that smoothing again might produce unwanted results. Please use .reset_signal() to reset the signal.") self.logger.info(f"Smoothing signals with window size {window_size}") mat = self.datasource.iloc[:,2:].to_numpy() #distances counted distances = self.datasource.columns[2:] smoothed_mat = np.array([tfcomb.counting.rolling_mean(mat[row,:].astype(float), window_size) for row in range(len(mat))]) #smooth values per pair #Cut the borders of smoothed mat if reduce == True if reduce == True: lf = int(np.floor((window_size - 1) / 2.0)) rf = int(np.ceil((window_size - 1)/ 2.0)) smoothed_mat = smoothed_mat[:,lf:-rf] distances = distances[lf:-rf] #Create final table smoothed_table = pd.DataFrame(smoothed_mat, columns=distances).reset_index(drop=True) self.smoothed = pd.concat([self.datasource.iloc[:,:2].reset_index(drop=True), smoothed_table], axis=1) self.smoothed.index = self.datasource.index #reset index #Save information about smoothing to object self.smooth_window = window_size self.datasource = self.smoothed
[docs] def is_smoothed(self): """ Return True if data was smoothed during analysis, False otherwise Returns -------- bool True if smoothed, False otherwiese """ if (self.smoothed is None) or (self.smooth_window <= 1): return False return True
#Correct background signal from distance counts
[docs] def correct_background(self, frac=0.66, threads=1): """ Corrects the background of distances. Parameters ------------- frac : float, optional Fraction of data used to calculate smooth. Setting this fraction lower will cause stronger smoothing effect. Default: 0.66 threads : int, optional Number of threads to use in functions. Default: 1. Returns ---------- None Fills the object variable .corrected """ #Check input parameters self.check_datasource("distances") tfcomb.utils.check_value(frac, vmin=0, vmax=1, name="frac") tfcomb.utils.check_value(threads, vmin=1, vmax=os.cpu_count(), integer=True, name="threads") #Correct background in chunks pool = mp.Pool(threads) chunks = self.chunk_table(self.datasource, threads) jobs = [] for chunk in chunks: job = pool.apply_async(self._correct_chunk, (chunk, frac, )) jobs.append(job) #Collect results pool.close() pool.join() #waits for all jobs results = [job.get() for job in jobs] #Join tables self.uncorrected = self.datasource #used for plotting self.lowess = pd.concat([result[0] for result in results]) self.corrected = pd.concat([result[1] for result in results]) self.datasource = self.corrected #update datasource self.logger.info("Background correction finished! Results can be found in .corrected")
@staticmethod def _correct_chunk(counts, frac=0.66): """ Correct counts for background signal using lowess smoothing. Parameters ------------ counts : pd.DataFrame Subset of .datasource dataframe frac : float, optional Fraction of data used to calculate smooth. Default: 0.66. Returns --------- Tuple of two dataframes counts_lowess : lowess smoothing of counts corrected : counts corrected """ #Run lowess smoothing for each row in input counts n_rows = counts.shape[0] counts_lowess = counts.copy() for i in range(n_rows): signal = counts.iloc[i,2:] #first two columns are TF names counts_lowess.iloc[i,2:] = sm.nonparametric.lowess(signal, range(len(signal)), frac=frac)[:,1] #first column in result is x-values #Correct counts corrected = counts.copy() corrected.iloc[:,2:] = counts.iloc[:,2:] - counts_lowess.iloc[:,2:] #correct by subtracting lowess return (counts_lowess, corrected) #-------------------------------------------------------------------------------------------# #--------------------------- Analysis to get preferred distances ---------------------------# #-------------------------------------------------------------------------------------------# def _get_zscores(self, datasource, mixture=False, threads=1): """ Calculate zscores for the data in datasource. Parameters -------------- datasource : pd.DataFrame The datasource table to calculate zscore on (can be a subset of all rules). mixture : bool, optional Whether to estimate zscore using all datapoints (False) or to use a mixture model (True). Default: False. threads : int, optional Find z-scores using multiprocessing. """ #Get zscores in chunks pool = mp.Pool(threads) chunks = self.chunk_table(datasource, threads) jobs = [] for chunk in chunks: job = pool.apply_async(self._zscore_chunk, (chunk, mixture, )) jobs.append(job) #Collect results pool.close() pool.join() #waits for all jobs results = [job.get() for job in jobs] #Each result consists of the zscore table and a dict containing backgruond information self.zscores = pd.concat([result[0] for result in results]) self.bg_dist = {} for d in [result[1] for result in results]: self.bg_dist.update(d) #add results to bg_dist @staticmethod def _zscore_chunk(datasource, mixture=False): #Fetch information from data indices = datasource.index.tolist() #all index names e.g. TF1-TF2 distance_cols = datasource.columns.tolist()[2:] #Regular zscore stds = datasource[distance_cols].std(ddof=0, axis=1) stds[stds == 0] = np.nan #possible divide by zero in zscore calculation means = datasource[distance_cols].mean(axis=1) #save information on background distribution bg_dist = {} for i, idx in enumerate(indices): bg_dist[idx] = [(means[i], stds[i], 1.0)] #mean, std, weight for each pair # Calculate zscore (x-mean)/std zsc = datasource[distance_cols].subtract(means, axis=0).divide(stds, axis=0).fillna(0) #Save zscores to object zscores = datasource.copy() zscores.iloc[:,2:] = zsc #If mixture == True, try to update with mixture model zscores if mixture == True: for idx in datasource.index.tolist(): #index name e.g. TF1-TF2 X = datasource.loc[idx].iloc[2:].values.astype(float) #Fit kernel to input data to increase signal kernel = stats.gaussian_kde(X) vals = kernel.resample(1000).reshape(-1, 1) #The data is either 1 or two components; find which one models = [] for n in [1,2]: try: model = GaussianMixture(n).fit(vals) models.append(model) except: pass #If it was possible to fit one/two components; else, zscore stays original if len(models) > 0: AIC = [m.aic(X.reshape(-1, 1)) for m in models] M_best = models[np.argmin(AIC)] #model with lowest AIC (1 or two components) means = M_best.means_.ravel() stds = np.sqrt(M_best.covariances_.ravel()) weights = M_best.weights_ #Sort components based no weight (largest component is background) idx_sort = np.argsort(-M_best.weights_) #Add components to bg_dist in order bg_dist[idx] = [] for i in idx_sort: bg_dist[idx].append((means[i], stds[i], weights[i])) dist_mean, dist_std, _ = bg_dist[idx][0] #the first distribution (largest component) is used as background zscore = (X - dist_mean) / dist_std zscores.loc[idx, distance_cols] = zscore #update ztscores for this idx return (zscores, bg_dist)
[docs] def analyze_signal_all(self, threads=1, method="zscore", threshold=2, min_count=1, save=None): """ After background correction is done, the signal is analyzed for peaks, indicating preferred binding distances. There can be more than one peak (more than one preferred binding distance) per Signal. Peaks are called with scipy.signal.find_peaks(). Parameters ---------- threads : int Number of threads used. Default: 1. method : str Method for transforming counts. Can be one of: "zscore" or "flat". If "zscore", the zscore for the pairs is used. If "flat", no transformation is performed. Default: "zscore". threshold : float The lower threshold for selecting peaks. Default: 2. min_count : int Minimum count of TF1-TF2 occurrences for a preferred distance to be called. Default: 1 (all occurrences are considered). save : str Path to save the peaks table to. Default: None (table is not written). Returns ------- None Fills the object variable self.peaks, self.peaking_count """ # Check given input if save is not None: tfcomb.utils.check_writeability(save) tfcomb.utils.check_value(threshold, name="threshold") tfcomb.utils.check_string(method, ["zscore", "flat"], name="method") # sanity checks self.check_datasource("distances") #----- Find preferred peaks -----# self.logger.info(f"Analyzing Signal with threads {threads}") datasource = self.datasource distance_cols = datasource.columns[2:].tolist() #Set threshold on the number of sites if min_count > 1: n = datasource.shape[0] counts = self.distances[distance_cols].sum(axis=1) #raw distances for counting sites datasource = datasource.loc[counts >= min_count, :] self.logger.info("Reduced number of rules from {0} to {1} having min_count >= {2}".format(n, datasource.shape[0], min_count)) # save params self.method = method self.threshold = threshold #Set input to functions self.logger.info("Calculating zscores for signals") if method == "zscore": self._get_zscores(datasource, threads=threads) signal = self.zscores elif method == "zscore-mixture": #under development; not applicable at the moment #If "zscore-mixture", a modified z-score on a Gaussian Mixture Model of counts is used. self._get_zscores(datasource, mixture=True, threads=threads) signal = self.zscores elif method == "flat": signal = datasource #no change in signal #Find peaks from input signal self.logger.info("Finding preferred distances") res = self._multiprocess_chunks(threads, tfcomb.utils.analyze_signal_chunks, signal) #Collect results from all pairs to pandas dataframe res = pd.concat([pd.DataFrame(pair_res) for pair_res in res]).reset_index(drop=True) #Format order of columns columns = ["TF1", "TF2", "Distance", "peak_heights", "prominences", "Threshold"] res = res[columns] res["Distance"] = res["Distance"].astype(int) # distances are float - change to int res.rename(columns={"peak_heights":"Peak Heights", "prominences": "Prominences"}, inplace=True) self.peaks = res # Add total counts of (TF1-TF2) to each peak self.peaks.index = self.peaks["TF1"] + "-" + self.peaks["TF2"] counts = self.distances[distance_cols].sum(axis=1) counts.name = "TF1_TF2_count" self.peaks = self.peaks.merge(counts.to_frame(), left_index=True, right_index=True) #Define list of distances included in each peak (depending on smooth window) distances = datasource.columns[2:].tolist() dist_min = min(distances) dist_max = max(distances) lf = int(np.floor((self.smooth_window-1) / 2.0)) rf = int(np.ceil((self.smooth_window-1) / 2.0)) self.peaks["dist_list"] = [list(range(max(dist-lf, dist_min), min(dist+rf, dist_max)+1)) for dist in self.peaks["Distance"]] #Get count per peak depending on smooth window self.peaks["Distance_count"] = [self.distances.loc[idx, dist_list].sum() for idx, dist_list in zip(self.peaks.index, self.peaks["dist_list"])] self.peaks["Distance_percent"] = (self.peaks["Distance_count"] / self.peaks["TF1_TF2_count"]) * 100 #Replace Distance + / - lf/rf self.peaks["Distance_window"] = ["[{0};{1}]".format(min(dist), max(dist)) for dist in self.peaks["dist_list"]] #Sort peaks on highest prominences self.peaks = self.peaks.sort_values("Prominences", ascending=False) self.peaks.drop(columns=["dist_list"], inplace=True) #Remove temporary column #----- Save stats on run -----# self.peaking_count = self.peaks.drop_duplicates(["TF1", "TF2"]).shape[0] #number of pairs with any peaks # QoL save of threshold and method self.thresh = self.rules[["TF1", "TF2"]] #save threshold for all rules, even those without peaks self.thresh["Threshold"] = threshold self.thresh["Method"] = method # Save dataframe if save is not None: self.peaks.to_csv(save) self.logger.info("Done analyzing signal. Results are found in .peaks")
#-------------------------------------------------------------------------------------------# #------------------------- Evaluation and ranking of found distances -----------------------# #-------------------------------------------------------------------------------------------#
[docs] def evaluate_noise(self, threads=1, method="median", height_multiplier=0.75): """ Evaluates the noisiness of the signal. Therefore the peaks are cut out and the remaining signal is analyzed. Parameters --------- threads : int Number of threads used for evaluation. Default: 1 method : str Measurement to calculate the noisiness of a signal. One of ["median", "min_max"]. Default: "median" height_multiplier : float Height multiplier (percentage) to calculate cut points. Must be between 0 and 1. Default: 0.75 See also -------- tfcomb.utils.evaluate_noise_chunks """ self.check_peaks() self._noise_method = method self._height_multiplier = height_multiplier datasource = self.datasource #Subset datasource to TFs with peaks peak_pairs = set(self.peaks.index) datasource = datasource[datasource.index.isin(peak_pairs)] #Evaluate noisiness self.logger.info(f"Evaluating noisiness of the signals with {threads} threads") res = self._multiprocess_chunks(threads, tfcomb.utils.evaluate_noise_chunks, datasource) noisiness = pd.DataFrame(res, columns=["TF1", "TF2", "Noisiness"]) # merge noisiness self.peaks = self.peaks.merge(noisiness) self.peaks.index = self.peaks["TF1"] + "-" + self.peaks["TF2"]
[docs] def rank_rules(self, by=["Distance_percent", "Peak Heights", "Noisiness"], calc_mean=True): """ ranks rules within each column specified. Parameters ---------- by : list of strings Columns for wich the rules should be ranked Default: ["Distance_percent", "Peak Heights", "Noisiness"] calc_mean : bool True if an extra column should be calculated containing the mean rank, false otherwise Default: True Raises ------- InputError If columns selection (parameter: by) is not valid. Returns ------- None adds a rank column for each criteria given plus one for the mean if set to True """ # check peaks are calculated + by are only valid columns self.check_peaks() if (not set(by).issubset(self.peaks.columns)): raise InputError(f"Column selection not valid. Possible column names to rank by: {self.peaks.columns.values}") # save col names for mean_rank rank_cols = [] # rank all given columns for col in by: # new column name rank_col = "rank_" + col # decide if biggest number = rank 1 or rank n if col =="Noisiness": self.peaks[rank_col] = self.peaks[col].rank(method="dense", ascending=1) else: self.peaks[rank_col] = self.peaks[col].rank(method="dense", ascending=0) rank_cols.append(rank_col) #Add mean rank to peaks if calc_mean: # calculate mean rank (from all column ranks) self.peaks["mean_rank"] = self.peaks[rank_cols].mean(axis=1) # nice to have the best ranks at the top self.peaks = self.peaks.sort_values(by="mean_rank")
#-------------------------------------------------------------------------------------------# #------------------- Additional functionality/analysis for distances -----------------------# #-------------------------------------------------------------------------------------------#
[docs] def mean_distance(self, source="datasource"): """ Get the mean distance for each rule in .rules. Returns --------- pandas.DataFrame containing "mean_distance" per rule. """ self.check_datasource(source) datasource = getattr(self, source) #Calculate weighted average sums = datasource.iloc[:,2:].sum(axis=1).values weights = datasource.iloc[:,2:].div(sums, axis=0) distances = datasource.columns[2:].tolist() avg = (weights * distances).sum(axis=1) #Convert series to df df = pd.DataFrame(avg).rename(columns={0:"mean_distance"}) return df
[docs] def max_distance(self, source="datasource"): """ Get the distance with the maximum signal for each rule in .rules. Parameters ----------- source : str The name of the datasource to use for calculation. Default: "datasource" (the current state of data). Returns --------- pandas.DataFrame containing "max_distance" per rule. """ self.check_datasource(source) datasource = getattr(self, source) df = datasource.iloc[:,2:].idxmax(axis=1).to_frame() #idxmax directly gives the name of column df.columns = ["max_distance"] return df
[docs] def analyze_hubs(self): """ Counts the number of different partners each transcription factor forms a peak with, **with at least one peak**. Returns -------- pd.Series A panda series with the tf as index and the count as integer """ self.check_peaks() occurrences= collections.Counter([x for (x,z) in set(self.peaks.set_index(["TF1","TF2"]).index)]) return pd.Series(occurrences)
[docs] def count_peaks(self): """ Counts the number of identified distance peaks per rules. Returns --------- pd.DataFrame A dataframe containing 'n_peaks' (column) for each TF1-TF2 rule (index) """ self.check_peaks() peak_counts = self.peaks.groupby(["TF1", "TF2"]).size().to_frame().rename(columns={0:"n_peaks"}).reset_index() peak_counts.index = peak_counts["TF1"] + "-" + peak_counts["TF2"] peak_counts.drop(columns=["TF1", "TF2"], inplace=True) #Merge peak counts with rules counts = self.rules[["TF1", "TF2"]] counts = counts.merge(peak_counts, left_index=True, right_index=True, how="left") counts = counts.fillna(0) #Fill with 0 for rules without peaks counts["n_peaks"] = counts["n_peaks"].astype(int) return(counts)
[docs] def classify_rules(self): """ Classify all rules True if at least one peak was found, False otherwise. Returns -------- None fills .classified """ self.check_peaks() self.logger.info("classifying rules") p_index = self.peaks.set_index(["TF1","TF2"]).index.drop_duplicates() datasource = self.datasource[["TF1","TF2"]] datasource["isPeaking"] = datasource.set_index(["TF1","TF2"]).index.isin(p_index) datasource.index = datasource["TF1"] + "-" + datasource["TF2"] self.classified = datasource self.logger.info(f"classifcation done. Results can be found in .classified")
[docs] def get_periodicity(self): """ Calculate periodicity for all rules via autocorrelation. Returns --------- None Fills the object variable .autocorrelation and .periodicity """ #Collect data datasource = self.datasource #raw/corrected/smoothed distances = datasource.columns.tolist()[2:] lags = range(1,len(distances)) #lag 1 to end of distances #Setup output table self.autocorrelation = pd.DataFrame(index=datasource.index, columns=lags) #Calculate autocorrelation per pair n_rules = datasource.shape[0] for i in range(n_rules): #for each pair counts = datasource.iloc[i,2:].values #Calculate autocorrelation autocorr = sm.tsa.acf(counts, nlags=len(lags), fft=True) autocorr = autocorr[1:] #exclude lag 0 self.autocorrelation.iloc[i,:] = autocorr #Find best periodicity per pair info = {} for i in range(n_rules): signal = self.autocorrelation.iloc[i,:] y = np.fft.rfft(signal) x = np.fft.rfftfreq(len(signal)) #Position of highest peak idx_max = np.argmax(y) info[self.autocorrelation.index[i]] = {"period": np.round(1/x[idx_max], 2), "amplitude": np.round(np.abs(y[idx_max]), 3)} self.periodicity = pd.DataFrame().from_dict(info, orient="index")
[docs] def plot_autocorrelation(self, pair): """ Plot the autocorrelation for a pair, which shows the lag of periodicity in the counted distances. Parameters ------------ pair : tuple(str, str) TF names to plot. e.g. ("NFYA","NFYB") """ from statsmodels.graphics import tsaplots #check pair name = "-".join(pair) #Fetch signal for pair datasource = self.datasource signal = datasource.loc[name][2:] #raw/corrected/smoothed distances = datasource.columns.tolist()[2:] lags = range(1,len(distances)) #lag 1 to end of distances fig, ax = plt.subplots() fig = tsaplots.plot_acf(signal, lags=len(lags), zero=False, alpha=None, title=f"Autocorrelation for {name}", ax=ax) ax.spines['right'].set_visible(False) ax.spines['top'].set_visible(False) ax.set_xlabel("Lag (bp)") ax.set_ylabel("Correlation coefficient") return ax
[docs] def build_network(self): """ Builds a TF-TF co-occurrence network for the rules within object. Returns ------- None fills the .network attribute of the `CombObj` with a networkx.Graph object See also ------- tfcomb.network.build_nx_network """ #Build network self.logger.debug("Building network using tfcomb.network.build_network") self.network = tfcomb.network.build_network(self.peaks, node_table=self.TF_table, verbosity=self.verbosity) self.logger.info("Finished! The network is found within <CombObj>.<distObj>.network.")
#-------------------------------------------------------------------------------------------------------------# #---------------------------------------------- Plotting -----------------------------------------------------# #-------------------------------------------------------------------------------------------------------------#
[docs] def plot_bg_estimation(self, pair): """ Plot the background estimation for pair for debugging background estimation Parameters ------------ pair : tuple(str, str) TF names to plot. e.g. ("NFYA","NFYB") """ TF1, TF2 = pair ind = TF1 + "-" + TF2 # construct index #Get counts for distances distances = self.datasource.columns.tolist()[2:] counts = self.datasource.loc[ind, distances] #Initialize plot _, ax = plt.subplots(ncols=2, figsize=(10,4)) #Plot distribution of distances sns.histplot(x=distances, weights=counts, ax=ax[0], bins=len(distances)) ax[0].set_xlabel("Distance in bp") ax[0].set_ylabel("Counts per bp") #Plot hist of distances sns.histplot(x=counts, ax=ax[1], stat="probability", color="grey") ax[1].set_xlabel("Counts per distance") #Add background distributions dist_list = self.bg_dist.get(ind, []) for i, (mu, std, weight) in enumerate(dist_list): name = "Background distribution" if i == 0 else "Upper distribution" #Create normal dist with mu/std x = np.linspace(min(counts), max(counts), 100) rv = stats.norm(mu, std) pdf = rv.pdf(x) ax[1].plot(x, pdf, label="{0} ({1:.1f}%)".format(name, weight*100)) #Final adjustments ax[1].legend() plt.subplots_adjust(wspace=0.3)
def _plot_all(self, save_path, method): """ Plots all pairs. Parameters ---------- save_path : str Directory path to save plots to. Filename will be created with "{method}_{tf1}_{tf2}.png". e.g. save_path/linres_NFYA_NFYB.png method : str Plotting style See also ---------- tfcomb.objects.DistObj.plot """ self.check_min_max_dist() self.check_datasource("distances") tfcomb.utils.check_dir(save_path) # warn user self.logger.info(f"Plotting {method}-plots for all pairs. This may take a while.") for tf1,tf2 in list(zip(self.distances.TF1, self.distances.TF2)): self.plot((tf1, tf2), method=method, save=os.path.join(save_path,f"{method}_{tf1}_{tf2}.png")) def _collapse_negative(self, sourcetable, method="max"): """ Method to collapse negative coulmns(-min distance to 0). e.g. columns [-3, -2, -1] will ne summarized as "neg". Parameters ----------- sourcetable: pd.DataFrame DataFrame which should be collapsed (e.g. .smoothed or distances) method : str, optional Summarization Method. One of ["max","min","mean","sum"]. e.g. with "max" 2.3 is chosen from [1.2, 2.3, 1.5], for sum: "neg" would be 5 Default: max Returns ----------- pd.DataFrame collapsed dataframe for plotting purposes pd.DataFrame, None altered peak list (negative distances are replaced with "neg"). If no peaks found yet, None will be returned Notes -------- This method is intended to alter the plots produced, not to run analysis on it. Although it is possible with most of the functions, we recommend not to use collapsed data for analysis steps. To revert collapsing, use .expand_negative(). See also -------- tfcomb.objects.expand_negative """ # check method tfcomb.utils.check_string(method, ["max","min","mean","sum"]) # check if negative positions possible if self.min_dist >= 0: raise InputError("Data must contain negative position to collapse") datasource = sourcetable # create negative columns neg_cols = neg_cols = [ x for x in range(self.min_dist,0)] #select negative part neg = datasource[["TF1","TF2"] + neg_cols] #add negative column according to method if method =="max": neg["neg"] = neg[neg_cols].max(axis=1) elif method=="min": neg["neg"] = neg[neg_cols].min(axis=1) elif method=="mean": neg["neg"] = neg[neg_cols].mean(axis=1) elif method=="sum": neg["neg"] = neg[neg_cols].sum(axis=1) # remove negative columns from df datasource = datasource.drop(neg_cols, axis=1) try: # try inserting collapsed negative value datasource.insert(2, "neg", neg["neg"]) except ValueError: raise InputError("Datasource already contains a negative column") # alter peaks list peaks = None if self.peaks is not None: peaks = copy.deepcopy(self.peaks) # replace every negative position with "neg" [peaks["Distance"].replace(x,"neg", inplace=True) for x in range(self.min_dist,0)] return (datasource, peaks)
[docs] def plot(self, pair, method="peaks", style="hist", show_peaks=True, save=None, config=None, collapse=None, ax=None, color='tab:blue', max_dist=None, **kwargs): """ Produces different plots. Parameters ----------- pair : tuple(str, str) TF names to plot. e.g. ("NFYA","NFYB") method : str, optional Plotting method. One of: - 'peaks': Shows the z-score signal and any peaks found by analyze_signal_all. - 'correction': Shows the fit of the lowess curve to the data. - 'datasource', 'distances', 'scaled', 'corrected', 'smoothed': Shows the signal of the counts given in the .<method> table. Default: 'peaks'. style : str, optional What style to plot the datasource in. Can be one of: ["hist", "kde", "line"]. Default: "hist". show_peaks : bool, optional Whether to show the identified peak(s) (if any were found) in the plot. Default: True. save: str, optional Path to save the plots to. If save is None plots won't be plotted. Default: None config : dict, optional Config for some plotting methods. \n e.g. {"nbins":100} for histogram like plots or {"bwadjust":0.1} for kde (densitiy) plot. \n If set to *None*, below mentioned default parameters are used.\n possible parameters: \n [hist]: n_bins, Default: self.max_dist - self.min_dist + 1 \n [kde]: bwadjust, Default: 0.1 (see seaborn.kdeplot()) \n Default: None collapse: str, optional None if negative data should not be collapsed. ["min","max","mean","sum"] allowed as methods. See ._collapse_negative() for more information. ax : plt.axis Plot to an existing axis object. Default: None (a new axis will be created). color : str, optional Color of the plot hist/line/kde. Default: "tab:blue". max_dist : int, optional Option to set the max_dist independent of the max_dist used for counting distances. Default: None (max_dist is not changed). kwargs : arguments Additional arguments are passed to plt.hist(). """ #---------------- Input checks ------------------# self.check_datasource("distances") self.check_pair(pair) self.check_min_max_dist() # check method given is supported tfcomb.utils.check_string(style, ["hist", "kde", "line"], name="style") supported_methods = ["peaks", "correction", "datasource", "distances", "scaled", "corrected", "smoothed"] tfcomb.utils.check_string(method, supported_methods) if collapse is not None: tfcomb.utils.check_string(collapse, ["min","max","mean","sum"]) #collapse is not valid for other styles than hist if style != "hist": raise InputError("Setting 'collapse' is only valid for style='hist'. Please adjust input parameters.") # check config is dict tfcomb.utils.check_type(config, [dict, type(None)], "config") # check save is writeable if save is not None: tfcomb.utils.check_writeability(save) # get TF1, TF2 out of pair tf1, tf2 = pair ind = tf1 + "-" + tf2 # construct index #---------------- Setup data to plot --------------# ylbl = "Count per distance" # standard ylbl, replace with specific option if needed #Select datasource to plot if method == "peaks": #Checking if peaks were analyzed if not hasattr(self, "thresh"): #'peaks' was chosen, but signals were not yet analyzed. Falling back to signal. method = "signal" source_table = self.datasource else: #check that pair was in zscores if not ind in self.thresh.index: self.logger.warning("TF pair '{0}' distances were counted, but not analyzed due to thresholds set in analyze_signal_all. Falling back to viewing signal.".format(ind)) method = "signal" source_table = self.datasource #check which method was used for calculating peaks if self.thresh.loc[ind, "Method"] == "flat": source_table = self.datasource elif self.thresh.loc[ind, "Method"] == "zscore": source_table = self.zscores ylbl = "Z-score normalized counts" elif method == "correction": if style == "kde": raise InputError("Style 'kde' is not valid for method 'correction'. Please select another style or method.") self.check_datasource("uncorrected") source_table = self.uncorrected else: self.check_datasource(method) source_table = getattr(self, method) if method == "corrected": ylbl = "Corrected count per distance" if method == "scaled": ylbl = "Scaled count per distance" # collapse or keep negative distances if collapse: source_table, peak_df = self._collapse_negative(source_table, method=collapse) #replaces all negative distances/peaks with "neg" else: peak_df = self.peaks #Establish x and y-values x_data = source_table.columns[2:].values #x_data is distances y_data = source_table.loc[ind].iloc[2:].values #Reduce to max_dist if chosen if max_dist is not None: x_bool = [x <= max_dist for x in x_data] x_data = x_data[x_bool] y_data = y_data[x_bool] #Split neg from data if neg was collapsed if collapse: #replace neg with a positional offset neg_pos = int(min(x_data[1:]) - int((max(x_data[1:]) - min(x_data[1:])) * 0.05)) # -5% of the full range of values x_neg_data = neg_pos y_neg_data = y_data[0] x_data = x_data[1:] y_data = y_data[1:] #Replace config with default parameters config = {} if config is None else config #initialize empty config default = {"bins": len(x_data), "bw_adjust": 0.1} for key in default: if key not in config: config[key] = default[key] #Check format of input config tfcomb.utils.check_type(config["bins"], [int], "bins") tfcomb.utils.check_value(config["bw_adjust"], vmin=0, name="bw_adjust") #-------------- Start plotting -------------# # start subplot if ax == None: _, ax = plt.subplots() #Plot signal from source using different styles if style == "kde": # kde plot, see seaborn.kdeplot() for more information if min(y_data) < 0: raise InputError("Style 'kde' is not valid for negative input data, e.g. if counts were corrected or if plotting zscores. Please select another method or style.") sns.kdeplot(x_data, weights=y_data, bw_adjust=config["bw_adjust"], x="distance", ax=ax, color=color) elif style == "hist": #get location of bins _, bin_edges = np.histogram(x_data, weights=y_data, bins=config["bins"], range=(min(x_data), max(x_data)+1)) #range ensures that the last bin is included ax.hist(x_data, weights=y_data, bins=bin_edges-0.5, density=False, color=color, **kwargs) #if collapsed, plot positive and negative data separately if collapse: plt.bar(x_neg_data, y_neg_data, color='tab:orange') #plt.bar already aligns the bar to the center position, so no -0.5 adjustment is needed #plot axis line if y_data goes below zero (e.g. for zscore) if min(y_data) < 0: ax.axhline(0, color="tab:blue", lw=0.5) elif style == "line": ax.plot(x_data, y_data, color=color) #------------ Plot additional elements (peaks / background) ----------# if method == "peaks": thresh = self.thresh.loc[((self.thresh["TF1"] == tf1) & (self.thresh["TF2"] == tf2))].iloc[0,2] ax.axhline(thresh, ls="--", color="grey", label="Threshold") #plot the threshold in the z-score range elif method == "correction": #Add lowess line lowess = self.lowess.loc[ind, x_data] plt.plot(x_data, lowess, color="red", label="Lowess smoothing", lw=2) ax.legend() #Show peaks in plot if show_peaks == True: #Check if peaks were calculated if peak_df is not None: #Fetch peaks and threshold line peak_positions = peak_df.loc[((peak_df["TF1"] == tf1) & (peak_df["TF2"] == tf2))].iloc[:,2] #peak positions in bp #If any peaks were found if len(peak_positions) > 0: # get indices of the peaks (mainly needed for ranges not starting with position 0) peak_idx = [list(x_data).index(peak) for peak in peak_positions if peak in x_data] x_crosses = x_data[peak_idx] #x-values for peaks y_crosses = y_data[peak_idx] #y-values for peaks # check if at least one peak is at the negative position if collapse: if "neg" in peak_positions.tolist(): x_crosses.insert(0, x_neg_data) y_crosses.insert(0, y_neg_data) ax.plot(x_crosses, y_crosses, "x", color="red", label="Peaks") #plot peaks as crosses ax.legend() #-------- Done plotting data; making final changes to axes -------# # add labels to plot if self.percentage == True: ax.set_xlabel('Distance (%)') else: ax.set_xlabel('Distance (bp)') ax.set_ylabel(ylbl) #Make x-axis labels pretty xlim = ax.get_xlim() #get xlim before changes xt = ax.get_xticks() #positions of xticklabels xtl = xt.astype(int) #labels #Add nice x-axis label for collapsed negative counts if collapse: pos_idx = [i for i, l in enumerate(xtl) if l >= 0] xt = [x_neg_data] + list(xt[pos_idx]) #x_neg_data contains the position of the neg bar xtl = ["neg"] + list(xtl[pos_idx]) ax.set_xticks(xt) #explicitly set xticks to prevent matplotlib error ax.set_xticklabels(xtl, rotation=self._XLBL_ROTATION, fontsize=self._XLBL_FONTSIZE) ax.set_xlim(xlim) #set xlim back to original # Final adjustments of title and spines ax.set_title("{0}-{1}".format(*pair)) ax.spines['right'].set_visible(False) ax.spines['top'].set_visible(False) #save plot if save is not None: plt.savefig(save, dpi=600, bbox_inches="tight") return ax
[docs] def plot_network(self, color_node_by="TF1_count", color_edge_by="Distance", size_edge_by="Distance_percent", **kwargs): """ Plot the rules in .rules as a network using Graphviz for python. This function is a wrapper for building the network (using tfcomb.network.build_network) and subsequently plotting the network (using tfcomb.plotting.network). Parameters ----------- color_node_by : str, optional A column in .rules or .TF_table to color nodes by. Default: 'TF1_count'. color_edge_by : str, optional A column in .rules to color edges by. Default: 'Distance'. size_edge_by : str, optional A column in rules to size edge width by. Default: 'TF1_TF2_count'. **kwargs : arguments All other arguments are passed to tfcomb.plotting.network. See also -------- tfcomb.network.build_network and tfcomb.plotting.network """ #Fetch network from object or build network if self.network is None: self.logger.warning("The .network attribute is not set yet - running build_network().") self.build_network() #running build network() #Plot network G = self.network dot = tfcomb.plotting.network(G, color_node_by=color_node_by, color_edge_by=color_edge_by, size_edge_by=size_edge_by, verbosity=self.verbosity, **kwargs) return(dot)