Last active
June 19, 2025 06:55
-
-
Save kojix2/523e45b7c60f06aa134725e7c2fc912f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "option_parser" | |
require "wait_group" | |
require "fastx" | |
require "fiber/execution_context" | |
# ────── CLI options ──────
input_file = ""
output_file = ""
k_size = 21
worker_cnt = 4
chunk_reads = 1_000
verbose = false
OptionParser.parse do |p|
  p.banner = "Usage: kmer_counter [options] -i FILE -k K"
  p.on("-i", "--input FILE", "FASTQ (.gz) input") { |v| input_file = v }
  p.on("-o", "--output FILE", "Write TSV here") { |v| output_file = v }
  # BUG FIX: these three handlers previously assigned to fresh block-local
  # variables (kmer_size / threads / chunk_size) instead of the options
  # declared above, so -k, -t and -c silently had no effect.
  p.on("-k", "--kmer-size N", "k-mer size (default: 21)") { |v| k_size = v.to_i }
  p.on("-t", "--threads N", "Worker threads (default: 4)") { |v| worker_cnt = v.to_i.clamp(1, 256) }
  p.on("-c", "--chunk-size N", "Reads per task (default: 1000)") { |v| chunk_reads = v.to_i.clamp(1, 10_000) }
  p.on("-v", "--verbose", "Verbose output") { |v| verbose = v }
  p.on("-h", "--help", "Show this help message") { puts p; exit }
end
abort "--input/-i is required" if input_file.empty?
# Default to STDOUT when no output file is given; sync so rows appear
# immediately instead of being buffered.
out = output_file.empty? ? STDOUT : File.open(output_file, "w")
out.sync = true
# ────── types & helpers ──────
# Per-read tally: k-mer string -> number of occurrences in that read.
alias KmerCount = Hash(String, UInt32)
# One processed read. `idx` is the read's 0-based position in the input
# file (used by the emitter to restore input order), `id` is the FASTQ
# record ID, `counts` holds the read's k-mer tallies.
record ReadRow,
  idx : Int64,
  id : String,
  counts : KmerCount
# Count every k-byte substring of `seq`, case-insensitively (the sequence
# is upcased first). Returns an empty hash when the sequence is shorter
# than k. Operates on bytes, so multi-byte characters are not expected.
def count_kmers(seq : String, k : Int32) : KmerCount
  counts = KmerCount.new(0_u32)
  upper = seq.upcase
  last_start = upper.bytesize - k
  return counts if last_start < 0
  0.upto(last_start) do |offset|
    counts[upper.byte_slice(offset, k)] += 1
  end
  counts
end
# ────── channels ──────
# reader -> workers: {starting read index, read IDs, sequences} per chunk.
chunk_q = Channel({Int64, Array(String), Array(String)}).new(16)
# workers -> emitter: one ReadRow per read.
result_q = Channel(ReadRow).new(16)
# worker_cnt + 2 threads — presumably the extra two are for the reader and
# emitter fibers so they don't compete with the workers; TODO confirm.
ctx = Fiber::ExecutionContext::MultiThreaded.new("kmer-pool", worker_cnt + 2)
wg = WaitGroup.new
# Reader ───────────────────────────────
# Streams the FASTQ file and batches reads into chunks of `chunk_reads`,
# tagging each chunk with the index of its first read.
ctx.spawn(name: "reader") do
  cursor = 0_i64
  ids = [] of String
  seqs = [] of String
  Fastx::Fastq::Reader.open(input_file) do |r|
    r.each do |id, seq, _|
      ids << id
      seqs << seq.to_s
      if ids.size >= chunk_reads
        chunk_q.send({cursor, ids, seqs})
        cursor += ids.size
        # BUG FIX: the previous code called ids.clear / seqs.clear here,
        # mutating the very arrays just sent over the channel (channels
        # pass references), racing with the workers that receive them.
        # Allocate fresh arrays for the next chunk instead.
        ids = [] of String
        seqs = [] of String
      end
    end
    # Flush the final, possibly short, chunk.
    chunk_q.send({cursor, ids, seqs}) unless ids.empty?
  end
  # Closing the channel makes every worker's receive? return nil and exit.
  chunk_q.close
end
# Workers ──────────────────────────────
# Each worker drains chunk_q, counts k-mers for every read in the chunk,
# and forwards one ReadRow per read to the emitter. wg.done always runs,
# even if counting raises, so the closer fiber can detect completion.
wg.add(worker_cnt)
worker_cnt.times do |wid|
  ctx.spawn(name: "worker-#{wid}") do
    begin
      loop do
        chunk = chunk_q.receive?
        break if chunk.nil?
        base_idx, read_ids, read_seqs = chunk
        read_ids.each_with_index do |read_id, offset|
          row = ReadRow.new(base_idx + offset, read_id, count_kmers(read_seqs[offset], k_size))
          result_q.send row
        end
      end
    ensure
      wg.done
    end
  end
end
# Shutdown detection: once every worker has called wg.done, close
# result_q so the emitter's receive? loop terminates.
ctx.spawn(name: "closer") do
  wg.wait
  result_q.close
end
# Emitter ───────────────────────────────
# Collects every ReadRow, builds the union of all observed k-mers as the
# TSV header, then emits one row per read in original input order.
emitter_done = Channel(Nil).new
ctx.spawn(name: "emitter") do
  begin
    rows = Hash(Int64, ReadRow).new
    kmer_set = Set(String).new
    # The header needs every k-mer ever seen, so all rows must be
    # buffered before anything can be written.
    while row = result_q.receive?
      rows[row.idx] = row
      kmer_set.concat row.counts.keys
    end
    header = kmer_set.to_a.sort
    out.print "ID"
    header.each { |k| out.print '\t', k }
    out.puts
    # BUG FIX: the old loop advanced a next_idx counter and slept 1 ms
    # whenever an index was missing — an infinite spin on any gap. All
    # rows are already in memory at this point, so iterating the sorted
    # indices is both simpler and gap-proof.
    rows.keys.sort.each do |idx|
      row = rows[idx]
      out.print row.id
      header.each { |k| out.print '\t', (row.counts[k]? || 0) }
      out.puts
    end
  ensure
    emitter_done.send nil
  end
end
# BUG FIX: previously out.close executed immediately on the main fiber
# while the emitter was still writing on the execution context (and the
# program could exit before emission finished). Block until the emitter
# signals completion before closing the output.
emitter_done.receive
out.close
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "option_parser" | |
require "fiber/execution_context" | |
require "wait_group" | |
# Command-line argument handling: only the worker-thread count.
thread_count = 4
OptionParser.parse do |opts|
  opts.on("-t THREADS", "--threads=THREADS", "Number of threads") do |value|
    thread_count = value.to_i
  end
end
# Create the multi-threaded execution context for the worker fibers.
mt_context = Fiber::ExecutionContext::MultiThreaded.new("fastq-worker", thread_count)
# Pre-allocate result storage up front: one fixed-size slot per chunk so
# each fiber can write its own slot without synchronization.
CHUNK_SIZE = 10000
# NOTE(review): `file_chunks` and `ResultType` are not defined in this
# snippet — presumably supplied elsewhere; verify before use.
total_chunks = file_chunks.size
results_ptr = Pointer(StaticArray(ResultType, CHUNK_SIZE)).malloc(total_chunks)
# Fan the chunks out across the multi-threaded context and wait for all.
# BUG FIX: `wg.spawn` launched every fiber on the *default* execution
# context, so `mt_context` (and the -t thread count) was never used.
# Spawn on mt_context explicitly and pair wg.add with wg.done instead.
WaitGroup.wait do |wg|
  file_chunks.each_with_index do |chunk_data, fiber_id|
    wg.add
    mt_context.spawn do
      begin
        # Each fiber fills exactly one pre-allocated slot, so no locking
        # is needed. NOTE(review): assumes every chunk holds exactly
        # CHUNK_SIZE records — a short final chunk would index past the
        # end of chunk_data; confirm upstream chunking guarantees this.
        chunk_results = StaticArray(ResultType, CHUNK_SIZE).new { |i|
          process_fastq_record(chunk_data[i])
        }
        # Unique offset per fiber: no two fibers touch the same slot.
        (results_ptr + fiber_id).value = chunk_results
      ensure
        wg.done
      end
    end
  end
end
# Collect results (order preserved): each chunk was written at its own
# index, so reading slots 0...total_chunks restores the original order.
final_results = Array(ResultType).new
(0...total_chunks).each do |chunk_idx|
  final_results.concat((results_ptr + chunk_idx).value.to_a)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment