Created
June 27, 2020 10:10
-
-
Save rizemon/4200e1ff77c160c54e2b12d1efd755a6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import argparse | |
import csv | |
from os.path import isfile | |
from simplewa import SimpleWhatsapp | |
from sys import exit | |
from jinja2 import Template | |
from typing import Union | |
from time import sleep | |
class CustomException(Exception):
    """Error raised by this script; the text is kept on ``message``."""

    def __init__(self, message: str):
        # Hand the text to Exception first so str(exc) and exc.args carry it,
        # then expose it under the attribute name handlers read.
        super().__init__(message)
        self.message = message
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the message blaster.

    Returns:
        Namespace with string attributes ``driver``, ``csv`` and ``template``
        (each defaulting to "") and the boolean flag ``test`` (default False).
    """
    cli = argparse.ArgumentParser(description="WhatsApp Personalised Message Blaster")

    # The three path options share the same shape: short flag, long flag, help.
    path_options = (
        ("-d", "--driver", "Path of chromedriver.exe"),
        ("-c", "--csv", "Path of .csv file containing contact details"),
        ("-t", "--template", "Path of file containing the message template"),
    )
    for short_flag, long_flag, help_text in path_options:
        cli.add_argument(short_flag, long_flag, help=help_text, default="")

    cli.add_argument(
        "--test",
        action="store_true",
        default=False,
        help="Prints example message produced from the given template",
    )
    return cli.parse_args()
def read_csv(csv_path: str) -> tuple:
    """Read a .csv file whose first row holds the column names.

    Args:
        csv_path: Path of the .csv file to read.

    Returns:
        A tuple ``(column_names, rows)``: the header names (an empty list when
        the file has no header row) and one dict per data row keyed by those
        names.
    """
    # newline="" lets the csv module do its own newline handling, so line
    # breaks embedded in quoted fields are preserved as written.
    with open(csv_path, "r", newline="") as csvfile:
        csvreader = csv.DictReader(csvfile)
        # fieldnames is None for an empty file; normalise that to [].
        column_names = csvreader.fieldnames or []
        rows = list(csvreader)
    return column_names, rows
def render_message(template_path: str, row: dict) -> str:
    """Render the template file at ``template_path`` with ``row`` as context.

    Args:
        template_path: Path of the file holding the message template.
        row: Values passed to the template's ``render`` call.

    Returns:
        The rendered message text.
    """
    with open(template_path, "r") as source:
        compiled = Template(source.read())
    return compiled.render(row)
def validate_number(number: Union[int, str]) -> int:
    """Extract a handphone number as an integer.

    For a string, every character that is not a decimal digit (spaces, "+",
    "-", brackets, ...) is dropped before conversion.

    Args:
        number: The raw number, either already an int or a string.

    Returns:
        The number as an int, or 0 when no number can be extracted.
    """
    if isinstance(number, str):
        # isdecimal() keeps exactly the characters int() can parse;
        # isdigit() would also keep e.g. superscript digits, which make
        # int() fail and the whole number come back as 0.
        number = "".join(c for c in number if c.isdecimal())
    try:
        return int(number)
    except (TypeError, ValueError):
        # Empty string, None or another unconvertible value. Anything else
        # (e.g. KeyboardInterrupt) is deliberately allowed to propagate.
        return 0
def blast_messages(driver_path: str, csv_path: str, template_path: str, test: bool) -> None:
    """Send a message rendered from the template to every contact in the CSV.

    The user is prompted on stdin for the CSV column that holds the handphone
    number. Every value in that column is converted with ``validate_number``
    and written back into its row before anything is sent.

    Args:
        driver_path: Path of the chromedriver executable.
        csv_path: Path of the .csv file containing contact details.
        template_path: Path of the file containing the message template.
        test: When true, print the message rendered for the first row and
            return without sending anything.

    Raises:
        FileNotFoundError: If any of the three paths is empty or not a file.
        CustomException: If the CSV yields no columns or no rows, or a
            handphone number cannot be parsed.
    """
    # Check all three inputs up front and name the offending path in the error.
    required_files = (
        ("Driver", driver_path),
        ("CSV", csv_path),
        ("Template", template_path),
    )
    for label, path in required_files:
        if not path or not isfile(path):
            raise FileNotFoundError("[X] {} cannot be found: {!r}".format(label, path))

    column_names, rows = read_csv(csv_path)
    if not column_names or not rows:
        raise CustomException("[X] Error while reading CSV file")

    # Keep asking until the user names a column that exists in the CSV.
    while True:
        number_column = input(
            "[*] Select the field containing handphone number {}: ".format(column_names)
        ).strip()
        if number_column in column_names:
            break
        print("[X] Invalid column!")

    # Replace each raw number with its parsed integer; abort on the first
    # value that cannot be parsed so no message goes to a bad number.
    for row in rows:
        number = validate_number(row[number_column])
        if not number:
            raise CustomException(
                "[X] Unable to parse handphone number: {}".format(row[number_column])
            )
        row[number_column] = number

    if test:
        print("----- Example Message -----")
        print(render_message(template_path, rows[0]))
        return

    whatsapp_client = SimpleWhatsapp(driver_path)
    try:
        for row in rows:
            number = row[number_column]
            message = render_message(template_path, row)
            print("[*] Sending message to {}".format(number))
            whatsapp_client.send_message(number=number, message=message)
            sleep(2)
    finally:
        # Close the client even if rendering or sending fails part-way.
        whatsapp_client.close()
def main() -> int:
    """Run the command-line tool and return its exit status.

    Returns:
        0 when the run completes or is interrupted with Ctrl+C; 1 when a
        CustomException or FileNotFoundError is reported.
    """
    try:
        args = parse_args()
        blast_messages(args.driver, args.csv, args.template, args.test)
    except KeyboardInterrupt:
        return 0
    except CustomException as e:
        print(e.message)
        return 1
    except FileNotFoundError as e:
        # blast_messages raises this for a missing driver, CSV or template;
        # print its message instead of letting a traceback escape.
        print(e)
        return 1
    return 0
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment