|
import html
import re
import sys
from enum import Enum, auto
from typing import List, Optional, Union

from pydantic import BaseModel
|
|
|
class TokenType(Enum):
    """Lexical categories a piece of MRSL text can be classified as.

    The value of every member is its own name spelled as a string.
    """

    # Names, relations and assignments.
    ENTITY = "ENTITY"
    PROPERTY = "PROPERTY"
    ARROW = "ARROW"
    VARIABLE = "VARIABLE"
    EQUALS = "EQUALS"

    # Paired delimiters.
    LBRACKET = "LBRACKET"
    RBRACKET = "RBRACKET"
    LBRACE = "LBRACE"
    RBRACE = "RBRACE"
    LPAREN = "LPAREN"
    RPAREN = "RPAREN"

    # Separators, literal values and blank space.
    SEMICOLON = "SEMICOLON"
    COMMA = "COMMA"
    VALUE = "VALUE"
    WHITESPACE = "WHITESPACE"
|
|
|
class Property(BaseModel):
    """A single named property carried by an entity."""

    # Property identifier.
    name: str

    # Either a plain string or another entity; 'Entity' is written as a
    # forward reference because that model is declared further down.
    value: Union[str, 'Entity']
|
|
|
class Entity(BaseModel):
    """A named node together with the properties attached to it."""

    # Entity identifier.
    name: str

    # Properties in the order they were supplied.
    properties: List[Property]
|
|
|
class Group(BaseModel):
    """A collection of entities treated as one unit."""

    # Members of the group, in the order they were supplied.
    entities: List[Entity]
|
|
|
class Context(BaseModel):
    """Optional qualifiers of a relation: a time interval and a source."""

    # 'TimeInterval' is a forward reference; the model is declared below.
    temporal: Optional['TimeInterval'] = None

    # Entity named as the source, if any.
    source: Optional[Entity] = None
|
|
|
class TimeInterval(BaseModel):
    """A time span with a mandatory start and an optional end."""

    # Start of the interval (ISO date format).
    start: str

    # End of the interval; None when no end is given.
    end: Optional[str] = None
|
|
|
class Relation(BaseModel):
    """A directed link from a subject to an object.

    Both ends may be a single entity or a group of entities.
    """

    # Left-hand side of the relation.
    subject: Union[Entity, Group]

    # Label of the link; None when the relation is unlabelled.
    predicate: Optional[str] = None

    # Right-hand side of the relation.
    object: Union[Entity, Group]

    # Optional temporal/source qualifiers.
    context: Optional[Context] = None
|
|
|
class AST(BaseModel):
    """Result of parsing a whole MRSL document."""

    # Stand-alone entity statements.
    entities: List[Entity]

    # Relation statements; empty by default.
    relations: List[Relation] = []

    # Variable bindings keyed by variable name; empty by default.
    variables: dict = {}
|
|
|
class MRSLParser: |
|
def __init__(self):
    """Create the ordered lexer rule table and an empty variable store."""
    # Each rule pairs a token type with the regular expression that
    # recognises it. The list is ordered and consumers try the rules from
    # top to bottom, so more specific rules are listed before general ones.
    lexer_rules = [
        (TokenType.WHITESPACE, r'[ \t\n\r]+'),
        # prefix:name
        (TokenType.PROPERTY, r'[a-zA-Z][\w]*:[a-zA-Z][\w-]*'),
        # @name or @namespace/name
        (TokenType.ENTITY, r'@[a-zA-Z_][\w/]*'),
        # An identifier counts as a variable only when '=' follows it.
        (TokenType.VARIABLE, r'[a-zA-Z_]\w*(?=\s*=)'),
        # Quoted string, YYYY-MM-DD date, integer, or bare word.
        (TokenType.VALUE, r'"[^"]*"|\d{4}-\d{2}-\d{2}|\d+|[A-Za-z]+'),
        (TokenType.EQUALS, r'='),
        # Labelled arrow -[predicate]-> (label captured) or plain ->.
        (TokenType.ARROW, r'-\[([^\]]+)\]->|->'),
        (TokenType.LBRACKET, r'\['),
        (TokenType.RBRACKET, r'\]'),
        (TokenType.LBRACE, r'\{'),
        (TokenType.RBRACE, r'\}'),
        (TokenType.LPAREN, r'\('),
        (TokenType.RPAREN, r'\)'),
        (TokenType.SEMICOLON, r';'),
        (TokenType.COMMA, r','),
    ]

    # Variable definitions.
    self.variables = {}
    self.token_patterns = lexer_rules
|
|
|
def tokenize(self, text):
    """Split MRSL source text into a list of tokens.

    The rules in ``self.token_patterns`` are tried in order at the current
    position and the first one that matches wins. Whitespace matches are
    consumed but not emitted.

    Args:
        text: The MRSL source text.

    Returns:
        A list of tuples. Ordinary tokens are ``(token_type, text)``; an
        arrow written as ``-[predicate]->`` is
        ``(TokenType.ARROW, text, predicate)``.

    Raises:
        SyntaxError: If no rule matches at some position.
        AssertionError: If 1000 or more tokens are produced.
    """
    max_tokens = 1000

    # Compile each rule once per call instead of once per rule per token.
    rules = [
        (token_type, re.compile(pattern))
        for token_type, pattern in self.token_patterns
    ]

    tokens = []
    position = 0
    text_length = len(text)
    while position < text_length:
        for token_type, regex in rules:
            # Match in place at `position`; this avoids copying the rest of
            # the text for every token.
            match = regex.match(text, position)
            if match:
                break
        else:
            context = text[max(0, position-10):position] + "👉" + text[position:position+10]
            raise SyntaxError(f"Invalid syntax at position {position}\nContext: {context}")

        value = match.group(0)
        if token_type == TokenType.ARROW and '[' in value:
            # Keep the captured predicate of -[predicate]-> as a third field.
            tokens.append((TokenType.ARROW, value, match.group(1)))
        elif token_type != TokenType.WHITESPACE:
            tokens.append((token_type, value))
        position += len(value)

        # Raised explicitly (not via `assert`) so the limit is still
        # enforced when Python runs with -O; the exception type is unchanged.
        if len(tokens) >= max_tokens:
            raise AssertionError("Tokens length is too long")

    return tokens
|
|
|
# Example CSS rules for styling MRSL syntax-highlighted output:
# .mrsl-entity { color: blue; }
# .mrsl-property { color: green; }
# .mrsl-operator { color: red; }
# .mrsl-value { color: purple; }
# .mrsl-variable { color: orange; }
|
|
|
def parse(self, text: str) -> Union[Entity, AST]:
    """Tokenize MRSL text and build an AST from its statements.

    Three statement forms are recognised by their first token: a variable
    binding (``name = value``), a relation (starts with ``(``) and a
    stand-alone entity (starts with an ``@`` name).

    Raises:
        SyntaxError: If a statement starts with any other token, or a
            variable name is not followed by ``=``.
    """

    def drop_whitespace(stream):
        # Discard any leading WHITESPACE tokens.
        while stream and stream[0][0] == TokenType.WHITESPACE:
            stream = stream[1:]
        return stream

    pending = self.tokenize(text)
    tree = AST(entities=[], relations=[], variables={})

    pending = drop_whitespace(pending)
    while pending:
        leading = pending[0][0]

        if leading == TokenType.VARIABLE:
            variable = pending[0][1]
            pending = drop_whitespace(pending[1:])
            if not pending or pending[0][0] != TokenType.EQUALS:
                raise SyntaxError("Expected = after variable name")
            # Everything after '=' is an entity or a relation.
            bound, pending = self.parse_value(pending[1:])
            tree.variables[variable] = bound
        elif leading == TokenType.LPAREN:
            relation, pending = self.parse_relation(pending)
            tree.relations.append(relation)
        elif leading == TokenType.ENTITY:
            entity, pending = self.parse_entity(pending)
            tree.entities.append(entity)
        else:
            raise SyntaxError(f"Expected entity, relation, or variable, got {leading}")

        pending = drop_whitespace(pending)

    return tree
|
|
|
def parse_entity(self, tokens: List[tuple]) -> tuple[Entity, List[tuple]]:
    """Parse ``@name`` optionally followed by ``[prop, prop, ...]``.

    Args:
        tokens: Token tuples; the first one must be an ENTITY token.

    Returns:
        ``(Entity, remaining_tokens)``.

    Raises:
        SyntaxError: If the first token is not an entity, a list item is
            not a property, an item is not followed by ``,`` or ``]``, or
            the input ends before the closing ``]``.
    """

    def skip_whitespace(stream):
        while stream and stream[0][0] == TokenType.WHITESPACE:
            stream = stream[1:]
        return stream

    if not tokens or tokens[0][0] != TokenType.ENTITY:
        raise SyntaxError("Expected entity starting with @")

    entity_name = tokens[0][1]
    tokens = skip_whitespace(tokens[1:])

    properties = []
    if tokens and tokens[0][0] == TokenType.LBRACKET:
        tokens = tokens[1:]  # Consume [

        # Loop until the closing bracket is consumed. Looping on
        # `while tokens` would silently accept a list that is cut off
        # right after '[' or ',' instead of reporting it.
        while True:
            tokens = skip_whitespace(tokens)
            if not tokens:
                raise SyntaxError("Unexpected end of input in properties")

            if tokens[0][0] == TokenType.RBRACKET:
                tokens = tokens[1:]  # Consume ]
                break

            if tokens[0][0] != TokenType.PROPERTY:
                raise SyntaxError(f"Expected property name, got {tokens[0][0]}")

            prop, tokens = self.parse_property(tokens)
            properties.append(prop)

            tokens = skip_whitespace(tokens)
            if tokens and tokens[0][0] == TokenType.COMMA:
                tokens = tokens[1:]  # Consume comma, expect another item
                continue
            if tokens and tokens[0][0] == TokenType.RBRACKET:
                tokens = tokens[1:]  # Consume ]
                break
            raise SyntaxError("Expected , or ] after property")

    return Entity(name=entity_name, properties=properties), tokens
|
|
|
def parse_property(self, tokens: List[tuple]) -> tuple[Property, List[tuple]]:
    """Parse one property and return ``(Property, remaining_tokens)``.

    Accepted shapes are ``name [ ... ]`` (nested structure directly after
    the name) and ``name = X`` where X is an entity, a simple value or a
    nested ``[ ... ]`` structure.

    Raises:
        SyntaxError: If the first token is not a property name, input ends
            early, or an unexpected token follows the name or the ``=``.
    """

    def drop_whitespace(stream):
        while stream and stream[0][0] == TokenType.WHITESPACE:
            stream = stream[1:]
        return stream

    if not tokens or tokens[0][0] != TokenType.PROPERTY:
        raise SyntaxError(f"Expected property name, got {tokens[0][0] if tokens else 'EOF'}")

    prop_name = tokens[0][1]
    rest = drop_whitespace(tokens[1:])
    if not rest:
        raise SyntaxError("Unexpected end of input after property name")

    follower = rest[0][0]

    # Form 1: the nested structure follows the name directly.
    if follower == TokenType.LBRACKET:
        nested, rest = self.parse_nested_structure(rest)
        return Property(name=prop_name, value=nested), rest

    if follower != TokenType.EQUALS:
        raise SyntaxError(f"Expected = or [ after property name, got {follower}")

    # Form 2: name = <entity | value | nested structure>.
    rest = drop_whitespace(rest[1:])
    if not rest:
        raise SyntaxError("Expected value after =")

    operand = rest[0][0]
    if operand == TokenType.ENTITY:
        value, rest = self.parse_entity(rest)
    elif operand == TokenType.VALUE:
        # The token text is stored as-is.
        value, rest = rest[0][1], rest[1:]
    elif operand == TokenType.LBRACKET:
        value, rest = self.parse_nested_structure(rest)
    else:
        raise SyntaxError(f"Expected value, entity, or nested structure, got {operand}")

    return Property(name=prop_name, value=value), rest
|
|
|
def parse_nested_structure(self, tokens: List[tuple]) -> tuple[Entity, List[tuple]]:
    """Parse a bracketed property list into an anonymous entity.

    Args:
        tokens: Token tuples; the first one must be ``[``.

    Returns:
        ``(Entity, remaining_tokens)`` where the entity has an empty name
        and carries the parsed properties.

    Raises:
        SyntaxError: If the list does not start with ``[``, an item is not
            followed by ``,`` or ``]``, or the input ends before the
            closing ``]``. Errors from ``parse_property`` propagate.
    """

    def skip_whitespace(stream):
        while stream and stream[0][0] == TokenType.WHITESPACE:
            stream = stream[1:]
        return stream

    if not tokens or tokens[0][0] != TokenType.LBRACKET:
        raise SyntaxError("Expected [ at start of nested structure")
    tokens = tokens[1:]  # Consume [

    properties = []
    # Loop until ']' is consumed so that input cut off after '[' or ','
    # is reported instead of being silently accepted.
    while True:
        tokens = skip_whitespace(tokens)
        if not tokens:
            raise SyntaxError("Unexpected end of input in nested structure")

        if tokens[0][0] == TokenType.RBRACKET:
            tokens = tokens[1:]  # Consume ]
            break

        prop, tokens = self.parse_property(tokens)
        properties.append(prop)

        tokens = skip_whitespace(tokens)
        if tokens and tokens[0][0] == TokenType.COMMA:
            tokens = tokens[1:]  # Consume comma, expect another item
            continue
        if tokens and tokens[0][0] == TokenType.RBRACKET:
            tokens = tokens[1:]  # Consume ]
            break
        raise SyntaxError("Expected , or ] in nested structure")

    return Entity(name="", properties=properties), tokens
|
|
|
def parse_relation(self, tokens: List[tuple]) -> tuple[Relation, List[tuple]]:
    """Parse a relation and return ``(Relation, remaining_tokens)``.

    Shape: optional ``(``, a subject (entity or ``{group}``), an arrow, an
    object (entity or ``{group}``), optional ``)``, optional ``[context]``.
    The predicate comes either from a labelled arrow ``-[pred]->`` or from
    a property-style token between two plain arrows (``-> pred ->``).

    Raises:
        SyntaxError: On missing arrows, missing or invalid subject/object,
            or premature end of input.
    """

    def drop_whitespace(stream):
        while stream and stream[0][0] == TokenType.WHITESPACE:
            stream = stream[1:]
        return stream

    def parse_endpoint(stream, where):
        # A relation end is either a {group} or a single entity.
        kind = stream[0][0]
        if kind == TokenType.LBRACE:
            return self.parse_group(stream)
        if kind == TokenType.ENTITY:
            return self.parse_entity(stream)
        raise SyntaxError(f"Expected entity or group {where}, got {kind}")

    # The opening parenthesis is optional.
    if tokens and tokens[0][0] == TokenType.LPAREN:
        tokens = tokens[1:]

    if not tokens:
        raise SyntaxError("Unexpected end of input in relation")
    subject, tokens = parse_endpoint(tokens, "at start of relation")

    tokens = drop_whitespace(tokens)
    if not tokens or tokens[0][0] != TokenType.ARROW:
        raise SyntaxError("Expected -> in relation")

    first_arrow = tokens[0]
    tokens = tokens[1:]  # Consume the arrow in either case

    predicate = None
    if len(first_arrow) > 2 and first_arrow[2]:
        # Labelled arrow: the label is the third tuple element.
        predicate = first_arrow[2]
    else:
        tokens = drop_whitespace(tokens)
        if tokens and tokens[0][0] == TokenType.PROPERTY:
            # Predicate written as its own token; a second arrow must follow.
            predicate = tokens[0][1]
            tokens = drop_whitespace(tokens[1:])
            if not tokens or tokens[0][0] != TokenType.ARROW:
                raise SyntaxError("Expected -> after predicate")
            tokens = tokens[1:]

    tokens = drop_whitespace(tokens)
    if not tokens:
        raise SyntaxError("Unexpected end of input in relation")
    obj, tokens = parse_endpoint(tokens, "after arrow")

    # The closing parenthesis is optional as well.
    if tokens and tokens[0][0] == TokenType.RPAREN:
        tokens = tokens[1:]

    context = None
    if tokens and tokens[0][0] == TokenType.LBRACKET:
        context, tokens = self.parse_context(tokens)

    return Relation(subject=subject, predicate=predicate, object=obj, context=context), tokens
|
|
|
def parse_group(self, tokens: List[tuple]) -> tuple[Group, List[tuple]]:
    """Parse ``{entity, entity, ...}`` into a Group.

    Args:
        tokens: Token tuples; the first one must be ``{``.

    Returns:
        ``(Group, remaining_tokens)``.

    Raises:
        SyntaxError: If the group does not start with ``{``, a member is
            not an entity, a member is not followed by ``,`` or ``}``, or
            the input ends before the closing ``}``.
    """

    def skip_whitespace(stream):
        while stream and stream[0][0] == TokenType.WHITESPACE:
            stream = stream[1:]
        return stream

    if not tokens or tokens[0][0] != TokenType.LBRACE:
        raise SyntaxError("Expected { at start of group")
    tokens = tokens[1:]  # Consume {

    entities = []
    # Loop until '}' is consumed so that a group cut off after '{' or ','
    # is reported instead of being silently accepted.
    while True:
        tokens = skip_whitespace(tokens)
        if not tokens:
            raise SyntaxError("Unexpected end of input in group")

        if tokens[0][0] == TokenType.RBRACE:
            tokens = tokens[1:]  # Consume }
            break

        if tokens[0][0] != TokenType.ENTITY:
            raise SyntaxError(f"Expected entity in group, got {tokens[0][0]}")

        entity, tokens = self.parse_entity(tokens)
        entities.append(entity)

        tokens = skip_whitespace(tokens)
        if tokens and tokens[0][0] == TokenType.COMMA:
            tokens = tokens[1:]  # Consume comma, expect another member
            continue
        if tokens and tokens[0][0] == TokenType.RBRACE:
            tokens = tokens[1:]  # Consume }
            break
        raise SyntaxError("Expected , or } in group")

    return Group(entities=entities), tokens
|
|
|
def parse_value(self, tokens: List[tuple]) -> tuple[Union[Entity, Relation], List[tuple]]:
    """Parse the right-hand side of a variable assignment.

    A leading ``(`` selects a relation, a leading entity token selects an
    entity. Returns ``(value, remaining_tokens)``.

    Raises:
        SyntaxError: If no tokens remain or the first token is neither.
    """
    while tokens and tokens[0][0] == TokenType.WHITESPACE:
        tokens = tokens[1:]

    if not tokens:
        raise SyntaxError("Expected value after =")

    # Pick the sub-parser from the kind of the first token.
    sub_parsers = {
        TokenType.LPAREN: self.parse_relation,
        TokenType.ENTITY: self.parse_entity,
    }
    leading = tokens[0][0]
    sub_parser = sub_parsers.get(leading)
    if sub_parser is None:
        raise SyntaxError(f"Expected entity or relation, got {leading}")
    return sub_parser(tokens)
|
|
|
def parse_context(self, tokens: List[tuple]) -> tuple[Context, List[tuple]]:
    """Parse a bracketed context list attached to a relation.

    Recognised entries:

    * ``time:hasTime`` / ``time:start`` = a simple value, or
      ``time:Interval[time:hasBeginning = ..., time:hasEnd = ...]``
    * ``dc:source`` = an entity

    Surrounding double quotes are stripped from time values.

    Args:
        tokens: Token tuples; the first one must be ``[``.

    Returns:
        ``(Context, remaining_tokens)``.

    Raises:
        SyntaxError: On an unknown property, a malformed entry, a time
            value of the wrong kind, an interval without
            ``time:hasBeginning``, or input ending before the closing ``]``.
    """

    def skip_whitespace(stream):
        while stream and stream[0][0] == TokenType.WHITESPACE:
            stream = stream[1:]
        return stream

    def interval_bound(prop):
        # Interval bounds must be plain values; a nested entity has no
        # text to strip and would otherwise fail with an AttributeError.
        if not isinstance(prop.value, str):
            raise SyntaxError(f"Expected a simple value for {prop.name}")
        return prop.value.strip('"')

    if not tokens or tokens[0][0] != TokenType.LBRACKET:
        raise SyntaxError("Expected [ at start of context")
    tokens = tokens[1:]  # Consume [

    temporal = None
    source = None

    # Loop until ']' is consumed so that a context cut off after '[' or ','
    # is reported instead of being silently accepted.
    while True:
        tokens = skip_whitespace(tokens)
        if not tokens:
            raise SyntaxError("Unexpected end of input in context")

        if tokens[0][0] == TokenType.RBRACKET:
            tokens = tokens[1:]  # Consume ]
            break

        if tokens[0][0] != TokenType.PROPERTY:
            raise SyntaxError(f"Expected property in context, got {tokens[0][0]}")
        prop_name = tokens[0][1]

        tokens = skip_whitespace(tokens[1:])
        if not tokens or tokens[0][0] != TokenType.EQUALS:
            raise SyntaxError("Expected = in context property")

        tokens = skip_whitespace(tokens[1:])
        if not tokens:
            raise SyntaxError("Expected value in context property")

        if prop_name in ('time:hasTime', 'time:start'):
            if tokens[0][0] == TokenType.PROPERTY and tokens[0][1] == 'time:Interval':
                # Skip the time:Interval marker, then read its [ ... ] body.
                interval_entity, tokens = self.parse_nested_structure(tokens[1:])
                start = end = None
                for prop in interval_entity.properties:
                    if prop.name == 'time:hasBeginning':
                        start = interval_bound(prop)
                    elif prop.name == 'time:hasEnd':
                        end = interval_bound(prop)
                if start is None:
                    # TimeInterval.start is mandatory; report it as a
                    # parse error rather than a model validation error.
                    raise SyntaxError("time:Interval requires time:hasBeginning")
                temporal = TimeInterval(start=start, end=end)
            elif tokens[0][0] == TokenType.VALUE:
                temporal = TimeInterval(start=tokens[0][1].strip('"'))
                tokens = tokens[1:]
            else:
                # Previously any token (even ']' or ',') was taken as the
                # start time.
                raise SyntaxError(f"Expected time value in context, got {tokens[0][0]}")
        elif prop_name == 'dc:source':
            if tokens[0][0] != TokenType.ENTITY:
                raise SyntaxError(f"Expected entity as source, got {tokens[0][0]}")
            source, tokens = self.parse_entity(tokens)
        else:
            raise SyntaxError(f"Unknown context property: {prop_name}")

        tokens = skip_whitespace(tokens)
        if tokens and tokens[0][0] == TokenType.COMMA:
            tokens = tokens[1:]  # Continue to next property
        elif tokens and tokens[0][0] == TokenType.RBRACKET:
            tokens = tokens[1:]  # End of context
            break
        else:
            raise SyntaxError("Expected , or ] in context")

    return Context(temporal=temporal, source=source), tokens
|
|
|
def to_rdf(self, ast: Union[AST, Entity]) -> str:
    """Render a single entity or a whole AST as RDF (Turtle format).

    For an AST, every entity is rendered first, then every relation, and
    the pieces are joined with newlines.
    """
    if isinstance(ast, Entity):
        return self._entity_to_rdf(ast)

    chunks = [self._entity_to_rdf(item) for item in ast.entities]
    chunks += [self._relation_to_rdf(item) for item in ast.relations]
    return '\n'.join(chunks)
|
|
|
def to_html(self, ast: Union[AST, Entity]) -> str:
    """Render a single entity or a whole AST as highlighted HTML.

    For an AST, the rendered entities followed by the rendered relations
    are wrapped in one ``mrsl-block`` div, one piece per line.
    """
    if isinstance(ast, Entity):
        return self._entity_to_html(ast)

    inner = [self._entity_to_html(item) for item in ast.entities]
    inner += [self._relation_to_html(item) for item in ast.relations]
    return '\n'.join(['<div class="mrsl-block">', *inner, '</div>'])
|
|
|
def to_latex(self, ast: Union[AST, Entity]) -> str:
    """Render a single entity or a whole AST as LaTeX.

    For an AST, every entity is rendered first, then every relation, and
    the pieces are joined with newlines.
    """
    if isinstance(ast, Entity):
        return self._entity_to_latex(ast)

    chunks = [self._entity_to_latex(item) for item in ast.entities]
    chunks += [self._relation_to_latex(item) for item in ast.relations]
    return '\n'.join(chunks)
|
|
|
def _entity_to_rdf(self, entity: Entity) -> str:
    """Render one entity as Turtle, preceded by a fixed prefix block.

    Args:
        entity: The entity to render.

    Returns:
        The ``@prefix`` declarations, a blank line, then one line per
        property (nested entities are expanded recursively), with trailing
        whitespace removed.
    """
    # Prefixes are emitted in exactly this order.
    prefixes = (
        ('pers', 'http://example.org/person#'),
        ('org', 'http://example.org/org#'),
        ('geo', 'http://example.org/geo#'),
        ('schema', 'http://schema.org/'),
        ('loc', 'http://example.org/loc#'),
        ('name', 'http://example.org/name#'),
        ('time', 'http://example.org/time#'),
        ('dc', 'http://purl.org/dc/terms/'),
    )
    lines = [f'@prefix {prefix}: <{uri}> .' for prefix, uri in prefixes]
    lines.append('')  # Empty line after prefixes

    def prefixed_name(raw_name: str) -> str:
        """Turn ``@org/AcmeInc`` into ``org:AcmeInc``."""
        bare = raw_name.replace('@', '')
        if '/' in bare:
            # Split on the first slash only: entity names may contain
            # several, and unpacking a plain split('/') would raise.
            # Further slashes are backslash-escaped, as Turtle requires
            # inside a local name.
            prefix, _, local = bare.partition('/')
            return f"{prefix}:{local.replace('/', chr(92) + '/')}"
        # Names without a namespace default to the pers: prefix.
        return bare if ':' in bare else f"pers:{bare}"

    def literal(value):
        """Quote a plain value unless it already carries its quotes."""
        if not isinstance(value, str):
            return value
        # Values read by the parser keep the quotes from the source text;
        # wrapping them again would produce ""text"".
        if len(value) >= 2 and value.startswith('"') and value.endswith('"'):
            return value
        return f'"{value}"'

    def entity_to_rdf(node: Entity, depth: int = 0) -> List[str]:
        """Convert an entity and its properties to RDF triples."""
        indent = " " * depth
        subject = prefixed_name(node.name)
        triples = []
        for prop in node.properties:
            predicate = prop.name
            if isinstance(prop.value, Entity):
                # NOTE(review): the nested block repeats a subject inside
                # '[ ... ]', which does not look like standard Turtle
                # blank-node syntax; kept unchanged to keep output stable.
                triples.append(f"{indent}{subject} {predicate} [")
                triples.extend(entity_to_rdf(prop.value, depth + 1))
                triples.append(f"{indent}] .")
            else:
                triples.append(f"{indent}{subject} {predicate} {literal(prop.value)} .")
        return triples

    lines.extend(entity_to_rdf(entity))
    return '\n'.join(lines).rstrip()  # Remove trailing whitespace
|
|
|
def _relation_to_rdf(self, relation: Relation) -> str:
    """Render a relation as RDF triples, one per line.

    The first line is ``subject predicate object .``; when the relation
    has a context, ``time:start``, ``time:end`` and ``dc:source`` triples
    about the subject follow. A missing predicate is written as
    ``relates``. Groups are written as fixed blank-node labels.
    """

    def prefixed_name(raw_name: str) -> str:
        """Turn ``@org/AcmeInc`` into ``org:AcmeInc``."""
        bare = raw_name.replace('@', '')
        if '/' in bare:
            # Split on the first slash only so names with several slashes
            # do not break the unpacking; extra slashes are escaped.
            prefix, _, local = bare.partition('/')
            return f"{prefix}:{local.replace('/', chr(92) + '/')}"
        # Same default as the entity renderer: names without a namespace
        # fall under pers:. Previously such names raised ValueError here.
        return bare if ':' in bare else f"pers:{bare}"

    def node_name(node, blank_label: str) -> str:
        if isinstance(node, Entity):
            return prefixed_name(node.name)
        return blank_label  # TODO: Generate unique blank node IDs

    subject = node_name(relation.subject, "_:group1")
    predicate = relation.predicate if relation.predicate else "relates"
    obj = node_name(relation.object, "_:group2")

    lines = [f"{subject} {predicate} {obj} ."]

    context = relation.context
    if context:
        if context.temporal:
            lines.append(f"{subject} time:start \"{context.temporal.start}\" .")
            if context.temporal.end:
                lines.append(f"{subject} time:end \"{context.temporal.end}\" .")
        if context.source:
            lines.append(f"{subject} dc:source {prefixed_name(context.source.name)} .")

    return '\n'.join(lines)
|
|
|
def _entity_to_html(self, entity: Entity) -> str:
    """Render an entity as HTML with syntax-highlighting spans.

    The entity is wrapped in a ``mrsl-block`` div. Properties, if any,
    are listed between bracket spans, one per line, separated by comma
    spans; nested entities are rendered recursively.

    Entity names, property names and values are HTML-escaped
    (``&``, ``<``, ``>``) because they come from parsed input text.
    """

    def text(raw) -> str:
        # quote=False leaves double quotes untouched, so quoted values
        # render exactly as before; only markup characters are escaped.
        return html.escape(str(raw), quote=False)

    parts = [
        '<div class="mrsl-block">',
        f'<span class="mrsl-entity">{text(entity.name)}</span>',
    ]

    if entity.properties:
        parts.append('<span class="mrsl-bracket">[</span>')

        last_index = len(entity.properties) - 1
        for index, prop in enumerate(entity.properties):
            parts.append('\n ')  # Indent properties
            parts.append(f'<span class="mrsl-property">{text(prop.name)}</span>')
            parts.append('<span class="mrsl-equals"> = </span>')

            if isinstance(prop.value, Entity):
                parts.append(self._entity_to_html(prop.value))
            else:
                parts.append(f'<span class="mrsl-value">{text(prop.value)}</span>')

            # No comma after the final property.
            if index < last_index:
                parts.append('<span class="mrsl-comma">,</span>')

        parts.append('\n')  # Newline before closing bracket
        parts.append('<span class="mrsl-bracket">]</span>')

    parts.append('</div>')
    return ''.join(parts)
|
|
|
def _relation_to_html(self, relation: Relation) -> str:
    """Render a relation as HTML with syntax-highlighting spans.

    Output order: ``(``, subject, arrow (with the predicate when there is
    one), object, optional context in brackets, ``)``. Groups are written
    between brace spans with comma-separated members. The context shows
    ``time:start`` and/or ``dc:source``.

    The predicate and the time value are HTML-escaped (``&``, ``<``,
    ``>``) because they come from parsed input text.
    """

    def text(raw) -> str:
        # quote=False keeps double quotes as they are.
        return html.escape(str(raw), quote=False)

    def endpoint(node) -> List[str]:
        # One entity, or a brace-delimited, comma-separated group.
        if isinstance(node, Entity):
            return [self._entity_to_html(node)]
        pieces = ['<span class="mrsl-bracket">{</span>']
        last_index = len(node.entities) - 1
        for index, member in enumerate(node.entities):
            pieces.append(self._entity_to_html(member))
            if index < last_index:
                pieces.append('<span class="mrsl-comma">, </span>')
        pieces.append('<span class="mrsl-bracket">}</span>')
        return pieces

    lines = ['<span class="mrsl-paren">(</span>']
    lines.extend(endpoint(relation.subject))

    if relation.predicate:
        lines.append(f'<span class="mrsl-arrow">-[{text(relation.predicate)}]-></span>')
    else:
        lines.append('<span class="mrsl-arrow">-></span>')

    lines.extend(endpoint(relation.object))

    context = relation.context
    if context:
        lines.append('<span class="mrsl-bracket">[</span>')
        if context.temporal:
            lines.append('<span class="mrsl-property">time:start</span>')
            lines.append('<span class="mrsl-equals"> = </span>')
            lines.append(f'<span class="mrsl-value">"{text(context.temporal.start)}"</span>')
        if context.source:
            # A comma is needed only when a time entry precedes the source.
            if context.temporal:
                lines.append('<span class="mrsl-comma">, </span>')
            lines.append('<span class="mrsl-property">dc:source</span>')
            lines.append('<span class="mrsl-equals"> = </span>')
            lines.append(self._entity_to_html(context.source))
        lines.append('<span class="mrsl-bracket">]</span>')

    lines.append('<span class="mrsl-paren">)</span>')
    return ''.join(lines)
|
|
|
def _entity_to_latex(self, entity: Entity) -> str:
    """Render an entity as a LaTeX ``lstlisting`` block.

    The listing body is ``name[prop = value, ...]`` with LaTeX special
    characters escaped. Nested entities are expanded inline inside the
    same listing.

    Returns:
        The ``\\begin{lstlisting}[...]`` header, the body on one line, and
        ``\\end{lstlisting}``, joined with newlines.
    """
    # One-pass character translation. Chained .replace() calls would
    # re-escape the output of earlier replacements (for example the braces
    # of \textbackslash{}), so each character is mapped exactly once.
    escapes = str.maketrans({
        '\\': '\\textbackslash{}',
        '_': '\\_',
        '$': '\\$',
        '%': '\\%',
        '&': '\\&',
        '{': '\\{',
        '}': '\\}',
        '~': '\\textasciitilde{}',
        '^': '\\textasciicircum{}',
        '#': '\\#',
    })

    def escape_latex(text) -> str:
        """Escape special LaTeX characters."""
        return str(text).translate(escapes)

    def body(node: Entity) -> str:
        """Return ``name[prop = value, ...]`` for one entity."""
        rendered = escape_latex(node.name)
        if node.properties:
            prop_parts = []
            for prop in node.properties:
                if isinstance(prop.value, Entity):
                    # Expand inline: a listing environment cannot contain
                    # another \begin{lstlisting} ... \end{lstlisting}.
                    value = body(prop.value)
                else:
                    value = escape_latex(str(prop.value))
                prop_parts.append(f"{escape_latex(prop.name)} = {value}")
            rendered += '[' + ', '.join(prop_parts) + ']'
        return rendered

    # Wrap in lstlisting environment
    latex = [
        '\\begin{lstlisting}[language=MRSL,',
        'basicstyle=\\ttfamily,',
        'keywordstyle=\\color{blue},',
        'stringstyle=\\color{purple},',
        'commentstyle=\\color{green},',
        'breaklines=true,',
        'showstringspaces=false]',
        body(entity),
        '\\end{lstlisting}',
    ]
    return '\n'.join(latex)
|
|
|
def _relation_to_latex(self, relation: Relation) -> str:
    """Render a relation as LaTeX: subject, arrow, object.

    Entities are rendered with ``_entity_to_latex``; groups are written
    between ``\\{`` and ``\\}`` with comma-separated members. The arrow is
    `` -[predicate]-> `` when the relation has a predicate and `` -> ``
    otherwise.
    """

    def endpoint(node) -> str:
        if isinstance(node, Entity):
            return self._entity_to_latex(node)
        members = ', '.join(self._entity_to_latex(member) for member in node.entities)
        return '\\{' + members + '\\}'

    # Without this check an unlabelled relation would print "-[None]->".
    if relation.predicate:
        arrow = f' -[{relation.predicate}]-> '
    else:
        arrow = ' -> '

    return endpoint(relation.subject) + arrow + endpoint(relation.object)
|
|
|
def _context_to_latex(self, context: Optional[Context]) -> str:
    """Render a relation context as text lines for LaTeX output.

    Args:
        context: The context to render, or None.

    Returns:
        An empty string for None; otherwise a ``time:`` line (when a time
        interval is present) and a ``dc:source`` line (when a source is
        present), joined with a newline.
    """
    if context is None:
        return ''

    lines = []
    if context.temporal:
        interval = context.temporal
        # Leave the end out when it is not set; formatting it directly
        # would print the literal text "None".
        if interval.end is None:
            lines.append(f" time: {interval.start} .")
        else:
            lines.append(f" time: {interval.start} {interval.end} .")
    if context.source:
        lines.append(f" dc:source {context.source.name} .")

    return '\n'.join(lines)