Skip to content

Instantly share code, notes, and snippets.

@TimelessP
Created February 10, 2025 17:39
Show Gist options
  • Save TimelessP/7c6b6c683b65edbeeb91b8f526c9297a to your computer and use it in GitHub Desktop.
Save TimelessP/7c6b6c683b65edbeeb91b8f526c9297a to your computer and use it in GitHub Desktop.
"""
Production-Ready COBOL Parser using Lark (Extended Coverage, PIC Fix, and Additional Test Cases)
===================================================================================================
This parser covers a broad subset of COBOL, including:
• Standard divisions: IDENTIFICATION, ENVIRONMENT, DATA, PROCEDURE.
• DATA DIVISION: supports data items at levels 01, 05, and 77; condition entries (level 88);
and an optional REDEFINES clause.
• PROCEDURE DIVISION: supports MOVE, DISPLAY, IF–THEN–ELSE (including nested IFs), PERFORM, ACCEPT,
COMPUTE, COPY, CALL, GOTO, embedded SQL (EXEC SQL … END-EXEC), and OLE statements.
• ENVIRONMENT DIVISION: supports an optional SCREEN SECTION.
• Three kinds of string literals: double‑quoted, single‑quoted, and alternate (== … ==).
• PIC formats now allow S, X, 9, V, Z, commas and whitespace – so picture clauses like
"X(10)", "99V99", "S9(5)", and "ZZ,ZZ9" are accepted.
Additionally, this parser supports:
- The CALL statement (with an optional USING clause) for calling external routines.
- The GOTO statement for jumping between paragraphs.
- Conditions where the compared value can be a STRING, a CNAME, or a NUMBER.
Multiple test cases are provided:
- A fixed‑format sample with sequence numbers, reference fields, and extra PIC lines.
- A free‑format sample with alternate strings and additional statements.
- A mixed‑style sample combining fixed‑format headers with a free‑form body.
- An extra test case (invoked with --extra) that covers numeric conditions, additional operators,
and multi‑parameter CALLs.
- A nested test case (invoked with --nested) that exercises nested IF statements and complex control flow.
**Note:** In the nested test case each statement must be terminated with a period (or semicolon),
and condition identifiers must be single tokens (e.g. use "HIGH-SCORE" rather than "HIGH SCORE").
Installation:
--------------
pip install lark lark[regex]
Usage:
------
python larkcobol.py [--free] [--mixed] [--extra] [--nested]
If no flag is provided, the fixed‑format sample is used.
"""
import re
import sys
import argparse
from lark import Lark, Transformer, Token
# ------------------------------------------------------------------------------
# Preprocessing functions
# ------------------------------------------------------------------------------
def preprocess_fixed_format(source):
# Remove leading/trailing whitespace and split into lines
lines = source.strip().splitlines()
processed_lines = []
for line in lines:
# Skip comment lines (column 7 is '*')
if len(line) >= 7 and line[6] == '*':
continue
if re.match(r'^[ 0-9]{6}', line):
# Take everything from column 7 onward (index 6)
processed_line = line[6:]
else:
processed_line = line
# Remove any trailing reference fields
if "REF:" in processed_line:
processed_line = processed_line.split("REF:")[0]
processed_line = processed_line.strip()
if processed_line:
processed_lines.append(processed_line)
return "\n".join(processed_lines)
def preprocess_free_format(source):
lines = source.strip().splitlines()
processed_lines = []
for line in lines:
line = line.strip()
# Skip empty lines and lines that are exactly a period.
if not line or line == ".":
continue
if line.startswith("*") or line.startswith("*>"):
continue
processed_lines.append(line)
return "\n".join(processed_lines)
# ------------------------------------------------------------------------------
# Extended COBOL Grammar
# ------------------------------------------------------------------------------
# (All terminal names are now spelled correctly.)
# Note: The EXPRESSION rule no longer requires a lookahead for a period.
cobol_grammar = r"""
// Import common terminals
%import common.ESCAPED_STRING
%import common.NUMBER
%import common.WS_INLINE
%ignore WS_INLINE
%ignore /[ \t]*\*.*(\r?\n)+/
%ignore /[ \t]*\*>.*(\r?\n)+/
_DOT: "."
_SEMICOLON: ";"
ALT_STRING: /(?s)==.*?==/
IDENTIFICATION: "IDENTIFICATION"i
DIVISION: "DIVISION"i
PROGRAM_ID: "PROGRAM-ID"i
AUTHOR: "AUTHOR"i
ENVIRONMENT: "ENVIRONMENT"i
CONFIGURATION: "CONFIGURATION"i
SECTION: "SECTION"i
SCREEN: "SCREEN"i
SOURCE_COMPUTER: "SOURCE-COMPUTER"i
OBJECT_COMPUTER: "OBJECT-COMPUTER"i
INPUT_OUTPUT: "INPUT-OUTPUT"i
DATA: "DATA"i
FILE: "FILE"i
WORKING_STORAGE: "WORKING-STORAGE"i
PROCEDURE: "PROCEDURE"i
SELECT: "SELECT"i
ASSIGN: "ASSIGN"i
TO: "TO"i
FD: "FD"i
PIC: "PIC"i
LABEL: "LABEL"i
RECORDS: "RECORDS"i
ARE: "ARE"i
STANDARD: "STANDARD"i
MOVE: "MOVE"i
DISPLAY: "DISPLAY"i
IF: "IF"i
THEN: "THEN"i
ELSE: "ELSE"i
END_IF: "END-IF"i
PERFORM: "PERFORM"i
ACCEPT: "ACCEPT"i
COMPUTE: "COMPUTE"i
COPY: "COPY"i
EXEC: "EXEC"i
SQL: "SQL"i
END_EXEC: "END-EXEC"i
OLE: "OLE"i
REDEFINES: "REDEFINES"i
VALUE: "VALUE"i
CALL: "CALL"i
USING: "USING"i
GOTO: "GOTO"i
// Terminals
CNAME: /[A-Z0-9][A-Z0-9]*(?:-[A-Z0-9]+)*/i
STRING: ALT_STRING | ESCAPED_STRING | /'([^']|'')*'/
PICTURE_BODY: /[SX9VZ,\s]+(\([0-9]+\))?/
EXPRESSION: /[0-9+\-\*\/\s]+/
LEVEL: /(?:01|05|77)/i
COMPARATOR: "=" | "<=" | ">=" | "<>" | "<" | ">"
// Grammar Rules
start: program
program: identification_div environment_div data_div procedure_div
identification_div: IDENTIFICATION DIVISION _DOT program_id author*
program_id: PROGRAM_ID _DOT CNAME _DOT
author: AUTHOR _DOT STRING _DOT
environment_div: ENVIRONMENT DIVISION _DOT configuration_section? input_output_section? screen_section?
configuration_section: CONFIGURATION SECTION _DOT source_computer object_computer
source_computer: SOURCE_COMPUTER _DOT STRING _DOT -> source_computer
object_computer: OBJECT_COMPUTER _DOT STRING _DOT -> object_computer
input_output_section: INPUT_OUTPUT SECTION _DOT file_control+
file_control: SELECT CNAME ASSIGN TO STRING _DOT
// SCREEN SECTION rules:
screen_section: SCREEN SECTION _DOT screen_def+
screen_def: LEVEL CNAME _DOT inner_screen_entry+
inner_screen_entry: (LEVEL? STRING) _DOT
data_div: DATA DIVISION _DOT file_section? working_storage_section?
file_section: FILE SECTION _DOT fd_entry+
fd_entry: FD CNAME _DOT label_records
label_records: LABEL RECORDS ARE STANDARD _DOT
working_storage_section: WORKING_STORAGE SECTION _DOT ws_entry+
ws_entry: data_entry | condition_entry
// A data_entry may be a group item (without a PIC clause) or an elementary item.
data_entry: LEVEL CNAME (redefines_clause? pic_clause)? _DOT
redefines_clause: REDEFINES CNAME
condition_entry: "88" CNAME VALUE (STRING | CNAME) _DOT
pic_clause: PIC picture
picture: PICTURE_BODY
procedure_div: PROCEDURE DIVISION _DOT paragraph+
paragraph: paragraph_name _DOT statement+
paragraph_name: CNAME
statement: move_stmt _SEMICOLON?
| display_stmt _SEMICOLON?
| if_stmt _SEMICOLON?
| perform_stmt _SEMICOLON?
| accept_stmt _SEMICOLON?
| compute_stmt _SEMICOLON?
| copy_stmt _SEMICOLON?
| call_stmt _SEMICOLON?
| goto_stmt _SEMICOLON?
| sql_stmt _SEMICOLON?
| ole_stmt _SEMICOLON?
move_stmt: MOVE (STRING | CNAME) TO CNAME _DOT
display_stmt: DISPLAY (STRING | CNAME) _DOT
if_stmt: IF condition THEN statement+ (ELSE statement+)? END_IF _DOT
perform_stmt: PERFORM paragraph_name _DOT
accept_stmt: ACCEPT (STRING | CNAME) _DOT
compute_stmt: COMPUTE CNAME "=" EXPRESSION _DOT
copy_stmt: COPY CNAME _DOT
call_stmt: CALL (STRING | CNAME) (USING call_params)? _DOT
call_params: (STRING | CNAME) ("," (STRING | CNAME))*
goto_stmt: GOTO CNAME _DOT
sql_stmt: EXEC SQL /[\s\S]+?/ END_EXEC _DOT
ole_stmt: OLE (STRING | CNAME) _DOT
condition: CNAME COMPARATOR (STRING | CNAME | NUMBER)
%ignore /\r?\n/
"""
# ------------------------------------------------------------------------------
# Utility: Debug Logger
# ------------------------------------------------------------------------------
def debug_log(rule_name, items):
debug_items = []
for i, item in enumerate(items):
if isinstance(item, Token):
debug_items.append(f"[{i}]: Token({item.type}, {item.value})")
elif item is None:
debug_items.append(f"[{i}]: None")
else:
debug_items.append(f"[{i}]: {item}")
print(f"DEBUG: In rule '{rule_name}' with children:")
for d in debug_items:
print(" " + d)
# ------------------------------------------------------------------------------
# Extended Transformer
# ------------------------------------------------------------------------------
class CobolTransformer(Transformer):
def _DOT(self, _):
return None
def _SEMICOLON(self, _):
return None
def LEVEL(self, token):
return token.value
def COMPARATOR(self, token):
return token.value
def start(self, items):
return {"COBOL_Program": [i for i in items if i is not None]}
def program(self, items):
return {"Program": items}
def identification_div(self, items):
return {"IdentificationDivision": items}
def program_id(self, items):
try:
return {"ProgramID": items[1]}
except IndexError:
debug_log("program_id", items)
return {"ProgramID": "<error>"}
def author(self, items):
try:
return {"Author": items[1]}
except IndexError:
debug_log("author", items)
return {"Author": "<error>"}
def environment_div(self, items):
return {"EnvironmentDivision": items}
def configuration_section(self, items):
return {"ConfigurationSection": items}
def source_computer(self, items):
try:
return {"SourceComputer": items[1]}
except IndexError:
debug_log("source_computer", items)
return {"SourceComputer": "<error>"}
def object_computer(self, items):
try:
return {"ObjectComputer": items[1]}
except IndexError:
debug_log("object_computer", items)
return {"ObjectComputer": "<error>"}
def input_output_section(self, items):
return {"InputOutputSection": items}
def file_control(self, items):
try:
return {"FileControl": {"FileName": items[1], "AssignedTo": items[4]}}
except IndexError:
debug_log("file_control", items)
return {"FileControl": {"FileName": "<error>", "AssignedTo": "<error>"}}
def screen_section(self, items):
return {"ScreenSection": items}
def screen_def(self, items):
try:
return {"ScreenDef": {"Level": items[0], "Name": items[1], "Entries": items[3:]}}
except IndexError:
debug_log("screen_def", items)
return {"ScreenDef": "<error>"}
def inner_screen_entry(self, items):
return items[0].strip()
def data_div(self, items):
return {"DataDivision": items}
def file_section(self, items):
return {"FileSection": items}
def fd_entry(self, items):
try:
return {"FDEntry": {"FileName": items[1], "Records": items[2]}}
except IndexError:
debug_log("fd_entry", items)
return {"FDEntry": {"FileName": "<error>", "Records": "<error>"}}
def label_records(self, items):
return "STANDARD"
def working_storage_section(self, items):
return {"WorkingStorageSection": items}
def ws_entry(self, items):
return items[0]
def data_entry(self, items):
# items: LEVEL, CNAME, optionally redefines_clause and/or pic_clause
level = items[0]
name = items[1]
pic = None
redefines = None
if len(items) == 2:
pic = None
elif len(items) == 3:
# If the token contains typical PIC characters (like X or 9), treat it as PIC.
if isinstance(items[2], str) and re.search(r'[X9S]', items[2]):
pic = items[2]
else:
redefines = items[2]
elif len(items) == 4:
redefines = items[2]
pic = items[3]
else:
debug_log("data_entry", items)
return {"WS_Data": {"Level": "<error>", "Name": "<error>", "Pic": "<error>", "Redefines": None}}
return {"WS_Data": {"Level": level, "Name": name, "Pic": pic, "Redefines": redefines}}
def redefines_clause(self, items):
try:
return items[1]
except IndexError:
debug_log("redefines_clause", items)
return "<error>"
def condition_entry(self, items):
try:
return {"ConditionEntry": {"Name": items[0], "Value": items[2]}}
except IndexError:
debug_log("condition_entry", items)
return {"ConditionEntry": {"Name": "<error>", "Value": "<error>"}}
def pic_clause(self, items):
try:
return items[1]
except IndexError:
debug_log("pic_clause", items)
return "<error>"
def picture(self, items):
try:
return items[0]
except IndexError:
debug_log("picture", items)
return "<error>"
def procedure_div(self, items):
return {"ProcedureDivision": items}
def paragraph(self, items):
try:
return {"Paragraph": {"Name": items[0], "Statements": items[1:]}}
except IndexError:
debug_log("paragraph", items)
return {"Paragraph": {"Name": "<error>", "Statements": []}}
def paragraph_name(self, items):
try:
return items[0]
except IndexError:
debug_log("paragraph_name", items)
return "<error>"
def move_stmt(self, items):
try:
return {"MOVE": {"source": items[1], "destination": items[3]}}
except IndexError:
debug_log("move_stmt", items)
return {"MOVE": {"source": "<error>", "destination": "<error>"}}
def display_stmt(self, items):
try:
return {"DISPLAY": items[1]}
except IndexError:
debug_log("display_stmt", items)
return {"DISPLAY": "<error>"}
def if_stmt(self, items):
result = {"IF": None, "THEN": [], "ELSE": []}
try:
result["IF"] = items[1]
except IndexError:
debug_log("if_stmt", items)
result["IF"] = "<error>"
in_else = False
for part in items[3:-1]:
if isinstance(part, Token) and part.type == "ELSE":
in_else = True
continue
if in_else:
result["ELSE"].append(part)
else:
result["THEN"].append(part)
return {"IF_Statement": result}
def perform_stmt(self, items):
try:
return {"PERFORM": items[1]}
except IndexError:
debug_log("perform_stmt", items)
return {"PERFORM": "<error>"}
def accept_stmt(self, items):
try:
return {"ACCEPT": items[1]}
except IndexError:
debug_log("accept_stmt", items)
return {"ACCEPT": "<error>"}
def compute_stmt(self, items):
try:
# Use items[2] as the expression.
return {"COMPUTE": {"Target": items[1], "Expression": items[2].strip()}}
except IndexError:
debug_log("compute_stmt", items)
return {"COMPUTE": "<error>"}
def copy_stmt(self, items):
try:
return {"COPY": items[1]}
except IndexError:
debug_log("copy_stmt", items)
return {"COPY": "<error>"}
def call_stmt(self, items):
try:
ret = {"CALL": {"Target": items[1]}}
if len(items) > 2:
ret["CALL"]["Using"] = items[2]
return ret
except IndexError:
debug_log("call_stmt", items)
return {"CALL": "<error>"}
def call_params(self, items):
return items
def goto_stmt(self, items):
try:
return {"GOTO": items[1]}
except IndexError:
debug_log("goto_stmt", items)
return {"GOTO": "<error>"}
def sql_stmt(self, items):
try:
return {"SQL": items[1].strip()}
except IndexError:
debug_log("sql_stmt", items)
return {"SQL": "<error>"}
def ole_stmt(self, items):
try:
return {"OLE": items[1]}
except IndexError:
debug_log("ole_stmt", items)
return {"OLE": "<error>"}
def condition(self, items):
try:
return {"Condition": {"left": items[0], "op": items[1], "right": items[2]}}
except IndexError:
debug_log("condition", items)
return {"Condition": {"left": "<error>", "op": "<error>", "right": "<error>"}}
def CNAME(self, token):
return token.value
def STRING(self, token):
s = token.value
if (s.startswith('"') and s.endswith('"')) or (s.startswith("'") and s.endswith("'")):
return s[1:-1]
elif s.startswith("==") and s.endswith("=="):
return s[2:-2]
return s
def EXPRESSION(self, token):
return token.value
def ALT_STRING(self, token):
return token.value[2:-2]
def NUMBER(self, token):
return int(token.value)
# ------------------------------------------------------------------------------
# Build the Parser
# ------------------------------------------------------------------------------
cobol_parser = Lark(
cobol_grammar,
parser="lalr",
transformer=CobolTransformer(),
maybe_placeholders=False,
propagate_positions=True,
regex=True
)
# ------------------------------------------------------------------------------
# Sample Test Cases
# ------------------------------------------------------------------------------
# Fixed-format sample (the final isolated period line has been removed).
sample_cobol_fixed = """
000100 IDENTIFICATION DIVISION. PROGRAM-ID. FIXEDPROG. AUTHOR. "John Doe".
000200 ENVIRONMENT DIVISION. CONFIGURATION SECTION. SOURCE-COMPUTER. "IBM". OBJECT-COMPUTER. "IBM-OBJ".
000300 DATA DIVISION. WORKING-STORAGE SECTION.
000400 01 CUSTOMER.
000500 05 NAME PIC X(20).
000600 PROCEDURE DIVISION.
000700 CUST-PARA. MOVE "HELLO" TO NAME.
"""
# Free-format sample.
sample_cobol_free = """
IDENTIFICATION DIVISION.
PROGRAM-ID. FREEPROG.
AUTHOR. 'Jane Doe'.
ENVIRONMENT DIVISION.
CONFIGURATION SECTION.
SOURCE-COMPUTER. "AS400".
OBJECT-COMPUTER. "AS400-OBJ".
DATA DIVISION.
WORKING-STORAGE SECTION.
01 CUSTOMER.
05 NAME PIC X(20).
PROCEDURE DIVISION.
CUST-PARA. MOVE "WORLD" TO NAME.
.
"""
# Mixed-style sample (written in fixed-format with sequence numbers).
sample_cobol_mixed = """
000100 IDENTIFICATION DIVISION. PROGRAM-ID. MIXEDPROG. AUTHOR. "Mixed Author".
000200 ENVIRONMENT DIVISION. CONFIGURATION SECTION. SOURCE-COMPUTER. "Mainframe". OBJECT-COMPUTER. "MF-OBJ".
000300 DATA DIVISION. WORKING-STORAGE SECTION.
000400 01 CUSTOMER.
000500 05 NAME PIC X(20).
000600 PROCEDURE DIVISION.
000700 CUSTOM-PARA. MOVE "MIXED" TO NAME.
"""
# Extra test case.
sample_cobol_extra = """
IDENTIFICATION DIVISION.
PROGRAM-ID. EXTRAPROG.
AUTHOR. "Extra Tester".
ENVIRONMENT DIVISION.
CONFIGURATION SECTION.
SOURCE-COMPUTER. "ExtraMachine".
OBJECT-COMPUTER. "ExtraObj".
DATA DIVISION.
WORKING-STORAGE SECTION.
01 SCORE.
05 POINTS PIC 99V99.
PROCEDURE DIVISION.
SCORE-PARA. COMPUTE POINTS = 10 + 20 * 3.
.
"""
# Nested test case.
sample_cobol_nested = """
IDENTIFICATION DIVISION.
PROGRAM-ID. NESTEDPROG.
AUTHOR. "Nested Tester".
ENVIRONMENT DIVISION.
DATA DIVISION.
WORKING-STORAGE SECTION.
01 FLAG.
05 VALUE PIC X.
PROCEDURE DIVISION.
NESTED-PARA.
IF FLAG = "Y" THEN
DISPLAY "YES".
IF FLAG = "Z" THEN
DISPLAY "Nested YES".
END-IF.
ELSE
DISPLAY "NO".
END-IF.
"""
# ------------------------------------------------------------------------------
# Main Entry Point
# ------------------------------------------------------------------------------
def main():
parser_arg = argparse.ArgumentParser(description="Extended COBOL Parser using Lark")
parser_arg.add_argument("--free", action="store_true", help="Assume free-format source code")
parser_arg.add_argument("--mixed", action="store_true", help="Run the mixed-style test case")
parser_arg.add_argument("--extra", action="store_true", help="Run the extra test case with additional operators")
parser_arg.add_argument("--nested", action="store_true", help="Run the nested test case with nested IF statements")
args = parser_arg.parse_args()
if args.nested:
source = sample_cobol_nested
preprocessed_source = preprocess_free_format(source)
elif args.extra:
source = sample_cobol_extra
preprocessed_source = preprocess_free_format(source)
elif args.mixed:
source = sample_cobol_mixed
preprocessed_source = preprocess_fixed_format(source)
elif args.free:
source = sample_cobol_free
preprocessed_source = preprocess_free_format(source)
else:
source = sample_cobol_fixed
preprocessed_source = preprocess_fixed_format(source)
try:
parse_tree = cobol_parser.parse(preprocessed_source)
print("Parsing Successful!\n")
print("Parse Tree:")
print(parse_tree)
except Exception as e:
print("Parsing failed:")
print(e)
sys.exit(1)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment