Last active
September 5, 2022 22:04
-
-
Save hubgit/a99d6387773164400bb88a2ce7c98f8a to your computer and use it in GitHub Desktop.
A ReadableStream created from an async iterator which fetches paginated data, piped into a WritableStream which inserts items into an SQLite database.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { parse } from 'https://deno.land/x/[email protected]/mod.ts' | |
import { readableStreamFromIterable } from 'https://deno.land/[email protected]/io/streams.ts' | |
import { Database } from 'https://deno.land/x/[email protected]/mod.ts' | |
import ProgressBar from 'https://deno.land/x/[email protected]/mod.ts' | |
// Running count of records handed to the consumer; incremented by
// `fetchGenbank` each time it re-renders the progress bar.
let counter = 0

// Shared progress bar. The `total` is not known here, so it is supplied on
// every `render` call instead of in these options.
const progress = new ProgressBar({
  title: 'processing:',
  interval: 100,
  display: ':completed/:total :time [:bar] :percent (ETA :eta)',
})
/**
 * Streams sequence records from NCBI for every nucleotide entry matching
 * `term`.
 *
 * Issues one ESearch request with `usehistory=y`, then pages through the
 * stored result set with EFetch (`retmax` records per request) and yields each
 * parsed `TSeq` element in turn. When the consumer resumes the generator after
 * a record, the module-level `counter` is incremented and `progress` is
 * re-rendered with the total reported by ESearch.
 *
 * @param term Entrez query string passed to ESearch as `term`.
 * @throws Error when a request returns a non-OK status or the ESearch
 *   response carries no usable result count.
 */
const fetchGenbank = async function* (term: string) {
  type TSeqRecord = {
    TSeq_sequence: string
    TSeq_accver: string
  }

  const searchUrl = new URL(
    'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi'
  )
  searchUrl.searchParams.set('db', 'nucleotide')
  searchUrl.searchParams.set('retmode', 'json')
  // No ids are requested here: only the count and the history handles
  // (webenv / querykey) are used below.
  searchUrl.searchParams.set('retmax', '0')
  searchUrl.searchParams.set('usehistory', 'y')
  searchUrl.searchParams.set('term', term)

  const searchResponse = await fetch(searchUrl)
  if (!searchResponse.ok) {
    // Release the unread body before bailing out.
    await searchResponse.body?.cancel()
    throw new Error(
      `ESearch request failed: ${searchResponse.status} ${searchResponse.statusText}`
    )
  }

  const { esearchresult } = await searchResponse.json()

  // NOTE(review): the JSON output appears to encode `count` as a string;
  // Number() accepts either form so the loop bound and progress total are
  // always numeric.
  const total = Number(esearchresult?.count)
  if (!Number.isFinite(total)) {
    throw new Error('ESearch response did not include a numeric result count')
  }

  const retmax = 100

  for (let retstart = 0; retstart < total; retstart += retmax) {
    const fetchUrl = new URL(
      'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi'
    )
    fetchUrl.searchParams.set('db', 'nuccore')
    fetchUrl.searchParams.set('retmode', 'xml')
    fetchUrl.searchParams.set('rettype', 'fasta')
    fetchUrl.searchParams.set('retstart', String(retstart))
    fetchUrl.searchParams.set('retmax', String(retmax))
    fetchUrl.searchParams.set('webenv', esearchresult.webenv)
    fetchUrl.searchParams.set('query_key', esearchresult.querykey)

    const pageResponse = await fetch(fetchUrl)
    if (!pageResponse.ok) {
      await pageResponse.body?.cancel()
      throw new Error(
        `EFetch request failed: ${pageResponse.status} ${pageResponse.statusText}`
      )
    }

    const xml = await pageResponse.text()
    const { TSeqSet } = parse(xml) as unknown as {
      TSeqSet?: {
        TSeq?: TSeqRecord | TSeqRecord[]
      } | null
    }

    // A page holding a single record may be parsed as one object rather than
    // a one-element array, and an empty page may have no TSeq at all;
    // normalise both to an array before iterating.
    const parsedRecords = TSeqSet?.TSeq
    const records: TSeqRecord[] =
      parsedRecords == null
        ? []
        : Array.isArray(parsedRecords)
        ? parsedRecords
        : [parsedRecords]

    for (const record of records) {
      yield record
      progress.render(++counter, { total })
    }
  }
}
/**
 * Lists every contiguous substring ("word") of `length` characters in
 * `sequence`, paired with its zero-based start offset.
 *
 * @param sequence Text to slice into overlapping words.
 * @param length Number of characters per word.
 * @returns `[word, position]` tuples in ascending position order; empty when
 *   `sequence` is shorter than `length` or `length` is less than 1.
 */
const buildAllWords = (
  sequence: string,
  length: number
): Array<[string, number]> => {
  const words: Array<[string, number]> = []

  // A zero or negative length would otherwise emit empty strings (or
  // reversed-range substrings) for every offset instead of real words.
  if (length < 1) {
    return words
  }

  const lastStart = sequence.length - length
  for (let pos = 0; pos <= lastStart; pos++) {
    words.push([sequence.substring(pos, pos + length), pos])
  }
  return words
}
/**
 * Writable stream that stores each incoming sequence record in a SQLite
 * database: one row in `items` per record and one row in `words` per
 * fixed-length word of its sequence.
 *
 * The constructor opens the database at `path`, creates both tables and
 * prepares the insert statements. Chunks without a `TSeq_accver` are skipped.
 * On close the statements are finalized, an index on `words.word` is built and
 * the database is closed. If a write fails or the stream is aborted, the
 * statements and the database handle are released without building the index.
 *
 * @param path Location of the SQLite database file to create.
 * @param wordLength Number of characters per indexed word (defaults to 12).
 */
class SQLiteStream extends WritableStream {
  constructor(private path: string, wordLength = 12) {
    const database = new Database(path)

    database.run('PRAGMA synchronous = OFF')
    database.run('PRAGMA journal_mode = MEMORY')
    database.run('PRAGMA locking_mode = EXCLUSIVE')

    database.run('create table `words` (`word`, `accver`, `pos` integer)')
    database.run(
      'create table `items` (`accver`, `orgname`, `defline`, `seqlength` integer)'
    )

    const insertWordStatement = database.prepare(
      'insert into `words` (`word`, `accver`, `pos`) values (?, ?, ?)'
    )
    const insertItemStatement = database.prepare(
      'insert into `items` (`accver`, `orgname`, `defline`, `seqlength`) values (?, ?, ?, ?)'
    )

    // Both release steps are idempotent so that the close, abort and
    // failed-write paths can each call them without double-freeing.
    let statementsFinalized = false
    const finalizeStatements = () => {
      if (statementsFinalized) {
        return
      }
      statementsFinalized = true
      insertWordStatement.finalize()
      insertItemStatement.finalize()
    }

    let databaseClosed = false
    const closeDatabase = () => {
      if (databaseClosed) {
        return
      }
      databaseClosed = true
      database.close()
    }

    const releaseAll = () => {
      try {
        finalizeStatements()
      } finally {
        closeDatabase()
      }
    }

    // All rows belonging to one record are written inside a single
    // transaction.
    const insertRecord = database.transaction((record) => {
      const { accver, orgname, defline, seqlength, words } = record
      for (const [word, pos] of words) {
        insertWordStatement.run([word, accver, pos])
      }
      insertItemStatement.run([accver, orgname, defline, seqlength])
    })

    super({
      write(item) {
        if (!item.TSeq_accver) {
          return
        }
        try {
          // Build a fresh record instead of attaching `words` to the chunk,
          // so the caller's object is left untouched.
          insertRecord({
            accver: item.TSeq_accver,
            orgname: item.TSeq_orgname,
            defline: item.TSeq_defline,
            seqlength: item.TSeq_length,
            words: buildAllWords(item.TSeq_sequence, wordLength),
          })
        } catch (error) {
          // A throwing write errors the stream, after which neither close()
          // nor abort() runs; release the handles here.
          releaseAll()
          throw error
        }
      },
      close() {
        try {
          finalizeStatements()
          console.log('creating index…')
          database.run('create index `idx_word` on words(`word`)')
        } finally {
          closeDatabase()
        }
        console.log('done')
      },
      abort() {
        // Reached when the source side of a pipe fails or the writer is
        // aborted explicitly.
        releaseAll()
      },
    })
  }
}
// Entrez query: every Coronaviridae record except SARS-CoV-2 entries, plus
// the single accession MN908947.
const term = [
  '(Coronaviridae[Organism] NOT Severe acute respiratory syndrome coronavirus 2[Organism])',
  'MN908947[Accession]',
].join(' OR ')

const databasePath = 'data/words.db'

// Make sure the target directory exists before the database is opened there.
await Deno.mkdir('data', { recursive: true })

// Start from a fresh database. A missing file is expected on the first run;
// any other failure (permissions, path is a directory, …) is reported.
try {
  await Deno.remove(databasePath)
} catch (error) {
  if (!(error instanceof Deno.errors.NotFound)) {
    throw error
  }
}

// Stream fetched records straight into the database.
await readableStreamFromIterable(fetchGenbank(term)).pipeTo(
  new SQLiteStream(databasePath)
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment