Last active
October 20, 2015 07:29
-
-
Save moonwatcher/87e9ebaf69930cc161ce to your computer and use it in GitHub Desktop.
Calculate PCC and CSI values
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Author: Lior Galanti < [email protected] > | |
# NYU Center for Genetics and System Biology 2015 | |
import sys | |
import io | |
import json | |
import uuid | |
import re | |
import math | |
KEEP = False | |
# METHOD = 'abs' # using an absolute value means treating for positive and negative correlations | |
METHOD = 'hueyling' # wrong on so many ways... | |
# METHOD = 'positive' # set CSI to 0 when pivot is negative or 0 | |
CSI_CUTOFF = 0.7 | |
def to_json(node): | |
return json.dumps(node, sort_keys=True, ensure_ascii=False, indent=4) | |
def pcc(data): | |
def pearson(left, right): | |
# actually this calculates the Phi coefficient, which is the same as pearson for binary data | |
# see: https://en.wikipedia.org/wiki/Phi_coefficient | |
result = None | |
c = [[0,0,0], [0,0,0], [0,0,0]] | |
for i in range(len(left[2:])): | |
if left[i] == 1: | |
if right[i] == 1: | |
c[0][0] += 1 | |
else: | |
c[0][1] += 1 | |
else: | |
if right[i] == 1: | |
c[1][0] += 1 | |
else: | |
c[1][1] += 1 | |
c[0][2] = c[0][0] + c[0][1] | |
c[1][2] = c[1][0] + c[1][1] | |
c[2][0] = c[0][0] + c[1][0] | |
c[2][1] = c[0][1] + c[1][1] | |
c[2][2] = c[2][0] + c[2][1] | |
numerator = (float(c[0][0]) * float(c[1][1])) - (float(c[0][1]) * float(c[1][0])) | |
denominator = math.sqrt((float(c[2][0])*float(c[2][1])*float(c[0][2])*float(c[1][2]))) | |
if denominator != 0: | |
result = numerator / denominator | |
return result | |
pcc = [] | |
for i in range(len(data['table'])): | |
row = [] | |
for j in range(len(data['table'])): | |
left = data['table'][i] | |
right = data['table'][j] | |
row.append(pearson(data['table'][i], data['table'][j])) | |
pcc.append(row) | |
return pcc | |
def csi(pcc): | |
def related(pcc, i, j): | |
result = None | |
pivot = pcc[i][j] | |
if pivot is not None: | |
result = 0 | |
for r in range(len(pcc)): | |
left = pcc[r][i] | |
right = pcc[r][j] | |
if METHOD == 'positive': | |
if pivot > 0: | |
if left is not None and left - pivot >= 0.05: | |
result += 1 | |
elif right is not None and right - pivot >= 0.05: | |
result += 1 | |
else: | |
result = len(pcc) | |
if METHOD == 'abs': | |
if left is not None and abs(abs(left) - abs(pivot)) <= 0.05: | |
result += 1 | |
elif right is not None and abs(abs(right) - abs(pivot)) <= 0.05: | |
result += 1 | |
elif METHOD == 'hueyling': | |
if left is not None and left - pivot >= 0.05: | |
result += 1 | |
elif right is not None and right - pivot >= 0.05: | |
result += 1 | |
result = 1.0 - (float(result) / float(len(pcc))) | |
return result | |
csi = [] | |
for i in range(len(pcc)): | |
row = [] | |
for j in range(len(pcc)): | |
row.append(related(pcc, i, j)) | |
csi.append(row) | |
return csi | |
def load(path): | |
def check(row): | |
return any([i == 1 for i in row[2:]]) and any([i == 0 for i in row[2:]]) | |
data = { | |
'table': [], | |
'oligo': [], | |
'gene': [], | |
'locus': [], | |
'description': [], | |
'head': None | |
} | |
header = True | |
with io.open(path, 'r') as f: | |
for line in f: | |
row = line.strip('\n').split(',') | |
if header: | |
data['head'] = row | |
header = False | |
else: | |
row = row[0:2] + [ int(i) for i in row[2:38] ] | |
if KEEP or check(row): | |
data['table'].append(row) | |
for i in range(len(data['table'])): | |
data['oligo'].append(data['table'][i][0]) | |
data['gene'].append(data['table'][i][1]) | |
data['table'][i] = data['table'][i][2:] | |
functional = { 'table': [], 'head': None, 'index':{ 'molecular_name': {}} } | |
header = True | |
molecular_name_strip = re.compile('[a-z]$') | |
with io.open('c_elegans.PRJNA13758.WS250.functional_descriptions.txt', 'r') as f: | |
for line in f: | |
line = line.strip() | |
if line[0] != '#': | |
if header: | |
row = line.strip('\n').split(' ') | |
functional['head'] = row | |
header = False | |
else: | |
row = line.strip('\n').split('\t') | |
row[2] = molecular_name_strip.sub('', row[2]) | |
functional['table'].append(row) | |
if row[2] and row[2] != 'not know': | |
functional['index']['molecular_name'][row[2]] = row | |
for name in data['gene']: | |
if name in functional['index']['molecular_name']: | |
f = functional['index']['molecular_name'][name] | |
data['locus'].append(f[1]) | |
data['description'].append(f[3]) | |
else: | |
data['locus'].append(name) | |
data['description'].append('unknown') | |
return data | |
def csv(matrix, names, path): | |
with io.open(path, 'w') as f: | |
f.write(','.join([''] + names)) | |
f.write('\n') | |
for i in range(len(matrix)): | |
f.write(','.join([names[i]] + ['NA' if j is None else '{:.2}'.format(j) for j in matrix[i]])) | |
f.write('\n') | |
def table_nodes(data): | |
with io.open('table_nodes.csv', 'w') as f: | |
for index,name in enumerate(data['gene']): | |
f.write('{}\t{}'.format(name, data['description'][index])) | |
f.write('\n') | |
def table_syn(data): | |
with io.open('table_syn.csv', 'w') as f: | |
for index,name in enumerate(data['gene']): | |
# public_name from c_elegans.PRJNA13758.WS250.functional_descriptions.txt | |
f.write('{}\t{}\t{}\t{}'.format(name, data['locus'][index], 'locus', '1')) | |
f.write('\n') | |
# same as the gene name from the data file / same as molecular_name (after stripping the [a-z] from the end) | |
# this is used to match to the c_elegans.PRJNA13758.WS250.functional_descriptions.txt file | |
f.write('{}\t{}\t{}\t{}'.format(name, name, 'CosmidID', '2')) | |
f.write('\n') | |
def table_gnbi(data, csi): | |
with io.open('table_gnbi.csv', 'w') as f: | |
for i in range(len(csi)): | |
for j in range(len(csi)): | |
value = csi[i][j] | |
if i > j and value > CSI_CUTOFF: | |
one = data['gene'][i] | |
two = data['gene'][j] | |
f.write('{}\t{}\t{}\t{}\t{}\t{}'.format(one, two, 'pc', 'csi', '{:.3}'.format(value), '0')) | |
f.write('\n') | |
path = sys.argv[1] | |
data = load(path) | |
pcc = pcc(data) | |
csi = csi(pcc) | |
csv(pcc, data['gene'], 'ppc.csv') | |
csv(csi, data['gene'], 'csi.csv') | |
table_nodes(data) | |
table_syn(data) | |
table_gnbi(data, csi) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment