Last active
January 4, 2021 14:14
-
-
Save jrevels/782acb6b25f71f14a8cee0a3dae85079 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using UUIDs, Dates, Arrow, Tables | |
##### | |
##### Signals | |
##### | |
"""
    Signal

Flat, fully-typed row describing one signal; each field corresponds to one column
of the signals table.
"""
struct Signal <: Tables.AbstractRow
    recording_uuid::UUID
    type::String
    file_uri::String
    # `Missing` has to be part of this union so that the field type matches the
    # schema of the generated `Arrow.Table`.
    file_metadata::Union{Missing,Nothing,Dict{String,String}}
    channel_names::Vector{String}
    start_nanosecond::Nanosecond
    stop_nanosecond::Nanosecond
    sample_unit::String
    sample_resolution_in_unit::Float64
    sample_offset_in_unit::Float64
    sample_type::String
    sample_rate::Float64
end
"""
    Signal(row)

Construct a `Signal` by reading each of its twelve fields, by name, from `row`.
"""
function Signal(row)
    return Signal(
        row.recording_uuid,
        row.type,
        row.file_uri,
        row.file_metadata,
        row.channel_names,
        row.start_nanosecond,
        row.stop_nanosecond,
        row.sample_unit,
        row.sample_resolution_in_unit,
        row.sample_offset_in_unit,
        row.sample_type,
        row.sample_rate,
    )
end
# Property and column access for the flat `Signal` struct.
#
# `Signal` here stores its values directly in its own fields, so every accessor
# reads those fields with `getfield`. `Base.getproperty` is defined explicitly in
# terms of `getfield` so that `signal.name` is plain field access rather than
# going through the `Tables.AbstractRow` fallbacks.
Base.propertynames(::Signal) = fieldnames(Signal)
Base.getproperty(signal::Signal, nm::Symbol) = getfield(signal, nm)
Tables.getcolumn(signal::Signal, i::Int) = getfield(signal, i)
Tables.getcolumn(signal::Signal, nm::Symbol) = getfield(signal, nm)
Tables.getcolumn(signal::Signal, ::Type{T}, i::Int, nm::Symbol) where {T} = getfield(signal, i)
Tables.columnnames(::Signal) = fieldnames(Signal)
# Register `Signal` with Arrow via `Arrow.ArrowTypes.registertype!`.
Arrow.ArrowTypes.registertype!(Signal, Signal)

# Schema (column names and column types taken from `Signal`'s fields) that a table
# must have in order to be wrapped in a `SignalsTable`.
const SIGNALS_TABLE_SCHEMA = Tables.Schema(fieldnames(Signal), fieldtypes(Signal))
"""
    SignalsTable(_table)

Wrap `_table`, whose `Tables.schema` must be identical (`===`) to
`SIGNALS_TABLE_SCHEMA`.

Throws an `ArgumentError` when the schema does not match.
"""
struct SignalsTable{T} <: Tables.AbstractColumns
    _table::T

    function SignalsTable(_table::T) where {T}
        schema = Tables.schema(_table)
        if schema !== SIGNALS_TABLE_SCHEMA
            throw(ArgumentError("_table does not have appropriate SignalsTable schema: $schema"))
        end
        return new{T}(_table)
    end
end
# A `SignalsTable` always reports itself as a table.
Tables.istable(table::SignalsTable) = true

# The single-argument Tables.jl queries are forwarded unchanged to the wrapped table.
# `getfield` is used to reach the wrapped table directly.
for f in (:columnaccess, :columns, :columnnames, :rowaccess, :schema, :materializer)
    @eval Tables.$f(table::SignalsTable) = Tables.$f(getfield(table, :_table))
end

# Column lookup by index or by name is forwarded as well.
for K in (:Int, :Symbol)
    @eval Tables.getcolumn(table::SignalsTable, x::$K) = Tables.getcolumn(getfield(table, :_table), x)
end

# Row iteration converts each row of the wrapped table into a `Signal`.
function Tables.rows(table::SignalsTable)
    inner = getfield(table, :_table)
    return (Signal(row) for row in Tables.rows(inner))
end
##### | |
##### Annotations | |
##### | |
"""
    Annotation{V}

Flat row describing one annotation, parameterized by the type `V` of its `value`.
"""
struct Annotation{V} <: Tables.AbstractRow
    recording_uuid::UUID
    uuid::UUID
    value::V
    start_nanosecond::Nanosecond
    stop_nanosecond::Nanosecond
end
"""
    Annotation(row)
    Annotation{V}(row)

Construct an `Annotation` by reading its five fields, by name, from `row`. The
untyped form lets the field constructor pick `V` from `row.value`.
"""
function Annotation(row)
    return Annotation(
        row.recording_uuid, row.uuid, row.value, row.start_nanosecond, row.stop_nanosecond
    )
end

function Annotation{V}(row) where {V}
    return Annotation{V}(
        row.recording_uuid, row.uuid, row.value, row.start_nanosecond, row.stop_nanosecond
    )
end
# Property access on an `Annotation` is plain field access.
function Base.propertynames(::Annotation)
    return fieldnames(Annotation)
end

function Base.getproperty(annotation::Annotation, name::Symbol)
    return getfield(annotation, name)
end

# Register `Annotation` with Arrow via `Arrow.ArrowTypes.registertype!`.
Arrow.ArrowTypes.registertype!(Annotation, Annotation)
"""
    annotations_table_schema(V)

Return the `Tables.Schema` whose names are `fieldnames(Annotation)` and whose
third column type is `V`.
"""
annotations_table_schema(::Type{V}) where {V} =
    Tables.Schema{fieldnames(Annotation),Tuple{UUID,UUID,V,Nanosecond,Nanosecond}}()
"""
    AnnotationsTable{V}(_table)
    AnnotationsTable(_table)

Wrap `_table`, whose `Tables.schema` must be identical (`===`) to
`annotations_table_schema(V)`. The untyped form takes `V` from the third column
type of `_table`'s schema and then delegates to the typed form.

Throws an `ArgumentError` when the schema does not match, or (untyped form) when
the schema or its column types are unknown or it does not have exactly five columns.
"""
struct AnnotationsTable{V,T} <: Tables.AbstractColumns
    _table::T

    function AnnotationsTable{V}(_table::T) where {V,T}
        schema = Tables.schema(_table)
        schema === annotations_table_schema(V) || throw(ArgumentError("_table does not have appropriate AnnotationsTable schema: $schema"))
        return new{V,T}(_table)
    end

    function AnnotationsTable(_table)
        schema = Tables.schema(_table)
        # Guard against an unknown schema / unknown column types before indexing
        # into `schema.types`, so the caller gets the same `ArgumentError` as for
        # any other schema mismatch.
        has_five_typed_columns = schema isa Tables.Schema &&
                                 schema.types !== nothing &&
                                 length(schema.types) === 5
        has_five_typed_columns || throw(ArgumentError("_table does not have appropriate AnnotationsTable schema: $schema"))
        return AnnotationsTable{schema.types[3]}(_table)
    end
end
# An `AnnotationsTable` always reports itself as a table.
Tables.istable(table::AnnotationsTable) = true

# The single-argument Tables.jl queries are forwarded unchanged to the wrapped table.
for f in (:columnaccess, :columns, :columnnames, :rowaccess, :schema, :materializer)
    @eval Tables.$f(table::AnnotationsTable) = Tables.$f(getfield(table, :_table))
end

# Column lookup by index or by name is forwarded as well.
for K in (:Int, :Symbol)
    @eval Tables.getcolumn(table::AnnotationsTable, x::$K) = Tables.getcolumn(getfield(table, :_table), x)
end

# Row iteration converts each row of the wrapped table into an `Annotation{V}`.
function Tables.rows(table::AnnotationsTable{V}) where {V}
    inner = getfield(table, :_table)
    return (Annotation{V}(row) for row in Tables.rows(inner))
end
##### | |
##### Recordings | |
##### | |
"""
    by_recording(signals::SignalsTable, annotations::AnnotationsTable{V})

Group both tables by `recording_uuid`.

Returns a `Dict` keyed by the recording UUIDs found in `signals`. Each value is a
`(signals, annotations)` named tuple: `signals` maps signal `type` to its `Signal`,
`annotations` maps annotation `uuid` to its `Annotation{V}`. Annotations whose
recording has no signal are skipped.
"""
function by_recording(signals::SignalsTable, annotations::AnnotationsTable{V}) where {V}
    grouped = Dict{UUID,NamedTuple{(:signals, :annotations),Tuple{Dict{String,Signal},Dict{UUID,Annotation{V}}}}}()
    for sig in Tables.rows(signals)
        entry = get!(grouped, sig.recording_uuid) do
            return (signals = Dict{String,Signal}(), annotations = Dict{UUID,Annotation{V}}())
        end
        entry.signals[sig.type] = sig
    end
    for ann in Tables.rows(annotations)
        entry = get(grouped, ann.recording_uuid, nothing)
        if entry !== nothing
            entry.annotations[ann.uuid] = ann
        end
    end
    return grouped
end
"""
    by_recording(annotations::AnnotationsTable{V}, signals::SignalsTable)

Group both tables by `recording_uuid`.

Returns a `Dict` keyed by the recording UUIDs found in `annotations`. Each value is
an `(annotations, signals)` named tuple: `annotations` maps annotation `uuid` to its
`Annotation{V}`, `signals` maps signal `type` to its `Signal`. Signals whose
recording has no annotation are skipped.
"""
function by_recording(annotations::AnnotationsTable{V}, signals::SignalsTable) where {V}
    grouped = Dict{UUID,NamedTuple{(:annotations, :signals),Tuple{Dict{UUID,Annotation{V}},Dict{String,Signal}}}}()
    for ann in Tables.rows(annotations)
        entry = get!(grouped, ann.recording_uuid) do
            return (annotations = Dict{UUID,Annotation{V}}(), signals = Dict{String,Signal}())
        end
        entry.annotations[ann.uuid] = ann
    end
    for sig in Tables.rows(signals)
        entry = get(grouped, sig.recording_uuid, nothing)
        if entry !== nothing
            entry.signals[sig.type] = sig
        end
    end
    return grouped
end
"""
    by_recording(signals::SignalsTable)

Return a `Dict` mapping each `recording_uuid` to a `Dict` from signal `type` to
the corresponding `Signal`.
"""
function by_recording(signals::SignalsTable)
    grouped = Dict{UUID,Dict{String,Signal}}()
    for sig in Tables.rows(signals)
        per_recording = get!(grouped, sig.recording_uuid) do
            return Dict{String,Signal}()
        end
        per_recording[sig.type] = sig
    end
    return grouped
end
"""
    by_recording(annotations::AnnotationsTable{V})

Return a `Dict` mapping each `recording_uuid` to a `Dict` from annotation `uuid`
to the corresponding `Annotation{V}`.
"""
function by_recording(annotations::AnnotationsTable{V}) where {V}
    grouped = Dict{UUID,Dict{UUID,Annotation{V}}}()
    for ann in Tables.rows(annotations)
        per_recording = get!(grouped, ann.recording_uuid) do
            return Dict{UUID,Annotation{V}}()
        end
        per_recording[ann.uuid] = ann
    end
    return grouped
end
##### | |
##### experiments | |
##### | |
# Synthetic dataset: `n` recordings, each with two signals ("eeg", "ecg") and two annotations.
n = 10000
signals = Signal[]
annotations = Annotation{NamedTuple{(:x, :y),Tuple{Int64,String}}}[]
for _ in 1:n
    rec = uuid4()
    eeg = Signal(rec, "eeg", "file://$(rec)/eeg.lpcm.zst", nothing,
                 string.('a':'z'),  # channel names "a" through "z"
                 Nanosecond(0), Nanosecond(100000),
                 "microvolt", 1.0, 1.0, "float64", 256.0)
    push!(signals, eeg)
    ecg = Signal(rec, "ecg", "file://$(rec)/ecg.lpcm.zst", Dict("a" => "absidufbaid", "b" => "adjfhaudi"),
                 ["a", "b", "c", "d"],
                 Nanosecond(0), Nanosecond(100000),
                 "microvolt", 0.134, 1.9875, "int32", 128.0)
    push!(signals, ecg)
    push!(annotations, Annotation(rec, uuid4(), (x = 1, y = "2"), Nanosecond(0), Nanosecond(10)))
    push!(annotations, Annotation(rec, uuid4(), (x = 231, y = "asdkfjh"), Nanosecond(0), Nanosecond(10)))
end

# Write `tbl` to an in-memory buffer with Arrow and read it back as an `Arrow.Table`.
function write_table(tbl)
    io = IOBuffer()
    Arrow.write(io, tbl)
    seekstart(io)
    return Arrow.Table(io)
end

sigs_arrow = write_table(signals)
anns_arrow = write_table(annotations)
sigs = SignalsTable(Tables.columntable(sigs_arrow));
anns = AnnotationsTable(Tables.columntable(anns_arrow));

# Timings recorded by the author are kept alongside each call.
@time by_recording(sigs, anns); # 0.064109 seconds (839.00 k allocations: 74.526 MiB)
@time by_recording(anns, sigs); # 0.073439 seconds (839.00 k allocations: 74.526 MiB)
@time by_recording(sigs); # 0.031596 seconds (639.51 k allocations: 53.616 MiB)
@time by_recording(anns); # 0.031726 seconds (199.51 k allocations: 21.267 MiB)
######################################################################################################################################################################### | |
######################################################################################################################################################################### | |
######################################################################################################################################################################### | |
#=
This alternative approach creates thin `AbstractRow`/`AbstractColumns` wrappers that intentionally try to take on the characteristics of their underlying objects
via delegation. Unlike the previous approach above, this version's `AbstractRow` subtypes delegate as much as the `AbstractColumns` subtypes.
One might ask: At this point, what is the benefit of adding this layer at all over just passing around `Arrow.Table` and/or `NamedTuple`-based tables?
Answers:
1. Some useful schema/type checking/hinting opportunities can be surfaced in this layer.
2. This layer helps avoid type piracy/ambiguities/etc. when specializing common methods for these tables (e.g. `show`, `getindex`, etc.).
3. The composition-based approach here gives you 1 and 2 in a manner that's nicely orthogonal to implementation details of underlying
table types. For example, the caller can choose to work with `Signals(table::Arrow.Table)` or a fully-materialized
`Signals(map(collect, Tables.columntable(table::Arrow.Table)))`, or whatever fits their use-case.
=#
using UUIDs, Dates, Arrow, Tables | |
##### | |
##### Signals | |
##### | |
"""
    Signal{R}

Thin `Tables.AbstractRow` wrapper around an underlying row object of type `R`,
stored in the `_row` field.
"""
struct Signal{R} <: Tables.AbstractRow
    _row::R
end
# `NamedTuple` type listing the name and type of every signal field, in column order.
const SIGNAL_FIELDS = @NamedTuple{
    recording_uuid::UUID,
    type::String,
    file_uri::String,
    file_metadata::Union{Missing,Nothing,Dict{String,String}},
    channel_names::Vector{String},
    start_nanosecond::Nanosecond,
    stop_nanosecond::Nanosecond,
    sample_unit::String,
    sample_resolution_in_unit::Float64,
    sample_offset_in_unit::Float64,
    sample_type::String,
    sample_rate::Float64,
}
# Property names are fixed by `SIGNAL_FIELDS`, independent of the wrapped row.
function Base.propertynames(::Signal)
    return fieldnames(SIGNAL_FIELDS)
end

# Read `name` from the wrapped row and assert the type declared in `SIGNAL_FIELDS`.
function Base.getproperty(signal::Signal, name::Symbol)
    row = getfield(signal, :_row)
    return getproperty(row, name)::fieldtype(SIGNAL_FIELDS, name)
end
"""
    is_valid_signals_schema(schema)

Return `true` when `schema` is `nothing`, or when it is a `Tables.Schema` whose
names equal `fieldnames(SIGNAL_FIELDS)` and whose column types are subtypes of the
corresponding `SIGNAL_FIELDS` types. Return `false` for any other `Tables.Schema`.
"""
is_valid_signals_schema(::Tables.Schema{fieldnames(SIGNAL_FIELDS),<:Tuple{fieldtypes(SIGNAL_FIELDS)...}}) = true
is_valid_signals_schema(::Tables.Schema) = false
is_valid_signals_schema(::Nothing) = true
"""
    Signals(_columns)

Wrap `_columns` after checking `Tables.schema(_columns)` with
`is_valid_signals_schema`.

Throws an `ArgumentError` when the schema is rejected.
"""
struct Signals{C} <: Tables.AbstractColumns
    _columns::C

    function Signals(_columns::C) where {C}
        schema = Tables.schema(_columns)
        if !is_valid_signals_schema(schema)
            throw(ArgumentError("_table does not have appropriate Signals schema: $schema"))
        end
        return new{C}(_columns)
    end
end
# The single-argument Tables.jl queries are forwarded unchanged to the wrapped columns.
for f in (:istable, :schema, :materializer, :rowaccess, :columnaccess, :columns, :columnnames)
    @eval Tables.$f(signals::Signals) = Tables.$f(getfield(signals, :_columns))
end

# Row iteration wraps each underlying row in a `Signal`.
function Tables.rows(signals::Signals)
    cols = getfield(signals, :_columns)
    return (Signal(row) for row in Tables.rows(cols))
end

# Column lookup (by index, by name, or the typed four-argument form) is forwarded too.
function Tables.getcolumn(signals::Signals, i::Int)
    return Tables.getcolumn(getfield(signals, :_columns), i)
end

function Tables.getcolumn(signals::Signals, nm::Symbol)
    return Tables.getcolumn(getfield(signals, :_columns), nm)
end

function Tables.getcolumn(signals::Signals, ::Type{T}, i::Int, nm::Symbol) where {T}
    return Tables.getcolumn(getfield(signals, :_columns), T, i, nm)
end
##### | |
##### Annotations | |
##### | |
"""
    Annotation{V,R}

Thin `Tables.AbstractRow` wrapper around an underlying row object of type `R`,
stored in the `_row` field. `V` is the type of the annotation's `value`.
"""
struct Annotation{V,R} <: Tables.AbstractRow
    _row::R
end
# Wrap `_row`, taking `V` from the declared type of `R`'s `value` field.
function Annotation(_row::R) where {R}
    V = fieldtype(R, :value)
    return Annotation{V,R}(_row)
end

# Wrap `_row` with an explicitly chosen value type `V`.
function Annotation{V}(_row::R) where {V,R}
    return Annotation{V,R}(_row)
end

# `NamedTuple` type listing the annotation field names and types for value type `V`.
_annotation_fields(::Type{<:Annotation{V}}) where {V} =
    NamedTuple{(:recording_uuid, :uuid, :value, :start_nanosecond, :stop_nanosecond),Tuple{UUID,UUID,V,Nanosecond,Nanosecond}}
# Property names do not depend on `V`, so `Annotation{Any}` is used to look them up.
function Base.propertynames(::Annotation)
    return fieldnames(_annotation_fields(Annotation{Any}))
end

# Read `name` from the wrapped row and assert the type declared for this annotation type.
function Base.getproperty(annotation::Annotation, name::Symbol)
    row = getfield(annotation, :_row)
    return getproperty(row, name)::fieldtype(_annotation_fields(typeof(annotation)), name)
end
"""
    is_valid_annotations_schema(schema)

Return `true` when `schema` is `nothing`, or when it is a `Tables.Schema` whose
names equal the annotation field names and whose column types are subtypes of the
field types for `Annotation{Any}`. Return `false` for any other `Tables.Schema`.
"""
is_valid_annotations_schema(::Tables.Schema{fieldnames(_annotation_fields(Annotation{Any})),<:Tuple{fieldtypes(_annotation_fields(Annotation{Any}))...}}) = true
is_valid_annotations_schema(::Tables.Schema) = false
is_valid_annotations_schema(::Nothing) = true
"""
    Annotations(_columns)

Wrap `_columns` after checking `Tables.schema(_columns)` with
`is_valid_annotations_schema`. The value type parameter `V` is the schema's third
column type, or `Any` when the schema is `nothing`.

Throws an `ArgumentError` when the schema is rejected.
"""
struct Annotations{V,C} <: Tables.AbstractColumns
    _columns::C

    function Annotations(_columns::C) where {C}
        schema = Tables.schema(_columns)
        is_valid_annotations_schema(schema) || throw(ArgumentError("_table does not have appropriate Annotations schema: $schema"))
        # `is_valid_annotations_schema(nothing)` is `true`, so `schema` may be
        # `nothing` at this point; `schema.types` must not be touched in that case.
        V = schema === nothing ? Any : schema.types[3]
        return new{V,C}(_columns)
    end
end
# The single-argument Tables.jl queries are forwarded unchanged to the wrapped columns.
for f in (:istable, :schema, :materializer, :rowaccess, :columnaccess, :columns, :columnnames)
    @eval Tables.$f(annotations::Annotations) = Tables.$f(getfield(annotations, :_columns))
end

# Row iteration wraps each underlying row in an `Annotation{V}`.
function Tables.rows(annotations::Annotations{V}) where {V}
    cols = getfield(annotations, :_columns)
    return (Annotation{V}(row) for row in Tables.rows(cols))
end

# Column lookup (by index, by name, or the typed four-argument form) is forwarded too.
function Tables.getcolumn(annotations::Annotations, i::Int)
    return Tables.getcolumn(getfield(annotations, :_columns), i)
end

function Tables.getcolumn(annotations::Annotations, nm::Symbol)
    return Tables.getcolumn(getfield(annotations, :_columns), nm)
end

function Tables.getcolumn(annotations::Annotations, ::Type{T}, i::Int, nm::Symbol) where {T}
    return Tables.getcolumn(getfield(annotations, :_columns), T, i, nm)
end
##### | |
##### by_recording | |
##### | |
"""
    by_recording(signals::Signals, annotations::Annotations{V})

Group both tables by `recording_uuid`.

Returns a `Dict` keyed by the recording UUIDs found in `signals`. Each value is a
`(signals, annotations)` named tuple: `signals` maps signal `type` to its `Signal`,
`annotations` maps annotation `uuid` to its `Annotation{V}`. Annotations whose
recording has no signal are skipped.
"""
function by_recording(signals::Signals, annotations::Annotations{V}) where {V}
    grouped = Dict{UUID,NamedTuple{(:signals, :annotations),Tuple{Dict{String,Signal},Dict{UUID,Annotation{V}}}}}()
    for sig in Tables.rows(signals)
        entry = get!(grouped, sig.recording_uuid) do
            return (signals = Dict{String,Signal}(), annotations = Dict{UUID,Annotation{V}}())
        end
        entry.signals[sig.type] = sig
    end
    for ann in Tables.rows(annotations)
        entry = get(grouped, ann.recording_uuid, nothing)
        if entry !== nothing
            entry.annotations[ann.uuid] = ann
        end
    end
    return grouped
end
"""
    by_recording(annotations::Annotations{V}, signals::Signals)

Group both tables by `recording_uuid`.

Returns a `Dict` keyed by the recording UUIDs found in `annotations`. Each value is
an `(annotations, signals)` named tuple: `annotations` maps annotation `uuid` to its
`Annotation{V}`, `signals` maps signal `type` to its `Signal`. Signals whose
recording has no annotation are skipped.
"""
function by_recording(annotations::Annotations{V}, signals::Signals) where {V}
    grouped = Dict{UUID,NamedTuple{(:annotations, :signals),Tuple{Dict{UUID,Annotation{V}},Dict{String,Signal}}}}()
    for ann in Tables.rows(annotations)
        entry = get!(grouped, ann.recording_uuid) do
            return (annotations = Dict{UUID,Annotation{V}}(), signals = Dict{String,Signal}())
        end
        entry.annotations[ann.uuid] = ann
    end
    for sig in Tables.rows(signals)
        entry = get(grouped, sig.recording_uuid, nothing)
        if entry !== nothing
            entry.signals[sig.type] = sig
        end
    end
    return grouped
end
"""
    by_recording(signals::Signals)

Return a `Dict` mapping each `recording_uuid` to a `Dict` from signal `type` to
the corresponding `Signal`.
"""
function by_recording(signals::Signals)
    grouped = Dict{UUID,Dict{String,Signal}}()
    for sig in Tables.rows(signals)
        per_recording = get!(grouped, sig.recording_uuid) do
            return Dict{String,Signal}()
        end
        per_recording[sig.type] = sig
    end
    return grouped
end
"""
    by_recording(annotations::Annotations{V})

Return a `Dict` mapping each `recording_uuid` to a `Dict` from annotation `uuid`
to the corresponding `Annotation{V}`.
"""
function by_recording(annotations::Annotations{V}) where {V}
    grouped = Dict{UUID,Dict{UUID,Annotation{V}}}()
    for ann in Tables.rows(annotations)
        per_recording = get!(grouped, ann.recording_uuid) do
            return Dict{UUID,Annotation{V}}()
        end
        per_recording[ann.uuid] = ann
    end
    return grouped
end
##### | |
##### experiments | |
##### | |
# Synthetic dataset: `n` recordings, each with two signals ("eeg", "ecg") and two annotations.
# Rows are plain `NamedTuple`s wrapped in `Signal` / `Annotation`.
n = 10000
signals = Signal[]
annotations = Annotation{NamedTuple{(:x, :y),Tuple{Int64,String}}}[]
for _ in 1:n
    rec = uuid4()
    eeg_row = (recording_uuid=rec, type="eeg", file_uri="file://$(rec)/eeg.lpcm.zst", file_metadata=nothing,
               channel_names=string.('a':'z'),  # channel names "a" through "z"
               start_nanosecond=Nanosecond(0), stop_nanosecond=Nanosecond(100000),
               sample_unit="microvolt", sample_resolution_in_unit=1.0, sample_offset_in_unit=1.0,
               sample_type="float64", sample_rate=256.0)
    push!(signals, Signal(eeg_row))
    ecg_row = (recording_uuid=rec, type="ecg", file_uri="file://$(rec)/ecg.lpcm.zst", file_metadata=Dict("a" => "absidufbaid", "b" => "adjfhaudi"),
               channel_names=["a", "b", "c", "d"],
               start_nanosecond=Nanosecond(0), stop_nanosecond=Nanosecond(100000),
               sample_unit="microvolt", sample_resolution_in_unit=0.134, sample_offset_in_unit=1.9875,
               sample_type="int32", sample_rate=128.0)
    push!(signals, Signal(ecg_row))
    push!(annotations, Annotation((recording_uuid=rec, uuid=uuid4(), value=(x = 1, y = "2"), start_nanosecond=Nanosecond(0), stop_nanosecond=Nanosecond(10))))
    push!(annotations, Annotation((recording_uuid=rec, uuid=uuid4(), value=(x = 231, y = "asdkfjh"), start_nanosecond=Nanosecond(0), stop_nanosecond=Nanosecond(10))))
end

# Write `tbl` to an in-memory buffer with Arrow and read it back as an `Arrow.Table`.
function write_table(tbl)
    io = IOBuffer()
    Arrow.write(io, tbl)
    seekstart(io)
    return Arrow.Table(io)
end

sigs_arrow = write_table(signals)
anns_arrow = write_table(annotations)
sigs = Signals(Tables.columntable(sigs_arrow))
anns = Annotations(Tables.columntable(anns_arrow))

# Timings recorded by the author are kept alongside each call.
@time by_recording(sigs, anns); # 0.010518 seconds (140.02 k allocations: 15.338 MiB)
@time by_recording(anns, sigs); # 0.007881 seconds (140.02 k allocations: 15.338 MiB)
@time by_recording(sigs); # 0.004087 seconds (80.02 k allocations: 7.542 MiB)
@time by_recording(anns); # 0.003655 seconds (60.02 k allocations: 8.152 MiB)
########################################################################################################################################################################## | |
########################################################################################################################################################################## | |
########################################################################################################################################################################## | |
using UUIDs, Dates, Arrow, Tables | |
##### | |
##### Signals | |
##### | |
"""
    Signal{R}

Thin `Tables.AbstractRow` wrapper around an underlying row object of type `R`,
stored in the `_row` field.
"""
struct Signal{R} <: Tables.AbstractRow
    _row::R
end
# `NamedTuple` type listing the name and type of every signal field, in column order.
const SIGNAL_FIELDS = @NamedTuple{
    recording_uuid::UUID,
    type::String,
    file_path::String,
    file_metadata::Union{Missing,Nothing,Dict{String,String}},
    channel_names::Vector{String},
    start_nanosecond::Nanosecond,
    stop_nanosecond::Nanosecond,
    sample_unit::String,
    sample_resolution_in_unit::Float64,
    sample_offset_in_unit::Float64,
    sample_type::String,
    sample_rate::Float64,
}
"""
    Signal(; recording_uuid, type, file_path, file_metadata=nothing, channel_names,
           start_nanosecond, stop_nanosecond, sample_unit, sample_resolution_in_unit,
           sample_offset_in_unit, sample_type, sample_rate)

Build a `Signal{SIGNAL_FIELDS}` from keyword arguments. Every argument other than
`recording_uuid` and `file_metadata` is converted to the type declared for it in
`SIGNAL_FIELDS` (`String`, `Vector{String}`, `Nanosecond` or `Float64`).
"""
function Signal(;
    recording_uuid::UUID,
    type,
    file_path,
    file_metadata::Union{Missing,Nothing,Dict{String,String}}=nothing,
    channel_names,
    start_nanosecond,
    stop_nanosecond,
    sample_unit,
    sample_resolution_in_unit,
    sample_offset_in_unit,
    sample_type,
    sample_rate,
)
    # Fields are listed in `SIGNAL_FIELDS` order.
    row = (;
        recording_uuid,
        type=String(type),
        file_path=String(file_path),
        file_metadata,
        channel_names=convert(Vector{String}, channel_names),
        start_nanosecond=Nanosecond(start_nanosecond),
        stop_nanosecond=Nanosecond(stop_nanosecond),
        sample_unit=String(sample_unit),
        sample_resolution_in_unit=Float64(sample_resolution_in_unit),
        sample_offset_in_unit=Float64(sample_offset_in_unit),
        sample_type=String(sample_type),
        sample_rate=Float64(sample_rate),
    )
    return Signal{SIGNAL_FIELDS}(row)
end
# Names are fixed by `SIGNAL_FIELDS`, independent of the wrapped row.
Base.propertynames(::Signal) = fieldnames(SIGNAL_FIELDS)
Tables.columnnames(::Signal) = fieldnames(SIGNAL_FIELDS)

# Each accessor reads from the wrapped row and asserts the type declared in `SIGNAL_FIELDS`.
function Base.getproperty(signal::Signal, name::Symbol)
    row = getfield(signal, :_row)
    return getproperty(row, name)::fieldtype(SIGNAL_FIELDS, name)
end

function Tables.getcolumn(signal::Signal, i::Int)
    row = getfield(signal, :_row)
    return Tables.getcolumn(row, i)::fieldtype(SIGNAL_FIELDS, i)
end

function Tables.getcolumn(signal::Signal, nm::Symbol)
    row = getfield(signal, :_row)
    return Tables.getcolumn(row, nm)::fieldtype(SIGNAL_FIELDS, nm)
end

function Tables.getcolumn(signal::Signal, ::Type{T}, i::Int, nm::Symbol) where {T}
    row = getfield(signal, :_row)
    return Tables.getcolumn(row, T, i, nm)::fieldtype(SIGNAL_FIELDS, i)
end

# A vector of `Signal`s reports the `SIGNAL_FIELDS` names and types as its schema.
function Tables.schema(::AbstractVector{<:Signal})
    return Tables.Schema(fieldnames(SIGNAL_FIELDS), fieldtypes(SIGNAL_FIELDS))
end
"""
    is_valid_signals_schema(schema)

Return `true` when `schema` is `nothing`, or when it is a `Tables.Schema` whose
names equal `fieldnames(SIGNAL_FIELDS)` and whose column types are subtypes of the
corresponding `SIGNAL_FIELDS` types. Return `false` for any other `Tables.Schema`.
"""
is_valid_signals_schema(::Tables.Schema{fieldnames(SIGNAL_FIELDS),<:Tuple{fieldtypes(SIGNAL_FIELDS)...}}) = true
is_valid_signals_schema(::Tables.Schema) = false
is_valid_signals_schema(::Nothing) = true
"""
    Signals(_columns)

Wrap `_columns` after checking `Tables.schema(_columns)` with
`is_valid_signals_schema`.

Throws an `ArgumentError` when the schema is rejected.
"""
struct Signals{C} <: Tables.AbstractColumns
    _columns::C

    function Signals(_columns::C) where {C}
        schema = Tables.schema(_columns)
        if !is_valid_signals_schema(schema)
            throw(ArgumentError("_table does not have appropriate Signals schema: $schema"))
        end
        return new{C}(_columns)
    end
end
# Empty table: the column table of a zero-length `SIGNAL_FIELDS` vector.
function Signals()
    return Signals(Tables.columntable(SIGNAL_FIELDS[]))
end

# The single-argument Tables.jl queries are forwarded unchanged to the wrapped columns.
for f in (:istable, :schema, :materializer, :rowaccess, :columnaccess, :columns, :columnnames)
    @eval Tables.$f(signals::Signals) = Tables.$f(getfield(signals, :_columns))
end

# Row iteration wraps each underlying row in a `Signal`.
function Tables.rows(signals::Signals)
    cols = getfield(signals, :_columns)
    return (Signal(row) for row in Tables.rows(cols))
end

# Column lookup (by index, by name, or the typed four-argument form) is forwarded too.
function Tables.getcolumn(signals::Signals, i::Int)
    return Tables.getcolumn(getfield(signals, :_columns), i)
end

function Tables.getcolumn(signals::Signals, nm::Symbol)
    return Tables.getcolumn(getfield(signals, :_columns), nm)
end

function Tables.getcolumn(signals::Signals, ::Type{T}, i::Int, nm::Symbol) where {T}
    return Tables.getcolumn(getfield(signals, :_columns), T, i, nm)
end
##### | |
##### Annotations | |
##### | |
"""
    Annotation{V,R}

Thin `Tables.AbstractRow` wrapper around an underlying row object of type `R`,
stored in the `_row` field. `V` is the type of the annotation's `value`.
"""
struct Annotation{V,R} <: Tables.AbstractRow
    _row::R
end
# `NamedTuple` type listing the annotation field names and types for value type `V`;
# `value` is the last field.
function _annotation_fields(::Type{V}) where {V}
    return NamedTuple{
        (:recording_uuid, :uuid, :start_nanosecond, :stop_nanosecond, :value),
        Tuple{UUID,UUID,Nanosecond,Nanosecond,V},
    }
end
# Wrap `_row`, taking `V` from the declared type of `R`'s `value` field.
function Annotation(_row::R) where {R}
    V = fieldtype(R, :value)
    return Annotation{V,R}(_row)
end

# Wrap `_row` with an explicitly chosen value type `V`.
function Annotation{V}(_row::R) where {V,R}
    return Annotation{V,R}(_row)
end
"""
    Annotation{V}(; recording_uuid, uuid, start_nanosecond, stop_nanosecond, value)

Build an annotation backed by a `NamedTuple` row of type `_annotation_fields(V)`.
The two time arguments are passed through `Nanosecond(...)` and `value` is
converted to `V`.
"""
function Annotation{V}(; recording_uuid::UUID, uuid::UUID, start_nanosecond, stop_nanosecond, value) where {V}
    row = (;
        recording_uuid,
        uuid,
        start_nanosecond=Nanosecond(start_nanosecond),
        stop_nanosecond=Nanosecond(stop_nanosecond),
        value=convert(V, value),
    )
    return Annotation{V,_annotation_fields(V)}(row)
end
"""
    Annotation(; recording_uuid, uuid, start_nanosecond, stop_nanosecond, value)

Keyword constructor that takes the value type from `typeof(value)` and forwards
all keywords to `Annotation{typeof(value)}(; ...)`.
"""
function Annotation(; recording_uuid, uuid, start_nanosecond, stop_nanosecond, value)
    # Keyword arguments do not participate in dispatch, so a static parameter bound
    # only through a keyword annotation (`value::V ... where {V}`) is never
    # determined; take the type from the value itself instead.
    V = typeof(value)
    return Annotation{V}(; recording_uuid, uuid, start_nanosecond, stop_nanosecond, value)
end
# Names do not depend on `V`, so `Any` is used to look them up.
Base.propertynames(::Annotation) = fieldnames(_annotation_fields(Any))
Tables.columnnames(::Annotation) = fieldnames(_annotation_fields(Any))

# Each accessor reads from the wrapped row and asserts the type declared by `_annotation_fields(V)`.
function Base.getproperty(annotation::Annotation{V}, name::Symbol) where {V}
    row = getfield(annotation, :_row)
    return getproperty(row, name)::fieldtype(_annotation_fields(V), name)
end

function Tables.getcolumn(ann::Annotation{V}, i::Int) where {V}
    row = getfield(ann, :_row)
    return Tables.getcolumn(row, i)::fieldtype(_annotation_fields(V), i)
end

function Tables.getcolumn(ann::Annotation{V}, nm::Symbol) where {V}
    row = getfield(ann, :_row)
    return Tables.getcolumn(row, nm)::fieldtype(_annotation_fields(V), nm)
end

function Tables.getcolumn(ann::Annotation{V}, ::Type{T}, i::Int, nm::Symbol) where {V,T}
    row = getfield(ann, :_row)
    return Tables.getcolumn(row, T, i, nm)::fieldtype(_annotation_fields(V), i)
end
# A vector of `Annotation{V}`s reports the `_annotation_fields(V)` names and types as its schema.
Tables.schema(::AbstractVector{<:Annotation{V}}) where {V} =
    Tables.Schema(fieldnames(_annotation_fields(V)), fieldtypes(_annotation_fields(V)))
"""
    is_valid_annotations_schema(schema)

Return `true` when `schema` is `nothing`, or when it is a `Tables.Schema` whose
names equal the annotation field names and whose column types are subtypes of
`fieldtypes(_annotation_fields(Any))`. Return `false` for any other `Tables.Schema`.
"""
is_valid_annotations_schema(::Tables.Schema{fieldnames(_annotation_fields(Any)),<:Tuple{fieldtypes(_annotation_fields(Any))...}}) = true
is_valid_annotations_schema(::Tables.Schema) = false
is_valid_annotations_schema(::Nothing) = true
"""
    Annotations(_columns)

Wrap `_columns` after checking `Tables.schema(_columns)` with
`is_valid_annotations_schema`. The value type parameter `V` is the schema's last
column type, or `Any` when the schema is `nothing`.

Throws an `ArgumentError` when the schema is rejected.
"""
struct Annotations{V,C} <: Tables.AbstractColumns
    _columns::C

    function Annotations(_columns::C) where {C}
        schema = Tables.schema(_columns)
        if !is_valid_annotations_schema(schema)
            throw(ArgumentError("_table does not have appropriate Annotations schema: $schema"))
        end
        V = if schema === nothing
            Any
        else
            schema.types[end]
        end
        return new{V,C}(_columns)
    end
end
# Empty table: the column table of a zero-length `_annotation_fields(V)` vector.
function Annotations{V}() where {V}
    return Annotations(Tables.columntable(_annotation_fields(V)[]))
end

# The single-argument Tables.jl queries are forwarded unchanged to the wrapped columns.
for f in (:istable, :schema, :materializer, :rowaccess, :columnaccess, :columns, :columnnames)
    @eval Tables.$f(annotations::Annotations) = Tables.$f(getfield(annotations, :_columns))
end

# Row iteration wraps each underlying row in an `Annotation{V}`.
function Tables.rows(annotations::Annotations{V}) where {V}
    cols = getfield(annotations, :_columns)
    return (Annotation{V}(row) for row in Tables.rows(cols))
end

# Column lookup (by index, by name, or the typed four-argument form) is forwarded too.
function Tables.getcolumn(annotations::Annotations, i::Int)
    return Tables.getcolumn(getfield(annotations, :_columns), i)
end

function Tables.getcolumn(annotations::Annotations, nm::Symbol)
    return Tables.getcolumn(getfield(annotations, :_columns), nm)
end

function Tables.getcolumn(annotations::Annotations, ::Type{T}, i::Int, nm::Symbol) where {T}
    return Tables.getcolumn(getfield(annotations, :_columns), T, i, nm)
end
##### | |
##### by_recording | |
##### | |
"""
    by_recording(signals::Signals, annotations::Annotations{V})

Group both tables by `recording_uuid`.

Returns a `Dict` keyed by the recording UUIDs found in `signals`. Each value is a
`(signals, annotations)` named tuple: `signals` maps signal `type` to its `Signal`,
`annotations` maps annotation `uuid` to its `Annotation{V}`. Annotations whose
recording has no signal are skipped.
"""
function by_recording(signals::Signals, annotations::Annotations{V}) where {V}
    grouped = Dict{UUID,NamedTuple{(:signals, :annotations),Tuple{Dict{String,Signal},Dict{UUID,Annotation{V}}}}}()
    for sig in Tables.rows(signals)
        entry = get!(grouped, sig.recording_uuid) do
            return (signals = Dict{String,Signal}(), annotations = Dict{UUID,Annotation{V}}())
        end
        entry.signals[sig.type] = sig
    end
    for ann in Tables.rows(annotations)
        entry = get(grouped, ann.recording_uuid, nothing)
        if entry !== nothing
            entry.annotations[ann.uuid] = ann
        end
    end
    return grouped
end
"""
    by_recording(annotations::Annotations{V}, signals::Signals)

Group both tables by `recording_uuid`.

Returns a `Dict` keyed by the recording UUIDs found in `annotations`. Each value is
an `(annotations, signals)` named tuple: `annotations` maps annotation `uuid` to its
`Annotation{V}`, `signals` maps signal `type` to its `Signal`. Signals whose
recording has no annotation are skipped.
"""
function by_recording(annotations::Annotations{V}, signals::Signals) where {V}
    grouped = Dict{UUID,NamedTuple{(:annotations, :signals),Tuple{Dict{UUID,Annotation{V}},Dict{String,Signal}}}}()
    for ann in Tables.rows(annotations)
        entry = get!(grouped, ann.recording_uuid) do
            return (annotations = Dict{UUID,Annotation{V}}(), signals = Dict{String,Signal}())
        end
        entry.annotations[ann.uuid] = ann
    end
    for sig in Tables.rows(signals)
        entry = get(grouped, sig.recording_uuid, nothing)
        if entry !== nothing
            entry.signals[sig.type] = sig
        end
    end
    return grouped
end
"""
    by_recording(signals::Signals)

Return a `Dict` mapping each `recording_uuid` to a `Dict` from signal `type` to
the corresponding `Signal`.
"""
function by_recording(signals::Signals)
    grouped = Dict{UUID,Dict{String,Signal}}()
    for sig in Tables.rows(signals)
        per_recording = get!(grouped, sig.recording_uuid) do
            return Dict{String,Signal}()
        end
        per_recording[sig.type] = sig
    end
    return grouped
end
"""
    by_recording(annotations::Annotations{V})

Return a `Dict` mapping each `recording_uuid` to a `Dict` from annotation `uuid`
to the corresponding `Annotation{V}`.
"""
function by_recording(annotations::Annotations{V}) where {V}
    grouped = Dict{UUID,Dict{UUID,Annotation{V}}}()
    for ann in Tables.rows(annotations)
        per_recording = get!(grouped, ann.recording_uuid) do
            return Dict{UUID,Annotation{V}}()
        end
        per_recording[ann.uuid] = ann
    end
    return grouped
end
##### | |
##### conversion | |
##### | |
using MsgPack | |
using Onda: Onda | |
"""
    convert_old_dataset(dataset_path, uuid_from_annotation = _ -> uuid4())

Read `recordings.msgpack.zst` under `dataset_path`, and return a
`(Signals, Annotations)` pair built from its recordings.

`uuid_from_annotation` is called with each raw annotation to produce that
annotation's UUID; the default generates a fresh `uuid4()`.

Raises an error when the header's `onda_format_version` is outside `[v"0.3", v"0.5")`.
"""
function convert_old_dataset(dataset_path, uuid_from_annotation = _ -> uuid4())
    compressed = read(joinpath(dataset_path, "recordings.msgpack.zst"))
    raw_header, raw_recordings = MsgPack.unpack(Onda.zstd_decompress(compressed))
    if !(v"0.3" <= VersionNumber(raw_header["onda_format_version"]) < v"0.5")
        error("unexpected dataset version: $(raw_header["onda_format_version"])")
    end
    signals = Signal[]
    annotations = Annotation{String}[]
    for (uuid, recording) in raw_recordings
        recording_uuid = UUID(uuid)
        for (type, signal) in recording["signals"]
            file_path = Onda.samples_path(dataset_path, recording_uuid, type, signal["file_extension"])
            # NOTE(review): the `Signal` keyword constructor only accepts
            # `missing`, `nothing` or a `Dict{String,String}` for `file_metadata`;
            # confirm that unpacked `file_options` values always satisfy that.
            converted = Signal(; recording_uuid, type, file_path,
                               file_metadata=signal["file_options"],
                               channel_names=signal["channel_names"],
                               start_nanosecond=signal["start_nanosecond"],
                               stop_nanosecond=signal["stop_nanosecond"],
                               sample_unit=signal["sample_unit"],
                               sample_resolution_in_unit=signal["sample_resolution_in_unit"],
                               sample_offset_in_unit=signal["sample_offset_in_unit"],
                               sample_type=signal["sample_type"],
                               sample_rate=signal["sample_rate"])
            push!(signals, converted)
        end
        for ann in recording["annotations"]
            ann_uuid = uuid_from_annotation(ann)
            converted = Annotation(; recording_uuid, uuid=ann_uuid,
                                   start_nanosecond=ann["start_nanosecond"],
                                   stop_nanosecond=ann["stop_nanosecond"],
                                   value=ann["value"])
            push!(annotations, converted)
        end
    end
    signals_table = Signals(Tables.columntable(signals))
    annotations_table = Annotations(Tables.columntable(annotations))
    return signals_table, annotations_table
end
I might be missing something obvious, but the methods here https://gist.github.com/jrevels/782acb6b25f71f14a8cee0a3dae85079#file-onda-arrow-flat-multi-table-approach-jl-L27-L32 seem to be referring to the `Signal` struct with a single `_row` field defined later in the file, not the `Signal` struct with a bunch of fields defined right above, right?
Yeah.
This is just a dumping ground/playground for some ideas (mainly for myself to ensure that certain things were reasonable performance-wise)
In general I wouldn't even spend too much time looking at this at this point - I'd much rather get eyes on beacon-biosignals/Onda.jl#59
Ok, sounds good
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I might be missing something obvious, but the methods here https://gist.github.com/jrevels/782acb6b25f71f14a8cee0a3dae85079#file-onda-arrow-flat-multi-table-approach-jl-L27-L32 seem to be referring to the `Signal` struct with a single `_row` field defined later in the file, not the `Signal` struct with a bunch of fields defined right above, right?