Skip to content

Instantly share code, notes, and snippets.

@moonwatcher
Last active October 20, 2015 07:29
Show Gist options
  • Save moonwatcher/87e9ebaf69930cc161ce to your computer and use it in GitHub Desktop.
Save moonwatcher/87e9ebaf69930cc161ce to your computer and use it in GitHub Desktop.
Calculate PCC and CSI values
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: Lior Galanti < [email protected] >
# NYU Center for Genetics and System Biology 2015
import sys
import io
import json
import uuid
import re
import math
KEEP = False
# METHOD = 'abs' # using an absolute value means treating for positive and negative correlations
METHOD = 'hueyling' # wrong on so many ways...
# METHOD = 'positive' # set CSI to 0 when pivot is negative or 0
CSI_CUTOFF = 0.7
def to_json(node):
return json.dumps(node, sort_keys=True, ensure_ascii=False, indent=4)
def pcc(data):
def pearson(left, right):
# actually this calculates the Phi coefficient, which is the same as pearson for binary data
# see: https://en.wikipedia.org/wiki/Phi_coefficient
result = None
c = [[0,0,0], [0,0,0], [0,0,0]]
for i in range(len(left[2:])):
if left[i] == 1:
if right[i] == 1:
c[0][0] += 1
else:
c[0][1] += 1
else:
if right[i] == 1:
c[1][0] += 1
else:
c[1][1] += 1
c[0][2] = c[0][0] + c[0][1]
c[1][2] = c[1][0] + c[1][1]
c[2][0] = c[0][0] + c[1][0]
c[2][1] = c[0][1] + c[1][1]
c[2][2] = c[2][0] + c[2][1]
numerator = (float(c[0][0]) * float(c[1][1])) - (float(c[0][1]) * float(c[1][0]))
denominator = math.sqrt((float(c[2][0])*float(c[2][1])*float(c[0][2])*float(c[1][2])))
if denominator != 0:
result = numerator / denominator
return result
pcc = []
for i in range(len(data['table'])):
row = []
for j in range(len(data['table'])):
left = data['table'][i]
right = data['table'][j]
row.append(pearson(data['table'][i], data['table'][j]))
pcc.append(row)
return pcc
def csi(pcc):
def related(pcc, i, j):
result = None
pivot = pcc[i][j]
if pivot is not None:
result = 0
for r in range(len(pcc)):
left = pcc[r][i]
right = pcc[r][j]
if METHOD == 'positive':
if pivot > 0:
if left is not None and left - pivot >= 0.05:
result += 1
elif right is not None and right - pivot >= 0.05:
result += 1
else:
result = len(pcc)
if METHOD == 'abs':
if left is not None and abs(abs(left) - abs(pivot)) <= 0.05:
result += 1
elif right is not None and abs(abs(right) - abs(pivot)) <= 0.05:
result += 1
elif METHOD == 'hueyling':
if left is not None and left - pivot >= 0.05:
result += 1
elif right is not None and right - pivot >= 0.05:
result += 1
result = 1.0 - (float(result) / float(len(pcc)))
return result
csi = []
for i in range(len(pcc)):
row = []
for j in range(len(pcc)):
row.append(related(pcc, i, j))
csi.append(row)
return csi
def load(path):
def check(row):
return any([i == 1 for i in row[2:]]) and any([i == 0 for i in row[2:]])
data = {
'table': [],
'oligo': [],
'gene': [],
'locus': [],
'description': [],
'head': None
}
header = True
with io.open(path, 'r') as f:
for line in f:
row = line.strip('\n').split(',')
if header:
data['head'] = row
header = False
else:
row = row[0:2] + [ int(i) for i in row[2:38] ]
if KEEP or check(row):
data['table'].append(row)
for i in range(len(data['table'])):
data['oligo'].append(data['table'][i][0])
data['gene'].append(data['table'][i][1])
data['table'][i] = data['table'][i][2:]
functional = { 'table': [], 'head': None, 'index':{ 'molecular_name': {}} }
header = True
molecular_name_strip = re.compile('[a-z]$')
with io.open('c_elegans.PRJNA13758.WS250.functional_descriptions.txt', 'r') as f:
for line in f:
line = line.strip()
if line[0] != '#':
if header:
row = line.strip('\n').split(' ')
functional['head'] = row
header = False
else:
row = line.strip('\n').split('\t')
row[2] = molecular_name_strip.sub('', row[2])
functional['table'].append(row)
if row[2] and row[2] != 'not know':
functional['index']['molecular_name'][row[2]] = row
for name in data['gene']:
if name in functional['index']['molecular_name']:
f = functional['index']['molecular_name'][name]
data['locus'].append(f[1])
data['description'].append(f[3])
else:
data['locus'].append(name)
data['description'].append('unknown')
return data
def csv(matrix, names, path):
with io.open(path, 'w') as f:
f.write(','.join([''] + names))
f.write('\n')
for i in range(len(matrix)):
f.write(','.join([names[i]] + ['NA' if j is None else '{:.2}'.format(j) for j in matrix[i]]))
f.write('\n')
def table_nodes(data):
with io.open('table_nodes.csv', 'w') as f:
for index,name in enumerate(data['gene']):
f.write('{}\t{}'.format(name, data['description'][index]))
f.write('\n')
def table_syn(data):
with io.open('table_syn.csv', 'w') as f:
for index,name in enumerate(data['gene']):
# public_name from c_elegans.PRJNA13758.WS250.functional_descriptions.txt
f.write('{}\t{}\t{}\t{}'.format(name, data['locus'][index], 'locus', '1'))
f.write('\n')
# same as the gene name from the data file / same as molecular_name (after stripping the [a-z] from the end)
# this is used to match to the c_elegans.PRJNA13758.WS250.functional_descriptions.txt file
f.write('{}\t{}\t{}\t{}'.format(name, name, 'CosmidID', '2'))
f.write('\n')
def table_gnbi(data, csi):
with io.open('table_gnbi.csv', 'w') as f:
for i in range(len(csi)):
for j in range(len(csi)):
value = csi[i][j]
if i > j and value > CSI_CUTOFF:
one = data['gene'][i]
two = data['gene'][j]
f.write('{}\t{}\t{}\t{}\t{}\t{}'.format(one, two, 'pc', 'csi', '{:.3}'.format(value), '0'))
f.write('\n')
path = sys.argv[1]
data = load(path)
pcc = pcc(data)
csi = csi(pcc)
csv(pcc, data['gene'], 'ppc.csv')
csv(csi, data['gene'], 'csi.csv')
table_nodes(data)
table_syn(data)
table_gnbi(data, csi)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment