Created
October 20, 2013 20:10
-
-
Save papaver/7074683 to your computer and use it in GitHub Desktop.
Avid Filmscribe Cutlist (.ctl) parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#------------------------------------------------------------------------------ | |
# filmscribeCutList.py - avid filmscribe cutlist (.ctl) parser | |
#------------------------------------------------------------------------------ | |
import collections | |
import itertools | |
import logging | |
import os | |
import os.path | |
import re | |
#------------------------------------------------------------------------------ | |
# defines | |
#------------------------------------------------------------------------------ | |
kTimecodePattern = '(?P<timecode>\d{2}:\d{2}:\d{2}:\d{2})' | |
kFootage = 'Footage' | |
kRecordTC = 'Record TC' | |
kStartTC = 'Start TC' | |
kClipName = 'Clip Name' | |
kOptical = 'Optical' | |
kSequence = 'Sequence' | |
kDescription = 'Description' | |
kSideA = 'A' | |
kSideB = 'B' | |
#------------------------------------------------------------------------------ | |
# structs | |
#------------------------------------------------------------------------------ | |
Range = collections.namedtuple('Range', ['start', 'end']) | |
#------------------------------------------------------------------------------ | |
# Timecode | |
#------------------------------------------------------------------------------ | |
class Timecode(object): | |
"""Representation of a video timecode. | |
""" | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, timecode): | |
super(Timecode, self).__init__() | |
tokens = timecode.split(':') | |
self.hours, self.minutes, self.seconds, self.frames = map(int, tokens) | |
#-------------------------------------------------------------------------- | |
def __str__(self): | |
return "%(hours)02d:%(minutes)02d:%(seconds)02d:%(frames)02d" % \ | |
self.__dict__ | |
#-------------------------------------------------------------------------- | |
def __repr__(self): | |
return "Timecode(%s)" % str(self) | |
#------------------------------------------------------------------------------ | |
# Keycode | |
#------------------------------------------------------------------------------ | |
class Keycode(object): | |
"""Representation of a film keycode (edgecode). | |
""" | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, timecode): | |
super(Keycode, self).__init__() | |
tokens = timecode.split('+') | |
self.feet, self.frames = map(int, tokens) | |
#-------------------------------------------------------------------------- | |
def __cmp__(self, other): | |
return cmp((self.feet, self.frames), (other.feet, other.frames)) | |
#-------------------------------------------------------------------------- | |
def __add__(self, other): | |
frames = self.frames + int(other) | |
return Keycode("%(feet)04d+%(frames)02d" % { | |
'feet' : self.feet + (frames / 16), | |
'frames' : frames % 16 | |
}) | |
#-------------------------------------------------------------------------- | |
def __str__(self): | |
return "%(feet)04d+%(frames)02d" % self.__dict__ | |
#-------------------------------------------------------------------------- | |
def __repr__(self): | |
return "Keycode(%s)" % str(self) | |
#-------------------------------------------------------------------------- | |
# methods | |
#-------------------------------------------------------------------------- | |
def totalFrames(self): | |
return (self.feet * 16) + self.frames | |
#------------------------------------------------------------------------------ | |
# LineIterator | |
#------------------------------------------------------------------------------ | |
class LineIterator(object): | |
"""Handles walking through an array of lines, checking for lines with | |
certain content as well and returning blocks of lines. | |
""" | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, lines): | |
super(LineIterator, self).__init__() | |
self._lines = lines | |
self._index = 0 | |
#-------------------------------------------------------------------------- | |
# properties | |
#-------------------------------------------------------------------------- | |
@property | |
def index(self): | |
return self._index | |
#-------------------------------------------------------------------------- | |
@property | |
def line(self): | |
return self._lines[self._index] | |
#-------------------------------------------------------------------------- | |
# methods | |
#-------------------------------------------------------------------------- | |
def isEOF(self): | |
"""Check if the current line index points past the line array. | |
""" | |
return self._index >= len(self._lines) | |
#-------------------------------------------------------------------------- | |
def next(self): | |
"""Increment the current line index by one. | |
""" | |
self._index += 1 | |
#-------------------------------------------------------------------------- | |
def peek(self): | |
"""Return the next line without moving the current index, None is | |
returned if past eof. | |
""" | |
if (self._index + 1) >= len(self._lines): | |
return None | |
else: | |
return self._lines[self._index + 1] | |
#-------------------------------------------------------------------------- | |
def isEmpty(self, line): | |
"""Checks to see if the line is empty. | |
""" | |
return len(line) == 0 | |
#-------------------------------------------------------------------------- | |
def isStartOfHeader(self, line): | |
"""Checks to see if the line contains only a SOH (^A|\x01) character. | |
""" | |
return re.match('^\x01$', line) != None | |
#-------------------------------------------------------------------------- | |
def isEndOfTable(self, line): | |
"""Checks to see if the line contains an end of table entry. | |
""" | |
return re.match('^\(.+\)$', line) != None | |
#-------------------------------------------------------------------------- | |
def isDoubleLineSeperator(self, line): | |
"""Checks to see if the line is a seperator made with '=' chars. | |
""" | |
return re.match('^=+$', line) != None | |
#-------------------------------------------------------------------------- | |
def forwardToStartOfHeader(self): | |
"""Moves the current line forward until a start of header line is | |
found. The current line index will point to the next line. | |
""" | |
# save the index we started at | |
startIndex = self.index | |
# walk forward till SOH is found, raise exception if not found | |
while not self.isStartOfHeader(self.line): | |
self.next() | |
# check if the lines ran out | |
if self.isEOF(): | |
startLine = self._lines[startIndex] | |
raise ValueError("Could not find SOH line, started " \ | |
"looking at line %d (%s)" % (startIndex, startLine)) | |
# consume the line with the SOH | |
self.next() | |
#-------------------------------------------------------------------------- | |
def skipEmpty(self): | |
"""Moves the current line forward until a line with content is found. | |
""" | |
# save the index we started at | |
startIndex = self.index | |
# walk forward till content is found, raise exception if not found | |
while self.isEmpty(self.line): | |
self.next() | |
# check if the lines ran out | |
if self.isEOF(): | |
startLine = self._lines[startIndex] | |
raise ValueError("Could not find more content, started " \ | |
"looking at line %d (%s)" % (startIndex, startLine)) | |
#-------------------------------------------------------------------------- | |
def popLine(self): | |
"""Return the current line and move to the next. | |
""" | |
# check if the lines ran out | |
if self.isEOF(): | |
raise ValueError("Cannot pop line if at end of block.") | |
# grab the current line and step forward | |
line = self.line | |
self.next() | |
return line | |
#-------------------------------------------------------------------------- | |
def popBlock(self): | |
"""Find the next empty line and return the block of lines in between. | |
The empty line will be consumed and not returned. The current index | |
will point to the next line. | |
""" | |
# save the index we started at | |
startIndex = self._index | |
# walk forward till empty is found, raise exception if not found | |
while not self.isEmpty(self.line): | |
self.next() | |
# check if the lines ran out | |
if self.isEOF(): | |
startLine = self._lines[startIndex] | |
raise ValueError("Could not find empty line, started " \ | |
"looking at line %d (%s)" % (startIndex, startLine)) | |
# consume the empty line | |
self.next() | |
# return the block iterated over | |
return self._lines[startIndex:self.index - 1] | |
#-------------------------------------------------------------------------- | |
def popOpticalBlock(self): | |
"""Finds the next double line seperator and returns the lines | |
consisting of the block. The line seperator will be consumed and not | |
returned. The current index will point to the next line. | |
""" | |
# save the index we started at | |
startIndex = self.index | |
# walk forward till end of table is found, raise exception if not found | |
while not self.isDoubleLineSeperator(self.line): | |
self.next() | |
# check if the lines ran out | |
if self.isEOF(): | |
startLine = self._lines[startIndex] | |
raise ValueError("Could not find double line seperator, " \ | |
"started looking at line %d (%s)" % (startIndex, startLine)) | |
# consume the line seperator | |
self.next() | |
# return the block iterated over | |
return self._lines[startIndex:self.index - 1] | |
#-------------------------------------------------------------------------- | |
def popTable(self): | |
"""Find the next end of table line and return the lines consisting of | |
the table. The end of table line will be consumed and not returned. | |
The current index will point to the next line. | |
""" | |
# save the index we started at | |
startIndex = self.index | |
# walk forward till end of table is found, raise exception if not found | |
while not self.isEndOfTable(self.line): | |
self.next() | |
# check if the lines ran out | |
if self.isEOF(): | |
startLine = self._lines[startIndex] | |
raise ValueError("Could not find end of table line, started " \ | |
"looking at line %d (%s)" % (startIndex, startLine)) | |
# consume the end of table tag | |
self.next() | |
# return the block iterated over | |
return self._lines[startIndex:self.index - 1] | |
#------------------------------------------------------------------------------ | |
# Event | |
#------------------------------------------------------------------------------ | |
class Event(object): | |
"""Represents an event found in the assemble table. Its possbile all | |
event data may not be available if column header information is not found. | |
If this event contains an optical its id will be available, if not the | |
optical itself. | |
""" | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, lines, headers): | |
"""Parse the event using the header information. | |
""" | |
super(Event, self).__init__() | |
# verify atleast the footage header was provided | |
if kFootage not in headers: | |
raise ValueError("Header '%s' is required to parse event." % \ | |
kFootage) | |
# use the line iterator to ease the parsing process | |
iterator = LineIterator(lines) | |
# check if this event has an optical marker | |
self._hasOptical = re.match('^\s+\x01$', iterator.line) != None | |
if self._hasOptical: | |
iterator.next() | |
# skip to the line containing the event index | |
while True: | |
match = re.match('^\s*(?P<id>\d+)\.', iterator.line) | |
if match != None: | |
break | |
iterator.next() | |
# save the event index | |
self._id = int(match.group('id')) | |
# parse out the optical id | |
self._opticalId = None | |
if self._hasOptical: | |
match = re.match('^.+OPTICAL #(?P<id>\d+)', iterator.line) | |
if match != None: | |
self._opticalId = int(match.group('id')) | |
else: | |
logging.warning("Optical id could not be " \ | |
"found on event '%s'." % self._id) | |
# cut up the columns for the start/end data | |
starts = self._parseColumns(iterator.popLine(), headers) | |
ends = self._parseColumns(iterator.popLine(), headers) | |
columns = dict(zip(starts.keys(), zip(starts.values(), ends.values()))) | |
# save the available columns | |
self._footage = Range(*map(Keycode, columns[kFootage])) | |
self._recordTC = None if kRecordTC not in headers else \ | |
Range(*map(Timecode, columns[kRecordTC])) | |
# parse remaining data if not optical event | |
self._startTC = None | |
self._clipName = '' | |
if not self._hasOptical: | |
if kStartTC in headers: | |
self._startTC = Range(*map(Timecode, columns[kStartTC])) | |
if kClipName in headers: | |
self._clipName = starts[kClipName].strip() | |
# set default for optical object | |
self._optical = None | |
#-------------------------------------------------------------------------- | |
def __str__(self): | |
return ('e%s' % self._id if self._clipName == '' else self._clipName) + \ | |
(', op%s' % self._optical if self._optical != None else '') | |
#-------------------------------------------------------------------------- | |
def __repr__(self): | |
return "Event(%s)" % str(self) | |
#-------------------------------------------------------------------------- | |
# properties | |
#-------------------------------------------------------------------------- | |
@property | |
def id(self): | |
return self._id | |
#-------------------------------------------------------------------------- | |
@property | |
def footage(self): | |
return self._footage | |
#-------------------------------------------------------------------------- | |
@property | |
def recordTC(self): | |
return self._recordTC | |
#-------------------------------------------------------------------------- | |
@property | |
def startTC(self): | |
return self._startTC | |
#-------------------------------------------------------------------------- | |
@property | |
def clipName(self): | |
return self._clipName | |
#-------------------------------------------------------------------------- | |
@property | |
def optical(self): | |
return self._optical | |
#-------------------------------------------------------------------------- | |
@property | |
def hasOptical(self): | |
return self._hasOptical | |
#-------------------------------------------------------------------------- | |
@property | |
def opticalId(self): | |
return self._opticalId | |
#-------------------------------------------------------------------------- | |
# methods | |
#-------------------------------------------------------------------------- | |
def linkOptical(self, optical): | |
self._optical = optical | |
#-------------------------------------------------------------------------- | |
# internal methods | |
#-------------------------------------------------------------------------- | |
def _parseColumns(self, line, headers): | |
"""Cut up the line into the columns provided by the headers. | |
""" | |
columns = {} | |
# cut up all available columns | |
for key in headers: | |
start, end = headers[key] | |
columns[key] = line[start:end] | |
return columns | |
#------------------------------------------------------------------------------ | |
# AssembleTable | |
#------------------------------------------------------------------------------ | |
class AssembleTable(object): | |
"""Represents the data found in the assemble list in a filmscribe ctl | |
file. It's possbile the data found may be different in every file. If the | |
required columns aren't found an exception will be thrown. | |
""" | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, lines): | |
"""Parse the assemble table from the lines provided. | |
""" | |
super(AssembleTable, self).__init__() | |
# use the line iterator to ease the parsing process | |
iterator = LineIterator(lines) | |
# parse the meta data found in the table header, first 3 lines | |
eventCount = self._parseHeaderEvents(iterator.popLine()) | |
iterator.next() | |
opticalCount, _ = self._parseHeaderOpticalsAndTime(iterator.popLine()) | |
# the next line should be a line break seperating the table and header | |
line = iterator.popLine() | |
if re.match('^-+$', line) == None: | |
logging.warning("Expecting header line break, " \ | |
"instead found: %s." % line) | |
# the column headers should be next | |
headers = self._parseColumnHeaders(iterator.popLine()) | |
iterator.skipEmpty() | |
# parse the events from the rest of the table | |
self._events = [] | |
while not iterator.isEOF(): | |
# grab the next block | |
block = iterator.popBlock() | |
# skip single line blocks, events blocks should be atleast 2 lines | |
if len(block) == 1: | |
continue | |
# parse the event | |
self._events.append(Event(block, headers)) | |
# track if any opticals were present | |
opticalEvents = filter(lambda e: e.hasOptical, self._events) | |
self._hasOpticals = len(opticalEvents) > 0 | |
# validate the correct number of events parsed | |
if (eventCount != None) and (len(self._events) != eventCount): | |
logging.warning("Incorrect number of events parsed, only %s of " \ | |
"%s found" % (len(self._events), eventCount)) | |
# validate the correct number of events have opticals | |
if (opticalCount != None) and (len(opticalEvents) != opticalCount): | |
logging.warning("Incorrect number of optical events parsed, " \ | |
"only %s of %s found" % (len(opticalEvents), opticalCount)) | |
#-------------------------------------------------------------------------- | |
# properties | |
#-------------------------------------------------------------------------- | |
@property | |
def events(self): | |
return self._events | |
#-------------------------------------------------------------------------- | |
@property | |
def hasOpticals(self): | |
return self._hasOpticals | |
#-------------------------------------------------------------------------- | |
# internal methods | |
#-------------------------------------------------------------------------- | |
def _parseHeaderEvents(self, line): | |
"""Parse the event count from the line. | |
""" | |
# parse the event count | |
match = re.match('^.+\s+(?P<events>\d+)\sevents\s+LFOA:.*$', line) | |
if match != None: | |
return int(match.group('events')) | |
logging.warning("Event count could not be parsed from header.") | |
return None | |
#-------------------------------------------------------------------------- | |
def _parseHeaderOpticalsAndTime(self, line): | |
"""Parse the optical count and total time from the line. | |
""" | |
# parse the optical count and total time | |
match = re.match('^\s+(?P<opticals>\d+)\sopticals\s+total time:\s%s.*$' % \ | |
kTimecodePattern, line) | |
if match != None: | |
return int(match.group('opticals')), Timecode(match.group('timecode')) | |
logging.warning("Optical count and total time could not be parsed " \ | |
"from header.") | |
return None, None | |
#-------------------------------------------------------------------------- | |
def _parseColumnHeaders(self, line): | |
"""Since the header positions can change we need to figure out the | |
cuts to properly extract the columns. | |
""" | |
headers = {} | |
# look for the footage keycode start, column is required | |
start = line.find(kFootage) | |
if start == -1: | |
raise ValueError("Required header '%s' not found in line '%s'." % \ | |
(kFootage, line)) | |
headers[kFootage] = Range(start, start + 7) | |
# look for the record timecode | |
start = line.find(kRecordTC) | |
if start != -1: | |
headers[kRecordTC] = Range(start, start + 11) | |
else: | |
logging.warning("Could not find expected '%s' " \ | |
"header in line %s." % (kRecordTC, line)) | |
# look for the start timecode | |
start = line.find(kStartTC) | |
if start != -1: | |
headers[kStartTC] = Range(start, start + 11) | |
else: | |
logging.warning("Could not find expected '%s' " \ | |
"header in line %s." % (kStartTC, line)) | |
# look for the clip name, since its a variable size, calculate the end | |
start = line.find(kClipName) | |
if start != -1: | |
rest = line[start+len(kClipName):] | |
rest = "".join((itertools.dropwhile(lambda c: c == ' ', rest))) | |
end = None if len(rest) == 0 else line.find(rest) | |
headers[kClipName] = Range(start, end) | |
else: | |
logging.warning("Could not find expected '%s' " \ | |
"header in line %s." % (kClipName, line)) | |
return headers | |
#------------------------------------------------------------------------------ | |
# OpticalClip | |
#------------------------------------------------------------------------------ | |
class OpticalClip(object): | |
"""Represents a single clip found in the optical block. | |
""" | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, lines, headers, sideHeaders): | |
"""Parse the optical clip content from the block. | |
""" | |
super(OpticalClip, self).__init__() | |
# use the line iterator to ease the parsing process | |
iterator = LineIterator(lines) | |
# first line should contain clip id | |
match = re.match('^\s*(?P<id>\d+)\.', iterator.line) | |
if match == None: | |
raise ValueError("Failed to parse optical clip id on line: %s" % \ | |
line) | |
self._id = match.group('id') | |
# parse the columns with start codes | |
starts = self._parseColumns(iterator.popLine(), headers) | |
# check the next line for an overflow | |
if re.match('^\s+$', iterator.line[:headers[kSideA].start]) != None: | |
# patch the desciption | |
starts[kDescription] += starts[kSideA] | |
# reparse the sides | |
line = iterator.popLine() | |
for side in [kSideA, kSideB]: | |
start, end = headers[side] | |
starts[side] = line[start:end] | |
# parse the column with end codes | |
ends = self._parseColumns(iterator.popLine(), headers) | |
# save the available columns | |
columns = dict(zip(starts.keys(), zip(starts.values(), ends.values()))) | |
self._opticalFootage = Range(*map(Keycode, columns[kOptical])) | |
self._sequenceFootage = Range(*map(Keycode, columns[kSequence])) | |
self._description = starts[kDescription].strip() | |
# parse side columns, use side B if side A is empty | |
side = kSideB if re.match('^\s+$', starts[kSideA]) else kSideA | |
sideHeaders = sideHeaders[side] | |
self._side = side | |
# check for start timecode | |
self._startTC = None | |
if kStartTC in sideHeaders: | |
start, end = sideHeaders[kStartTC] | |
startTCs = map(lambda l: l[start:end], columns[side]) | |
validTCs = filter(lambda tc: re.match(kTimecodePattern, tc), startTCs) | |
if len(validTCs) == 2: | |
self._startTC = Range(*map(Timecode, startTCs)) | |
# look for the clip name | |
self._clipName = '' | |
if kClipName in sideHeaders: | |
start, end = sideHeaders[kClipName] | |
self._clipName = starts[side][start:end].strip() | |
# possible to link to a sub optical | |
self._subMaster = None | |
#-------------------------------------------------------------------------- | |
def __str__(self): | |
return self._clipName | |
#-------------------------------------------------------------------------- | |
def __repr__(self): | |
return "OpticalClip(%s)" % str(self) | |
#-------------------------------------------------------------------------- | |
# properties | |
#-------------------------------------------------------------------------- | |
@property | |
def id(self): | |
return self._id | |
#-------------------------------------------------------------------------- | |
@property | |
def opticalFootage(self): | |
return self._opticalFootage | |
#-------------------------------------------------------------------------- | |
@property | |
def sequenceFootage(self): | |
return self._sequenceFootage | |
#-------------------------------------------------------------------------- | |
@property | |
def description(self): | |
return self._description | |
#-------------------------------------------------------------------------- | |
@property | |
def side(self): | |
return self._side | |
#-------------------------------------------------------------------------- | |
@property | |
def startTC(self): | |
return self._startTC | |
#-------------------------------------------------------------------------- | |
@property | |
def clipName(self): | |
return self._clipName | |
#-------------------------------------------------------------------------- | |
@property | |
def subMaster(self): | |
return self._subMaster | |
#-------------------------------------------------------------------------- | |
# methods | |
#-------------------------------------------------------------------------- | |
def linkSubMaster(self, subMaster): | |
self._subMaster = subMaster | |
#-------------------------------------------------------------------------- | |
# internal methods | |
#-------------------------------------------------------------------------- | |
def _parseColumns(self, line, headers): | |
"""Cut up the line into the columns provided by the headers. | |
""" | |
columns = {} | |
# cut up all available columns | |
for key in headers: | |
start, end = headers[key] | |
columns[key] = line[start:end] | |
return columns | |
#------------------------------------------------------------------------------ | |
# Optical | |
#------------------------------------------------------------------------------ | |
class Optical(object): | |
"""Represents an optical found in the optical table. Its possbile all | |
optical data may not be available if column header information is not | |
found. Opticals may also contain sub opticals. | |
""" | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, lines): | |
"""Parse the optical block, both header and content are contained. | |
""" | |
super(Optical, self).__init__() | |
# use the line iterator to ease the parsing process | |
iterator = LineIterator(lines) | |
# skip any empy lines | |
iterator.skipEmpty() | |
# first content line should contain optical/event ids | |
match = re.match('^OPTICAL #(?P<id>\d+)\s(?:SubMaster #(?P<subId>\d+))?' \ | |
'\s+Assemble Event #(?P<eventId>\d+)\s+total length:\s' \ | |
'(?P<length>\d{4}\+\d{2})$', iterator.popLine()) | |
if match == None: | |
raise ValueError("Could not parse optical id from block.") | |
# save the ids | |
groups = match.groupdict() | |
self._id = int(groups['id']) | |
self._subId = int(groups['subId']) if groups['subId'] != None else None | |
self._eventId = int(groups['eventId']) | |
self._length = Keycode(groups['length']) | |
# skip forward to the blocks header | |
iterator.skipEmpty() | |
# parse the column headers next, spread over three lines, with | |
# multiple sections, sideA and sideB | |
headers, sideHeaders = self._parseColumnHeaders(iterator.popBlock()) | |
# parse the opticals from the rest of the table | |
layer = [] | |
self._layers = [] | |
while not iterator.isEOF(): | |
# check for layer change | |
match = re.match('^Layer \d+ of \d+$', iterator.line) | |
if match != None: | |
iterator.popBlock() | |
if len(layer) != 0: | |
self._layers.append(layer) | |
layer = [] | |
continue | |
# parse the optical clip | |
layer.append(OpticalClip(iterator.popBlock(), headers, sideHeaders)) | |
# add the last layer to the list | |
self._layers.append(layer) | |
#-------------------------------------------------------------------------- | |
def __str__(self): | |
return str(self._id) | |
#-------------------------------------------------------------------------- | |
def __repr__(self): | |
return "Optical(%s)" % str(self) | |
#-------------------------------------------------------------------------- | |
# properties | |
#-------------------------------------------------------------------------- | |
@property | |
def id(self): | |
return self._id | |
#-------------------------------------------------------------------------- | |
@property | |
def subMasterId(self): | |
return self._subId | |
#-------------------------------------------------------------------------- | |
@property | |
def eventId(self): | |
return self._eventId | |
#-------------------------------------------------------------------------- | |
@property | |
def length(self): | |
return self._length | |
#-------------------------------------------------------------------------- | |
@property | |
def layers(self): | |
return self._layers | |
#-------------------------------------------------------------------------- | |
# internal methods | |
#-------------------------------------------------------------------------- | |
def _parseColumnHeaders(self, lines): | |
"""Since the header positions can change we need to figure out the | |
cuts to properly extract the columns. The optical header also | |
contains header sections for tapeA and tapeB. | |
""" | |
headers = {} | |
sideHeaders = { kSideA : {}, kSideB : {} } | |
# use the line iterator to ease the parsing process | |
iterator = LineIterator(lines) | |
# the inital line should contain optical footage and the side sections | |
line = iterator.popLine() | |
# look for the optical footage keycode start, column is required | |
start = line.find(kOptical) | |
if start == -1: | |
raise ValueError("Required header '%s' not found in line '%s'." % \ | |
(kOptical, line)) | |
headers[kOptical] = Range(start, start + 7) | |
# look for the sequence footage keycode start, column is required | |
start = line.find(kSequence) | |
if start == -1: | |
raise ValueError("Required header '%s' not found in line '%s'." % \ | |
(kSequence, line)) | |
headers[kSequence] = Range(start, start + 7) | |
# look for the side section markers | |
sectionMarkers = [match.start() for match in re.finditer('\+', line)] | |
if len(sectionMarkers) != 4: | |
raise ValueError("Required side section markers missing on " \ | |
"line '%s'." % line) | |
headers[kSideA] = Range(sectionMarkers[0], sectionMarkers[1] + 1) | |
headers[kSideB] = Range(sectionMarkers[2], sectionMarkers[3] + 1) | |
# grab the description column from the second line | |
line = iterator.popLine() | |
start = line.find(kDescription) | |
if start == -1: | |
raise ValueError("Required header '%s' not found in line '%s'." % \ | |
(kDescription, line)) | |
headers[kDescription] = Range(start, headers[kSideA].start) | |
# the last line contains the side headings | |
line = iterator.popLine() | |
for side in [kSideA, kSideB]: | |
# cut out the side section from the line, keeps parsing easier | |
start, end = headers[side] | |
sideLine = line[start:end] | |
# look for start timecode | |
start = sideLine.find(kStartTC) | |
if start != -1: | |
sideHeaders[side][kStartTC] = Range(start, start + 11) | |
# look for the clip name, since its variable look for the end | |
start = sideLine.find(kClipName) | |
if start != -1: | |
rest = sideLine[start+len(kClipName):] | |
rest = "".join((itertools.dropwhile(lambda c: c == ' ', rest))) | |
end = None if len(rest) == 0 else sideLine.find(rest) | |
sideHeaders[side][kClipName] = Range(start, end) | |
return (headers, sideHeaders) | |
#------------------------------------------------------------------------------ | |
# OpticalTable | |
#------------------------------------------------------------------------------ | |
class OpticalTable(object): | |
"""Represents the data found in the optical list in a filmscribe ctl | |
file. It's possbile the data found may be different in every file. | |
""" | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, lines): | |
"""Parse the optical table from the lines provided. | |
""" | |
super(OpticalTable, self).__init__() | |
# the optical table doesn't contain a seperator for the last entry, | |
# add one to the end of the table so an edge case doesn't exist | |
lines[-1] = '=' * 59 | |
# use the line iterator to ease the parsing process | |
iterator = LineIterator(lines) | |
# parse the meta data found in the table header, first 3 lines | |
opticalCount = self._parseHeaderOpticals(iterator.popLine()) | |
iterator.next() | |
iterator.next() | |
# the next line should be a line break seperating the table and header | |
line = iterator.popLine() | |
if re.match('^-+$', line) == None: | |
logging.warning("Expecting header line break, " \ | |
"instead found: %s." % line) | |
# parse the opticals from the rest of the table | |
submasters = [] | |
self._opticals = [] | |
while not iterator.isEOF(): | |
# grab the next block | |
block = iterator.popOpticalBlock() | |
# parse the optical | |
optical = Optical(block) | |
if optical.subMasterId == None: | |
self._opticals.append(optical) | |
else: | |
submasters.append(optical) | |
# link up the sub master opticals | |
for submaster in submasters: | |
opticalId = submaster.id | |
startFootage = submaster.layers[0][0].sequenceFootage.start | |
# attempt to find by index | |
optical = None | |
if opticalId > len(self._opticals): | |
optical = self._opticals[opticalId] | |
# if not valid search for it | |
if (optical == None) or (optical.id != opticalId): | |
opticalsWithId = filter(lambda o: o.id == opticalId, self._opticals) | |
if len(opticalsWithId) != 1: | |
logging.warning("Could not find optical %d linked to " \ | |
"submaster %d." % (opticalId, submaster.subMasterId)) | |
continue | |
optical = opticalsWithId[0] | |
# link up submaster to proper optical clip | |
for clip in itertools.chain(*optical.layers): | |
if clip.clipName.startswith('Optical') and \ | |
(clip.sequenceFootage.start == startFootage): | |
clip.linkSubMaster(submaster) | |
# validate the correct number of opticals parsed | |
if (opticalCount != None) and (len(self._opticals) != opticalCount): | |
logging.warning("Incorrect number of opticals parsed, " \ | |
"only %s of %s found" % (len(self._opticals), opticalCount)) | |
#-------------------------------------------------------------------------- | |
# properties | |
#-------------------------------------------------------------------------- | |
@property | |
def opticals(self): | |
return self._opticals | |
#-------------------------------------------------------------------------- | |
# internal methods | |
#-------------------------------------------------------------------------- | |
def _parseHeaderOpticals(self, line): | |
"""Parse the optical count from the line. | |
""" | |
# parse the event count | |
match = re.match('^.+\s+(?P<opticals>\d+)\soptical\sunits.*$', line) | |
if match != None: | |
return int(match.group('opticals')) | |
logging.warning("Optical count could not be parsed from header.") | |
return None | |
#------------------------------------------------------------------------------ | |
# FilmscribeCutList | |
#------------------------------------------------------------------------------ | |
class FilmscribeCutList(object): | |
"""Avid Filmscribe cutlist (.ctl) file parser. | |
""" | |
#-------------------------------------------------------------------------- | |
# object | |
#-------------------------------------------------------------------------- | |
def __init__(self, ctlPath): | |
"""Attempt to parse the ctl file. | |
""" | |
super(FilmscribeCutList, self).__init__() | |
# make sure the path is valid | |
if not os.path.exists(ctlPath): | |
raise ValueError("Filmscribe .ctl file not found at '%s'." % \ | |
ctlPath) | |
# open up the file and cut it up into lines, open in universal mode | |
# so we don't have to deal with line ending issues... | |
with open(ctlPath, 'ru') as ctlFile: | |
lines = ctlFile.read().replace('\r', '\n').split('\n') | |
# if no lines found we got a problem houston | |
if len(lines) == 0: | |
raise ValueError("Invalid .ctl file, no data found!") | |
# wrap the lines up in a line manager to make our lives easier | |
iterator = LineIterator(lines) | |
# parse the table of contents | |
iterator.forwardToStartOfHeader() | |
block = iterator.popBlock() | |
tableCount = len(block) | |
# expecting only two tables, the assembly and opticals | |
if tableCount > 2: | |
logging.warning("%d entries in toc, expecting " \ | |
"only 2." % tableCount) | |
# only parse the first two tables | |
tables = [] | |
for _ in range(min(2, tableCount)): | |
iterator.forwardToStartOfHeader() | |
tables.append(iterator.popTable()) | |
# parse the assembly table | |
self._assembleList = AssembleTable(tables[0]) | |
# parse the optical table if opticals present and table available | |
self._opticalList = None | |
if (len(tables) > 1) and self._assembleList.hasOpticals: | |
self._opticalList = OpticalTable(tables[1]) | |
# link up the opticals with thier events | |
events = self._assembleList.events | |
for optical in self._opticalList.opticals: | |
eventId = optical.eventId | |
# attempt to get event by indexing | |
event = events[eventId] if eventId < len(events) else None | |
# use filtering if indexing fails | |
if (event == None) or (event.id != eventId): | |
eventsWithId = filter(lambda e: e.id == eventId, events) | |
if len(eventsWithId) != 1: | |
logging.warning("Could not find event %d linked to " \ | |
"optical %d." % (eventId, optical.id)) | |
continue | |
event = eventsWithId[0] | |
# link optical w/ event | |
event.linkOptical(optical) | |
#-------------------------------------------------------------------------- | |
# properties | |
#-------------------------------------------------------------------------- | |
@property | |
def assembleList(self): | |
return self._assembleList | |
#-------------------------------------------------------------------------- | |
@property | |
def opticalList(self): | |
return self._opticalList |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment