Last active
November 24, 2015 19:59
-
-
Save jpr71/71463a989ca4415265fc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gzip | |
import os.path | |
from peewee import * | |
from subprocess import check_output | |
from collections import OrderedDict | |
import re | |
build = "WS245" | |
gff_url = "ftp://ftp.wormbase.org/pub/wormbase/releases/{build}/species/c_elegans/PRJNA13758/c_elegans.PRJNA13758.{build}.annotations.gff3.gz".format(build = build) | |
gff = "c_elegans.{build}.gff3.gz".format(build = build) | |
if not os.path.exists(gff): | |
comm = "curl {gff_url} > {gff}".format(**locals()) | |
print(check_output(comm, shell = True)) | |
os.remove("worms.db") | |
db = SqliteDatabase('worms.db') | |
db.connect() | |
class BaseModel(Model): | |
class Meta: | |
database = db | |
class Region(BaseModel): | |
chrom= CharField() | |
type_of = CharField(null = True) | |
start = IntegerField() | |
end = IntegerField() | |
strand = CharField() | |
reading_frame = CharField() | |
class Meta: | |
order_by = ('chrom',) | |
class Id_Set(BaseModel): | |
region = ForeignKeyField(Region, related_name="ID", null=True ) | |
id_key = CharField(null=True) | |
id_value = CharField(null=True) | |
class Meta: | |
database = db | |
db.create_tables([Region,Id_Set]) | |
def boolify(s): | |
if s == 'True': | |
return True | |
if s == 'False': | |
return False | |
raise ValueError("huh?") | |
def autoconvert(s): | |
for fn in (boolify, int, float): | |
try: | |
return fn(s) | |
except ValueError: | |
pass | |
return s | |
correct_ids = set(["locus", "Name", "ID", "sequence_name"]) | |
with gzip.open(gff, 'rb') as f: | |
c = 0 | |
while True: | |
c += 1 | |
if c > 2000000: | |
break | |
line = f.next().strip().split("\t") | |
if line[0].startswith("#"): | |
continue | |
header = ["chrom", "source", "type_of", "start", "end", "score", "strand", "reading_frame", "attributes"] | |
if line[1] == "WormBase": | |
#Put in bulk, every 1000th iteration | |
record = OrderedDict(zip(header, map(autoconvert, line))) | |
record = {k:v for k,v in record.items() if k not in ["source", "score", "strand", "reading_frame"]} | |
attributes = {} | |
region_save = {k:v for k,v in record.items() if k != "attributes"} | |
region_id = Region.create(**region_save) | |
region_id.save() | |
for ID, VAL in re.findall("([^=;]+)=([^; ]+)", record["attributes"]): | |
if ID in correct_ids: | |
if VAL.find(":") > 0: | |
VAL = VAL.split(":")[1] | |
attributes["id_key"]= ID | |
attributes["id_value"] = VAL | |
attributes["region"] = region_id.get_id() | |
#print attributes | |
id_save = {k:v for k,v in attributes.items()} | |
id_set = Id_Set.create(**id_save) | |
id_set.save() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment