Last active
October 21, 2019 07:26
-
-
Save yogesh-aggarwal/40dcd3650e7bb5851f1dd100b790c6e7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
CMS: Content Management System
Manages content for a blogging website that has many posts and needs its content updated on a daily basis.
This project follows a functional approach and is designed for processing large amounts of data.
The project supports all CRUD operations.
-- Note: Designed for a school project --
Name: Yogesh
Class: XII-B
Roll. no.: 34
""" | |
from datetime import datetime | |
from os import getpid as pid, remove | |
from webbrowser import open_new_tab as web | |
from hashlib import sha512 | |
from colorama import init | |
# Initialise colorama before anything is printed.
# NOTE(review): presumably so the ANSI escapes emitted by sPrint() render on
# Windows consoles as well - confirm against the colorama documentation.
init()

# Database file used when the prompt below is answered with an empty line.
default = "data.msp"

# Marker lines used inside the database file: schemaSeperate delimits the
# header, dataSeperate delimits the individual posts.
schemaSeperate = 100 * "^"
dataSeperate = 100 * "%"

# Session-wide switch for the "manually modified" warning in fetchData();
# the "dw" command (disbleWarning) sets it to False.
warning = True

# Ask which database file to work on; an empty answer selects the default.
datab = input(f"Database ({default}): ")
database = datab or default
def sPrint(s, fg, bg):
    """
    Print ``s`` wrapped in an ANSI escape sequence.

    ``fg`` and ``bg`` are interpolated verbatim into ``ESC[1;<fg>;<bg>m``
    (the callers in this file pass numeric colour codes such as 34 and 40),
    and the attributes are reset with ``ESC[0m`` after the text.
    """
    template = "\033[1;{};{}m{}\033[0m"
    print(template.format(fg, bg, s))
def create(flag="", execUser=True):
    """
    Creates entry in database.

    Prompts for title, content, date and author (a blank answer is replaced
    by the field's own name), appends the new post to the posts returned by
    fetchData() and rewrites the database file as header + all posts.

    ``flag`` and ``execUser`` are accepted only so the function can be
    dispatched through keyMap like every other command; they are not used.
    """
    try:
        data = fetchData()
    except Exception as err:
        # Without the current posts the file cannot be rebuilt safely, so
        # stop here instead of looping with an unbound ``data``.
        print(f"Could not read the database: {err}")
        return

    try:
        newData = []
        for label in ("Title", "Content", "Date", "Author"):
            value = input(f"{label}: ").strip()
            newData.append(value if value else label)
    except (KeyboardInterrupt, EOFError):
        print(f"\nOperation cancelled (was {pid()})")
        return
    data.append(newData)

    try:
        # Refresh the login stamp once, *then* read the header, so the
        # header written back below is the up-to-date one.
        limits = schemaSetup(updateTime=True)
        with open(database) as f:
            header = f.readlines()[limits[0] : limits[1] + 1]
        body = [f"{dataSeperate}\n"]
        for x in data:
            body.extend([f"{x[0]}\n", f"{x[1]}\n", f"{x[2]}\n", f"{x[3]}\n"])
            body.append(f"{dataSeperate}\n")
        with open(database, "w+") as f:
            f.writelines(header + body)
    except KeyboardInterrupt:
        print(f"\nOperation cancelled (was {pid()})")
        return
    except (OSError, TypeError, ValueError, IndexError) as err:
        # TypeError covers schemaSetup() returning None; IndexError covers a
        # stored post with fewer than four lines.
        print(f"Could not save the record: {err}")
        return
    print(f"Successfully created the record! (was {pid()})")
def show(block=True, flag="", execUser=True):
    """
    Displays the existing entries in the database.

    Parameters:
        block: kept for backward compatibility. It used to re-enter main()
            after listing, which nested a new command loop on every call;
            the caller's loop simply continues now, so the value is ignored.
        flag: page size. When it is a positive integer, a "-- More -- "
            prompt is shown after every ``flag`` posts; an empty answer
            continues, anything else (or Ctrl+C) stops the listing. Any
            other value (including the default "") disables paging.
        execUser: unused; present for keyMap dispatch.
    """
    data = fetchData()
    if not data:
        print("No posts to show!")
        return

    page_size = flag if isinstance(flag, int) and flag > 0 else None
    shown = 0
    for i, x in enumerate(data):
        # Prompt *before* printing the next post so that the post at a page
        # boundary is not dropped, and only when there is more to show.
        if page_size is not None and shown == page_size:
            try:
                if input("-- More -- ") != "":
                    break
            except (KeyboardInterrupt, EOFError):
                break
            shown = 0
        print(
            f"|</>| POST {1+i} |</>|\n\tTitle: {x[0]}\n\tContent: {x[1]}\n\tDate published: {x[2]}\n\tAuthor: {x[3]}"
        )
        shown += 1
def view(flag="", execUser=True):
    """
    Creates a post view for an existing post (provided the index).

    Parameters:
        flag: 1-based post number to display.
        execUser: True when ``flag`` was really supplied by the user. When
            it is False, or ``flag`` is outside ``1..len(posts)``, the full
            listing is shown instead via show().
    """
    data = fetchData()
    if not data:
        print("No posts to show!")
        return
    try:
        # The lower bound stops 0 / negative numbers from wrapping around to
        # the end of the list.
        if execUser and 1 <= flag <= len(data):
            post = data[flag - 1]
            print(
                f"</>|Post View|</>\n\tTitle: {post[0]}\n\tContent: {post[1]}\n\tDate: {post[2]}\n\tAuthor: {post[3]}"
            )
        else:
            # block=False: never start a nested command loop from here.
            show(block=False, flag=flag)
    except (TypeError, IndexError):
        # TypeError: non-numeric flag; IndexError: malformed stored post.
        print("No posts to show")
def update(flag="", execUser=True):
    """
    Updates the value for a post in the database.

    Parameters:
        flag: 1-based post number. It is used only when ``execUser`` is True
            and the number is within range; otherwise the number is asked
            for interactively.
        execUser: False when the caller did not get an index from the user
            (main() then passes a placeholder flag). In that case the posts
            are listed first and the number is always prompted for, so the
            placeholder can never select a post by accident.

    Each field prompt shows the current value; an empty answer keeps it.
    Nothing is written when no field changed.
    """
    try:
        data = fetchData()
    except Exception as err:
        print(f"Could not read the database: {err}")
        return
    if not data:
        print("No posts to update!")
        return
    if not execUser:
        try:
            show(block=False, flag=20)
        except Exception:
            # The listing is only a convenience; the update can proceed
            # without it.
            pass

    try:
        index = flag
        if (
            not execUser
            or not isinstance(flag, int)
            or not 1 <= flag <= len(data)
        ):
            index = int(input("\nPost number (Ctrl+C to cancel): "))
        if not 1 <= index <= len(data):
            raise IndexError(index)
        post = data[index - 1]
        newData = []
        for position, label in enumerate(("Title", "Content", "Date", "Author")):
            value = input(f"{label} ({post[position]}): ")
            newData.append(value if value else post[position])
    except (KeyboardInterrupt, EOFError):
        print(f"\nOperation cancelled (was {pid()})")
        return
    except (ValueError, IndexError):
        print("Invalid input!")
        return

    if newData == post:
        print("No data to be updated!\n")
        return
    data[index - 1] = newData

    try:
        # Refresh the login stamp once, then read the (fresh) header.
        limits = schemaSetup(updateTime=True)
        with open(database) as f:
            header = f.readlines()[limits[0] : limits[1] + 1]
        body = [f"{dataSeperate}\n"]
        for x in data:
            body.extend([f"{x[0]}\n", f"{x[1]}\n", f"{x[2]}\n", f"{x[3]}\n"])
            body.append(f"{dataSeperate}\n")
        with open(database, "w+") as f:
            f.writelines(header + body)
    except KeyboardInterrupt:
        print(f"\nOperation cancelled (was {pid()})")
        return
    except (OSError, TypeError, ValueError, IndexError) as err:
        print(f"Could not save the record: {err}")
        return
    print(f"Successfully updated the record! (was {pid()})")
def delete(flag="", execUser=True):
    """
    Deletes entry in database.

    Parameters:
        flag: 1-based post number. It is used only when ``execUser`` is True
            and the number is within range; otherwise the number is asked
            for (and re-asked until it is valid or the user presses Ctrl+C).
        execUser: False when the caller did not get an index from the user;
            the posts are then listed first and the number is always
            prompted for.

    The post is removed only after the user confirms with an answer that
    contains "y"; the file is rewritten once, from an unmodified copy of the
    post list, so a failed write never removes more than was asked for.
    """
    try:
        data = fetchData()
    except Exception as err:
        print(f"Could not read the database: {err}")
        return
    if not data:
        print("No posts to delete!")
        return
    if not execUser:
        try:
            show(block=False, flag=20)
        except Exception:
            # The listing is only a convenience.
            pass

    index = flag
    if not execUser or not isinstance(flag, int) or not 1 <= flag <= len(data):
        while True:
            try:
                index = int(input("\nPost number (Ctrl+C to cancel): "))
            except KeyboardInterrupt:
                print(f"\nOperation cancelled (was {pid()})")
                return
            except ValueError:
                print("Invalid input!")
                continue
            if 1 <= index <= len(data):
                break
            print("Invalid input!")

    try:
        confirm = input("Are you sure? (Y/n): ").lower()
    except KeyboardInterrupt:
        print(f"\nOperation cancelled (was {pid()})")
        return
    if "y" not in confirm:
        print(f"Delete operation cancelled (was {pid()})")
        return

    remaining = data[: index - 1] + data[index:]
    try:
        # Refresh the login stamp once, then read the (fresh) header.
        limits = schemaSetup(updateTime=True)
        with open(database) as f:
            header = f.readlines()[limits[0] : limits[1] + 1]
        body = [f"{dataSeperate}\n"]
        for x in remaining:
            body.extend([f"{x[0]}\n", f"{x[1]}\n", f"{x[2]}\n", f"{x[3]}\n"])
            body.append(f"{dataSeperate}\n")
        with open(database, "w+") as f:
            f.writelines(header + body)
    except KeyboardInterrupt:
        print(f"\nOperation cancelled (was {pid()})")
        return
    except (OSError, TypeError, ValueError, IndexError) as err:
        print(f"Could not save the database: {err}")
        return
    print("Successfully deleted the record!")
def count(flag="", execUser=True):
    """
    Print how many posts fetchData() currently returns.

    ``flag`` and ``execUser`` exist only for keyMap dispatch and are ignored.
    """
    total = len(fetchData())
    print(f"There are {total} posts in the database\n")
def export(flag="", execUser=True):
    """
    Helps in exporting post to different format.

    Asks for the target format (HTML, JSON or plain text), then for the post
    number and a file name, and writes that single post to the file. The
    HTML export is additionally opened in a browser tab.

    Parameters:
        flag: unused; present for keyMap dispatch.
        execUser: when False the posts are listed before the post number is
            asked for.

    Ctrl+C at any prompt, or declining to overwrite an existing file,
    cancels the export and returns to the caller.
    """
    # Imported locally under private names: the nested exporters below are
    # called ``json`` and ``html`` and would otherwise shadow the modules.
    from html import escape as _escape
    from json import dumps as _dumps

    data = fetchData()
    if not data:
        print("No data to export...")
        return

    def _cancelled():
        """Print the standard cancellation notice."""
        print(f"\nOperation cancelled (was {pid()})")

    def setupData(extension):
        """
        Ask which post to export and where to.

        Returns ``(file_name, post)``, or ``None`` when the user cancelled.
        ``file_name`` always ends with ``extension``.
        """
        if not execUser:
            try:
                show(block=False, flag=20)
            except Exception:
                # The listing is only a convenience.
                pass

        while True:
            try:
                index = int(input("\nPost possition (Ctrl+C to cancel): "))
            except KeyboardInterrupt:
                _cancelled()
                return None
            except ValueError:
                print("Invalid input!")
                continue
            if not index:
                continue
            if 1 <= index <= len(data):
                post = data[index - 1]
                break
            print("Invalid input!")

        while True:
            try:
                file = input("File name: ")
            except KeyboardInterrupt:
                _cancelled()
                return None
            if file:
                break

        if not file.endswith(extension):
            file += extension
        try:
            with open(file):
                pass
        except OSError:
            # The file cannot be opened for reading: treat it as new.
            return file, post

        try:
            choice = input("File already exists, overite (Y/n): ").lower()
        except KeyboardInterrupt:
            _cancelled()
            return None
        if "y" in choice:
            remove(file)
            return file, post
        _cancelled()
        return None

    def json():
        """
        Exports to JSON.
        """
        target = setupData(".json")
        if target is None:
            return
        file, post = target
        record = dict(zip(["title", "content", "date", "author"], post))
        # dumps() escapes quotes, backslashes and control characters, which
        # hand-built string concatenation would leave as invalid JSON.
        with open(file, "w") as f:
            f.write(_dumps(record))
        print(f"Data was successfully written to {file} (was {pid()})")

    def html():
        """
        Exports to HTML.
        """
        target = setupData(".html")
        if target is None:
            return
        file, post = target
        # Post text is free-form user input; escape it so characters such as
        # "<" or "&" show up as text instead of being parsed as markup.
        title, content, date, author = (_escape(str(part)) for part in post[:4])
        page = (
            f"<html><head><title>{title}</title>"
            '<style>* {font-family: "Verdana", sans-serif} h5 {text-decoration: underline}</style>'
            f"</head><body><h1>{title}</h1><p>{content}</p>"
            f"<h5>Date created: {date}</h5><h5>By {author}</h5></body></html>"
        )
        with open(file, "w") as f:
            f.write(page)
        web(file)
        print(f"Data was successfully written to {file} (was {pid()})")

    def text():
        """
        Exports to a plain text file.
        """
        target = setupData(".txt")
        if target is None:
            return
        file, post = target
        body = f"Title: {post[0]}\nContent: {post[1]}\nDate: {post[2]}\nAuthor: {post[3]}\n"
        with open(file, "w") as f:
            f.write(body)
        print(f"Data was successfully written to {file} (was {pid()})")

    exporters = {"1": html, "2": json, "3": text}
    print("Available formats are:\n\t1) HTML\n\t2) JSON\n\t3) Text\n")
    while True:
        try:
            exportType = input("Type (position) (Ctrl+C to cancel): ")
        except KeyboardInterrupt:
            _cancelled()
            return
        if not exportType:
            continue
        exporter = exporters.get(exportType)
        if exporter is None:
            print("Invalid input!")
            continue
        try:
            exporter()
        except KeyboardInterrupt:
            _cancelled()
        except Exception as err:
            # Report the real cause (e.g. an unwritable path or a malformed
            # stored post) instead of blaming the user's input.
            print(f"Export failed: {err}")
        return
def cmsQuit(flag="", execUser=True):
    """
    Halts the program with exit code 0.

    Prints a goodbye message and raises ``SystemExit(0)``.
    ``flag`` and ``execUser`` exist only for keyMap dispatch and are ignored.
    """
    print("\n\nBye! See you again...")
    # quit() is a helper that the ``site`` module adds for interactive
    # sessions and is not guaranteed to exist (e.g. under ``python -S``);
    # raising SystemExit is the programmatic equivalent.
    raise SystemExit(0)
def schemaSetup(databPath=database, updateTime=False):
    """
    Sets up the schema for the database.

    Parameters:
        databPath: path of the database file to work on.
        updateTime: selects the mode, see below.

    ``updateTime=False``: appends a new header to ``databPath`` (the file is
    created if needed): separator, creation time, last-login time, an
    optional ``password;sha512-hex`` line, separator. The user is asked
    whether to protect the database. Returns ``None``.

    ``updateTime=True``: rewrites the "Last login" line of the existing
    header and returns ``(lower, upper)``. ``lower`` is the index of the
    first header separator. ``upper`` is the index of the closing separator
    when a password line is present and one past it otherwise, i.e.
    ``lower + 4`` for a well-formed header; fetchData() reads the line at
    ``lower + 3`` on that basis. If the file has no header yet, one is
    created first and the limits of the new header are returned.

    Raises:
        ValueError: in update mode when only one separator line is present.
    """
    if not updateTime:
        sPrint("Setting up schema...\n", 34, 40)
        content = [
            f"{schemaSeperate}\n",
            f"Time created: {datetime.now()}\n",
            f"Last login: {datetime.now()}\n",
            f"{schemaSeperate}\n",
        ]
        protect = False
        password = ""
        try:
            protect = "y" in input("Protect? (y/N): ")
            if protect:
                password = input("Password: ")
        except (KeyboardInterrupt, EOFError):
            # Cancelled (or no more input): continue without a password
            # instead of retrying forever.
            print()
            protect = False
        if protect:
            if not password:
                print(
                    f"No password was assigned to the database ({databPath}) (was {pid()})"
                )
            else:
                # NOTE(review): the password is stored in clear text next to
                # its SHA-512 digest (named "salt" here) and fetchData()
                # compares the clear text. Storing only a salted hash would
                # be safer, but that changes the file format.
                salt = sha512(password.encode("utf-8")).hexdigest()
                content.insert(3, f"{password};{salt}\n")
                print(
                    f"Password was assigned to the database ({databPath}) (was {pid()})"
                )
                sPrint(f"Hash: {salt}\n", 37, 45)
        with open(databPath, "a+") as db:
            db.writelines(content)
        return None

    with open(databPath) as db:
        data = [line.replace("\n", "") for line in db.readlines()]
    if schemaSeperate not in data:
        # No header yet: create it in the *same* file, then report its
        # limits so callers that index the result keep working.
        schemaSetup(databPath)
        return schemaSetup(databPath, updateTime=True)

    lower = data.index(schemaSeperate)
    upper = data.index(schemaSeperate, lower + 1)
    if upper - lower == 3:
        # Header without a password line: normalise upper to lower + 4 so
        # the slice below covers exactly the "Last login" line either way.
        upper += 1
    data[lower + 2 : upper - 1] = [f"Last login: {datetime.now()}"]
    with open(databPath, "w+") as db:
        db.writelines(f"{line}\n" for line in data)
    return (lower, upper)
def fetchData(databPath=database, askPassword=False):
    """
    Fetches the data from the database & returns it.

    Parameters:
        databPath: path of the database file.
        askPassword: when True and the header carries a password, the user
            is prompted until the password matches; when False the stored
            password is accepted without asking.

    Returns:
        A list of posts, each a list of the text lines found between two
        data separators (title, content, date, author for posts written by
        this program). Returns ``[]`` when the file had to be created.

    Side effects: creates the file and/or its header when missing, refreshes
    the header's "Last login" line, and prints a warning when the password
    line does not match its stored digest (unless warnings are disabled).
    """
    try:
        with open(databPath, "r") as db:
            dbRead = db.read()
    except FileNotFoundError:
        # First use: create the file that was asked for and give it a header.
        with open(databPath, "w+"):
            pass
        schemaSetup(databPath)
        return []

    if schemaSeperate not in dbRead:
        schemaSetup(databPath)
    # One call both refreshes the login stamp and reports the header limits.
    limits = schemaSetup(databPath, updateTime=True)
    readData = dbRead.split("\n")

    password = ""
    salt = ""
    try:
        credentials = readData[limits[0] + 3 : limits[1]][0]
    except (TypeError, IndexError):
        # No usable header in the text read above (e.g. it was only just
        # created): treat the database as unprotected for this call.
        credentials = schemaSeperate
    if credentials != schemaSeperate:
        if ";" in credentials:
            # Split on the last ";" only: the hex digest never contains
            # one, but the password itself may.
            password, salt = credentials.rsplit(";", 1)
        else:
            password = credentials

    if (sha512(password.encode("utf-8")).hexdigest() != salt) and (
        salt != password
    ):
        if warning:
            sPrint(
                '\nDeprecate Warning: Database manually modified! Don\'t do that, it can cause failure of data! Run "dw" command to disable the warning for current session',
                37,
                41,
            )

    # NOTE(review): the comparison is against the clear-text password kept
    # in the file; see schemaSetup() for the storage format.
    if password and askPassword:
        while input("\nPassword: ") != password:
            print("Invalid password!")

    # Cut the file into the chunks that lie between data separators. The
    # first chunk (everything from line 1 up to the first separator) is
    # header text; empty chunks come from doubled separators.
    chunks = []
    start = 1
    for position, line in enumerate(readData):
        if line == dataSeperate:
            chunks.append(readData[start:position])
            start = position + 1
    return [chunk for chunk in chunks[1:] if chunk]
def showCommands(greet=True):
    """
    Print the command reference, one entry per line.

    With ``greet`` true the list is preceded by the welcome banner and the
    "Here're some commands" heading; otherwise only the commands are printed.
    """
    banner = [
        "\nWelcome to CMS: Content Management System",
        "Here're some commands you can operate:\n",
    ]
    reference = [
        "create (c) : Create post",
        "show (r) : Show all posts",
        "update <?index> (u) : Update post",
        "delete <?index> (d) : Delete post",
        "view <index> (v) : View post",
        "count (cp) : Count no. of posts",
        "export (e) : Export post",
        "quit (q) : Quit CMS",
        "help (h) : Help",
        "about (a) : About CMS",
    ]
    lines = banner + reference if greet else reference
    for line in lines:
        print(line)
def disbleWarning(flag="", execUser=True):
    """
    Switch off the module-level ``warning`` flag and confirm it on screen.

    ``flag`` and ``execUser`` exist only for keyMap dispatch and are ignored.
    """
    global warning
    notice = "Warnings are now disabled for this session!"
    warning = False
    print(notice)
def about(flag="", execUser=True):
    """
    Print the project description and the author's details.

    ``flag`` and ``execUser`` exist only for keyMap dispatch and are ignored.
    """
    description = """
CMS: Content Management System
Manages content specifically for a blogging website which have many posts & requires to update its content on daily basis.
This project is based on functional approach & specifically designed for processing huge amount of data.
The project supports all types of CRUD operations.
-- Note: Designed for School project --
Name: Yogesh
Class: XII-B
Roll. no.: 34
"""
    print(description)
def help(flag="", execUser=True):
    """
    Print a heading followed by the command reference (without the banner).

    ``flag`` and ``execUser`` exist only for keyMap dispatch and are ignored.
    """
    heading = "Here're some commands you can operate:\n"
    print(heading)
    showCommands(greet=False)
# Command dispatch table used by main(): every alias typed at the prompt maps
# to its handler. Each handler is called as handler(flag=..., execUser=...).
keyMap = {
    alias: handler
    for aliases, handler in (
        (("c", "create"), create),  # Create
        (("r", "show"), show),  # Show
        (("v", "view"), view),  # View
        (("u", "update"), update),  # Update
        (("d", "delete"), delete),  # Delete
        (("cp", "count"), count),  # Count entries
        (("q", "quit"), cmsQuit),  # Quit
        (("e", "export"), export),  # Export
        (("a", "about"), about),  # About
        (("dw",), disbleWarning),  # Warning
        (("h", "help"), help),  # Help
    )
    for alias in aliases
}
def main():
    """
    Takes the command & redirects to different functions.

    Reads ``<command> [index]`` lines in a loop and calls the matching
    keyMap handler as ``handler(flag=..., execUser=...)``:

    * with a non-zero integer index: ``flag`` is that index and
      ``execUser`` is True;
    * without one (missing, non-numeric or 0): ``flag`` is the placeholder
      15 and ``execUser`` is False, which tells handlers that no index was
      supplied (show() uses the value as its page size).

    Ctrl+C or end of input at the prompt ends the program via cmsQuit().
    """
    while True:
        try:
            # split() without an argument tolerates leading, trailing and
            # repeated spaces.
            words = input("\n> ").lower().split()
            if not words:
                continue
            command = words[0]
            flag = ""
            if len(words) > 1:
                try:
                    flag = int(words[1])
                except ValueError:
                    pass
            execUser = bool(flag)
            if not flag:
                flag = 15
            handler = keyMap.get(command)
            if handler is None:
                print("Invalid command!")
                continue
            try:
                handler(flag=flag, execUser=execUser)
            except Exception as err:
                # A failure inside a known command is not an invalid
                # command; report what actually went wrong.
                print(f"Command failed: {err}")
        except (KeyboardInterrupt, EOFError):
            cmsQuit()
if __name__ == "__main__":
    # Script entry point: announce the selected database, load it once
    # (asking for its password if it has one), list the commands and hand
    # over to the interactive loop.
    banner = f'\nUsing "{database}" as database'
    sPrint(banner, 34, 40)
    fetchData(askPassword=True)
    showCommands()
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment