Last active
December 4, 2020 01:14
-
-
Save peteroupc/6c3276a988eae7429a39 to your computer and use it in GitHub Desktop.
Reads metadata from an Ogg file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/ruby | |
# Written by Peter O. | |
# Any copyright to this work is released to the Public Domain. | |
# http://creativecommons.org/publicdomain/zero/1.0/ | |
# | |
# If you like this, you should donate to Peter O. | |
# at: http://peteroupc.github.io/ | |
# | |
# Usage: | |
# # takes an IO or IO-like object as a parameter, | |
# # assuming the object is at the beginning of the stream | |
# OggMetadata.getMetadata(io) | |
# # metadata is value returned from getMetadata | |
# OggMetadata.writeMetadata(metadata,inputFilename) | |
# OggMetadata.writeMetadata(metadata,inputFilename,outputFilename) | |
# OggMetadata.getAudioInfo(filename) | |
# | |
module OggMetadata | |
private | |
def self.crc32table() | |
c=0x80000000 | |
ret=[0] | |
i=1; while i<256 | |
c=(c<<1)^(((c & 0x80000000)==0) ? 0 : 0x04c11db7) | |
c&=0xFFFFFFFF | |
for j in 0...i | |
ret[i+j]=c^ret[j] | |
end | |
i<<=1 | |
end | |
return ret | |
end | |
@@crc32table=self.crc32table() | |
def self.crc32(bytes) | |
ret=0 | |
bytes.each_byte{|b| | |
ret=@@crc32table[b^(ret>>24)]^(ret<<8) | |
ret&=0xFFFFFFFF | |
} | |
return ret | |
end | |
def self.rechecksum(page) | |
if page[0x16,4].unpack("V")[0]!=0 | |
# Set checksum to 0 if needed | |
page=page[0,0x16]+([0].pack("V"))+page[0x1A,page.length] | |
end | |
checksum=OggMetadata.pageChecksum(page) | |
# Set checksum to calculated value | |
page=page[0,0x16]+([checksum].pack("V"))+page[0x1A,page.length] | |
return page | |
end | |
def self.resequencePage(page,seq) | |
pageseq=page[0x12,4].unpack("V")[0] | |
if pageseq==seq | |
return page | |
end | |
page=page[0,0x12]+([seq].pack("V"))+page[0x16,page.length] | |
return rechecksum(page) | |
end | |
def self.removeFirstPacket(page,seqno,granularLow,granularHigh) | |
pagebytes=page.bytes | |
pagesec=pagebytes[0x1a] | |
sectionCount=0 | |
packetSize=0 | |
for i in 0...pagesec | |
packetSize+=pagebytes[0x1b+i] | |
sectionCount+=1 | |
break if pagebytes[0x1b+i]!=0xFF | |
end | |
newSectionCount=pagesec-sectionCount | |
pageAfterPacket=0x1b+pagesec+packetSize | |
page=page[0,6]+ | |
([granularLow,granularHigh].pack("VV"))+ | |
page[0x0E,4]+ | |
([seqno,0,newSectionCount].pack("VVC"))+ | |
page[0x1b+sectionCount,newSectionCount]+ | |
page[pageAfterPacket,page.length] | |
return rechecksum(page) | |
end | |
def self.encodePacket(page,packet,seqno,fw) | |
i=0 | |
flags=(page[0][0]&0x02) | |
needsContinuation=false | |
pageCount=0 | |
while i<packet.length || needsContinuation | |
subsize=[65025,packet.length-i].min | |
if i+subsize==packet.length && (page[0][0]&0x04)!=0 | |
# Last page | |
flags|=0x04 | |
end | |
pagedata="OggS"+[ | |
0,flags,0,0,page[0][3],seqno,0].pack("CCVVVVV") | |
seqno+=1 | |
if subsize==0 | |
# Lacing of 0 | |
pagedata+=[1,0].pack("CC") | |
needsContinuation=false | |
else | |
numSegments=(subsize+254)/255 | |
segments=[numSegments] | |
tmpsize=subsize | |
for k in 0...numSegments | |
segsize=[255,tmpsize].min | |
segments.push(segsize) | |
tmpsize-=segsize | |
end | |
if subsize%255==0 && numSegments<255 | |
# Payload size is divisible by 255, add a lacing segment | |
# if possible | |
segments[0]+=1 | |
segments.push(0) | |
end | |
needsContinuation=(segments[segments.length-1]==255) | |
pagedata+=segments.pack("C*") | |
end | |
if subsize>0 | |
pagedata+=packet[i,subsize] | |
end | |
pagedata=rechecksum(pagedata) | |
fw.write(pagedata) | |
pageCount+=1 | |
# Next page is a continuation of the packet | |
flags|=0x01 | |
i+=subsize | |
end | |
return pageCount | |
end | |
def self.getFreeFile(dest) | |
base=dest | |
fn=base;i=0 | |
while FileTest.exist?(fn) | |
fn=base+i.to_s | |
i+=1 | |
end | |
return fn | |
end | |
def self.readPage(f) | |
return nil if f.read(4)!="OggS" | |
f.pos+=1 | |
data=f.read(22).unpack("CVVVVVC") | |
segments=f.read(data[6]).unpack("C*") | |
packets=[] | |
totalSize=0 | |
lastPacket=0 | |
packetOffset=0 | |
i=0;while i<segments.length | |
totalSize+=segments[i] | |
if segments[i]==0xFF | |
lastPacket+=0xFF | |
else | |
lastPacket+=segments[i] | |
packets.push([packetOffset,lastPacket]) | |
packetOffset+=lastPacket | |
lastPacket=0 | |
end | |
i+=1 | |
end | |
return [data,packets,totalSize] | |
end | |
def self.streamSerial(x) | |
return x[0][3] | |
end | |
def self.isContinuation(x) | |
return (x[0][0]&1)!=0 | |
end | |
def self.isLastPage(x) | |
return (x[0][0]&4)!=0 | |
end | |
def self.pageChecksum(x) | |
modpage=x[0,0x16]+([0].pack("V"))+x[0x1A,x.length] | |
return OggMetadata.crc32(modpage) | |
end | |
def self.replaceCommentPacket(f,fw,commentPacket) | |
packetIndex=-1 | |
desiredPacket=1 | |
serial=nil | |
seqno=0 | |
while true | |
pageStart=f.pos | |
page=OggMetadata.readPage(f) | |
firstPacket=packetIndex+1 | |
isContinuation=OggMetadata.isContinuation(page) | |
if isContinuation | |
raise "continuation in first page" if packetIndex<0 | |
#p "Packet continued" | |
firstPacket-=1 | |
end | |
if serial!=nil && serial!=OggMetadata.streamSerial(page) | |
raise "multiple logical bitstreams not supported" | |
end | |
serial=OggMetadata.streamSerial(page) | |
endPagePos=f.pos | |
if desiredPacket>=firstPacket && | |
desiredPacket<(firstPacket+page[1].length) | |
packet=page[1][desiredPacket-firstPacket] | |
f.pos=endPagePos+packet[0] | |
if !isContinuation && page[1].length==1 | |
# Single-packet page, not a continuation | |
pagesWritten=OggMetadata.encodePacket( | |
page,commentPacket,seqno,fw) | |
seqno+=pagesWritten | |
elsif firstPacket==desiredPacket && page[1].length>1 | |
# First packet is the desired packet | |
# Remove the first packet | |
pageSize=(f.pos+page[2])-pageStart | |
f.pos=pageStart | |
pagedata=f.read(pageSize) | |
# Write the comment packet (if not a continuation) | |
if !isContinuation | |
pagesWritten=OggMetadata.encodePacket( | |
page,commentPacket,seqno,fw) | |
seqno+=pagesWritten | |
end | |
# Write the remaining packets | |
pagedata=removeFirstPacket(pagedata,seqno,page[0][1],page[0][2]) | |
seqno+=1 | |
fw.write(pagedata) | |
elsif isContinuation && firstPacket==desiredPacket | |
# Single-packet continuation. This packet was already | |
# written, so no need to write this page | |
else | |
raise "Not supported: comment packet not the first in a page" | |
end | |
else | |
# Packet not represented in this page | |
pageSize=(f.pos+page[2])-pageStart | |
f.pos=pageStart | |
pagedata=f.read(pageSize) | |
pagedata=resequencePage(pagedata,seqno) | |
seqno+=1 | |
fw.write(pagedata) | |
end | |
packetIndex+=page[1].length | |
if isContinuation | |
packetIndex-=1 | |
end | |
f.pos=endPagePos+page[2] | |
break if OggMetadata.isLastPage(page) | |
end | |
end | |
def self.encodeUtf8(str) | |
begin | |
return str.encode("UTF-8") | |
rescue | |
# Fall back by replacing extended bytes with question marks | |
ret="" | |
str.each_byte{|b| | |
ret+=(b>=0x80) ? "?" : b.chr | |
} | |
return ret | |
end | |
end | |
def self.makeCommentPacket(hash) | |
header=3.chr+"vorbis"+([11].pack("V"))+"OggMetadata" | |
comments=[] | |
for key in hash.keys | |
value=hash[key] | |
if value.is_a?(Array) | |
for item in value | |
# Note: Falsy values such as nil are replaced with | |
# an empty string | |
val=encodeUtf8((item||"").to_s) | |
keyvalue=key.downcase+"="+val | |
keyvalue=keyvalue.gsub(/[\r\n]/,"") | |
comments.push([keyvalue.length].pack("V")) | |
comments.push(keyvalue) | |
end | |
else | |
val=encodeUtf8((value||"").to_s) | |
keyvalue=key.downcase+"="+val | |
keyvalue=keyvalue.gsub(/[\r\n]/,"") | |
comments.push([keyvalue.length].pack("V")) | |
comments.push(keyvalue) | |
end | |
end | |
return header+([comments.length/2].pack("V"))+comments.join("")+1.chr | |
end | |
def self.getCommentPacket(f) | |
packetIndex=-1 | |
# Extract the "comment packet", always the second packet in an Ogg bitstream | |
commentPacket="" | |
desiredPacket=1 | |
serial=nil | |
while true | |
pageStart=f.pos | |
page=OggMetadata.readPage(f) | |
firstPacket=packetIndex+1 | |
isContinuation=OggMetadata.isContinuation(page) | |
if isContinuation | |
raise "continuation in first page" if packetIndex<0 | |
#p "Packet continued" | |
firstPacket-=1 | |
end | |
if serial!=nil && serial!=OggMetadata.streamSerial(page) | |
raise "multiple logical bitstreams not supported" | |
end | |
serial=OggMetadata.streamSerial(page) | |
endPagePos=f.pos | |
if desiredPacket>=firstPacket && | |
desiredPacket<(firstPacket+page[1].length) | |
packet=page[1][desiredPacket-firstPacket] | |
f.pos=endPagePos+packet[0] | |
commentPacket+=f.read(packet[1]) | |
end | |
packetIndex+=page[1].length | |
if isContinuation | |
packetIndex-=1 | |
end | |
f.pos=endPagePos+page[2] | |
break if OggMetadata.isLastPage(page) | |
end | |
return commentPacket | |
end | |
public | |
def self.getAudioInfo(f) | |
if f.is_a?(String) | |
File.open(f,"rb"){|ff| return getAudioInfo(ff) } | |
end | |
page=OggMetadata.readPage(f) | |
isContinuation=OggMetadata.isContinuation(page) | |
if isContinuation | |
raise "continuation in first page" if packetIndex<0 | |
end | |
f.pos+=1 | |
if f.read(6)!="vorbis" | |
raise "Unsupported codec" | |
end | |
data=f.read(23).unpack("VCVVVVCC") | |
if data[0]!=0 || data[1]==0 || data[2]==0 || data[7]==0 || | |
(data[6]&15)<6 || (data[6]&15)>13 || | |
((data[6]>>4)&15)<6 || ((data[6]>>4)&15)>13 || | |
(data[6]&15)>((data[6]>>4)&15) | |
raise "Invalid Vorbis header" | |
end | |
channels=data[1] | |
samplerate=data[2] | |
f.pos=0 | |
serial=nil | |
maxGranulePos=0 | |
while true | |
page=OggMetadata.readPage(f) | |
isContinuation=OggMetadata.isContinuation(page) | |
if isContinuation | |
raise "continuation in first page" if packetIndex<0 | |
firstPacket-=1 | |
end | |
if serial!=nil && serial!=OggMetadata.streamSerial(page) | |
raise "multiple logical bitstreams not supported" | |
end | |
granulePos=page[0][1]+(page[0][2]<<32) | |
if (granulePos>>63)==0 | |
# Use only positive granule positions | |
maxGranulePos=[maxGranulePos,granulePos].max | |
end | |
serial=OggMetadata.streamSerial(page) | |
endPagePos=f.pos | |
f.pos=endPagePos+page[2] | |
break if OggMetadata.isLastPage(page) | |
end | |
duration=maxGranulePos*1.0/samplerate | |
return {"channels"=>channels,"samplerate"=>samplerate,"duration"=>duration} | |
end | |
def self.writeMetadata(metadata, fileInput, fileOutput=nil) | |
fileOutput=fileInput if !fileOutput | |
commentPacket=makeCommentPacket(metadata) | |
freefile=getFreeFile(fileOutput) | |
begin | |
File.open(fileInput,"rb"){|f| | |
File.open(freefile,"wb"){|fw| | |
replaceCommentPacket(f,fw,commentPacket) | |
}} | |
if freefile!=fileOutput | |
File.delete(fileOutput) rescue nil | |
File.rename(freefile,fileOutput) | |
end | |
ensure | |
if freefile!=fileOutput | |
File.delete(freefile) rescue nil | |
end | |
end | |
end | |
def self.getMetadata(f) | |
if f.is_a?(String) | |
File.open(f,"rb"){|ff| return getMetadata(ff) } | |
end | |
comment=OggMetadata.getCommentPacket(f) | |
if comment[1,6]!="vorbis" | |
raise "Unsupported comment packet" | |
end | |
offset=7 | |
length=comment[offset,4].unpack("V")[0] | |
offset+=4+length # skip vendor string | |
commentLength=comment[offset,4].unpack("V")[0] | |
offset+=4 | |
commentHash={} | |
for i in 0...commentLength | |
length=comment[offset,4].unpack("V")[0] | |
offset+=4 | |
keyvalue=comment[offset,length] | |
offset+=length | |
if keyvalue.include?("=") | |
idx=keyvalue.index("=") | |
key=keyvalue[0,idx].downcase | |
value=(keyvalue[idx+1,keyvalue.length]||"") | |
existing=commentHash[key] | |
if existing | |
commentHash[key]=[existing] if existing.is_a?(String) | |
commentHash[key].push(value) | |
else | |
commentHash[key]=value | |
end | |
end | |
end | |
framingBit=comment[offset,1].unpack("C")[0] | |
raise "invalid framing bit" if framingBit==0 | |
return commentHash | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment