Stardox edit
import sys
import os
import csv
import argparse
import urllib.parse

import requests
import mysql.connector
from bs4 import BeautifulSoup
from http.server import HTTPServer, BaseHTTPRequestHandler

import colors  # local project module: colored console messages
import Logo    # local project module: ASCII-art banner
mydb = mysql.connector.connect(
    host="localhost",
    user="root",
    passwd="",
    database="test"
)
mycursor = mydb.cursor()
mycursor.execute(
    "CREATE TABLE IF NOT EXISTS `allinfo` ("
    "id INT AUTO_INCREMENT PRIMARY KEY, name TEXT, location TEXT, "
    "username TEXT, website TEXT, followers INT, stars INT, following INT, "
    "email TEXT, type VARCHAR(10), repositories INT)"
)
def getting_header(soup_text):
    """Extract the repository name from the page <title> (text between '/' and ':')."""
    title = soup_text.title.get_text()
    start = title.find('/')
    stop = title.find(':')
    return title[start + 1: stop]
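# Example (assumed title format of a GitHub repository page):
#   "GitHub - octocat/Hello-World: My first repository" -> "Hello-World"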
def format_url(url):
    """Normalize a repository URL so it always starts with https://."""
    if url.startswith('http://'):
        url = url.replace('http', 'https', 1)
    elif url.startswith('www.'):
        url = url.replace('www.', 'https://', 1)
    elif url.startswith('https://'):
        pass
    else:
        colors.error("Enter the repository's URL in the given format "
                     "[ https://github.com/username/repository_name ]")
        sys.exit(1)
    return url
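# e.g. format_url("www.github.com/octocat/Hello-World")
#        -> "https://github.com/octocat/Hello-World"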
def verify_url(page_data):
    """Return True if the fetched page looks like a repository page."""
    data = str(page_data)
    if "Popular repositories" in data:  # a user profile page, not a repository
        return False
    elif "Page not found" in data:      # GitHub's 404 page
        return False
    else:
        return True
def get_latest_commit(repo_name, username):
    """Scrape the author's e-mail address from the latest non-merge commit patch."""
    email = ""
    commit_data = requests.get(
        "https://github.com"
        "/{}/{}/commits?author={}".format(
            username,
            repo_name,
            username)).text
    soup = BeautifulSoup(commit_data, "lxml")
    a_tags = soup.findAll("a")
    for a_tag in a_tags:
        URL = a_tag.get("href")
        if URL and URL.startswith("/{}/{}/commit/".format(username, repo_name)):
            label = str(a_tag.get("aria-label"))
            if "Merge" not in label and label != "None":
                patch_data = requests.get("https://github.com{}{}".format(
                    URL, ".patch")).text
                try:
                    start = patch_data.index("<")
                    stop = patch_data.index(">")
                    email = patch_data[start + 1: stop]
                except ValueError:
                    return "Not enough information."
                break
    if email != "":
        return email
    else:
        return "Not enough information."
def save_info(dat='stardox'):
    try:
        import data  # project module that accumulates the scraped lists
    except ImportError:
        colors.error('Error importing data module')
        sys.exit(1)
    if dat == 'stardox':
        fields = ['Realname', 'Username', 'Website', 'Location', 'Repositories',
                  'Stars', 'Followers', 'Following', 'Email', 'Type']
        rows = [[0 for x in range(10)] for y in range(len(data.username_list))]
        for row in range(len(data.username_list)):
            rows[row][0] = data.realname_list[row]
            rows[row][1] = '@' + data.username_list[row]
            rows[row][2] = data.url_list[row]
            rows[row][3] = data.location_list[row]
            rows[row][4] = data.repo_list[row]
            rows[row][5] = data.star_list[row].strip()
            rows[row][6] = data.followers_list[row].strip()
            rows[row][7] = data.following_list[row].strip()
            rows[row][8] = data.email_list[row]
            rows[row][9] = data.type_list[row]
        csv_file = data.header + '.csv'  # Name of the csv file
        file_path = os.path.join("./", csv_file)
        with open(file_path, 'w', encoding="utf-8", newline='') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow(fields)
            csvwriter.writerows(rows)
        colors.success("Saved the data into " + file_path, True)
        # Parameterized query: the driver escapes the values, instead of the
        # original string concatenation, which was open to SQL injection.
        sql = ("INSERT INTO allinfo (name, username, website, location, "
               "repositories, stars, followers, following, email, type) "
               "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")
        for arr in rows:
            mycursor.execute(sql, tuple(arr))
        mydb.commit()
def stardox(repo_link):
    i = 0
    repository_link = format_url(repo_link)
    try:
        html = requests.get(repository_link, timeout=8).text
    except (requests.exceptions.RequestException,
            requests.exceptions.HTTPError):
        colors.error(
            "Enter the repository's URL in the given format "
            "[ https://github.com/username/repository_name ]")
        sys.exit(1)
    if not verify_url(html):
        colors.error("The link does not point to a repository page.")
        sys.exit(1)
    soup1 = BeautifulSoup(html, "lxml")
    try:
        import data
    except ImportError:
        colors.error('Error importing data module')
        sys.exit(1)
    title = getting_header(soup1)  # Getting the title of the page
    data.header = title            # Storing title of the page as Project Title
    star_value = watch_value = fork_value = 0
    a_tags = soup1.findAll("a")
    for a_tag in a_tags:  # Finding the repository's summary counts
        string = a_tag.get("href") or ""
        if string.endswith("/watchers"):    # Total watchers
            watch_value = (a_tag.get_text()).strip()
        if string.endswith("/stargazers"):  # Total stargazers
            star_value = (a_tag.get_text()).strip()
        if string.endswith("/members"):     # Total forks
            fork_value = (a_tag.get_text()).strip()
            break
    stargazer_link = repository_link + "/stargazers"
    watchers_link = repository_link + "/watchers"
    forkers_link = repository_link + "/network/members"
    while stargazer_link is not None:
        stargazer_html = requests.get(stargazer_link).text
        soup2 = BeautifulSoup(stargazer_html, "lxml")
        a_next = soup2.findAll("a")
        for a in a_next:
            if a.get_text() == "Next":
                stargazer_link = a.get('href')
                break
        else:
            stargazer_link = None
        follow_names = soup2.findAll("h3", {"class": "follow-list-name"})
        for name in follow_names:
            a_tag = name.findAll("a")
            data.name_list.append(a_tag[0].get_text())
            username = a_tag[0].get("href")
            data.username_list.append(username[1:])
            data.type_list.append("Stargazer")
    while watchers_link is not None:
        watchers_html = requests.get(watchers_link).text
        soup2 = BeautifulSoup(watchers_html, "lxml")
        a_next = soup2.findAll("a")
        for a in a_next:
            if a.get_text() == "Next":
                watchers_link = a.get('href')
                break
        else:
            watchers_link = None
        follow_names = soup2.findAll("h3", {"class": "follow-list-name"})
        for name in follow_names:
            a_tag = name.findAll("a")
            data.name_list.append(a_tag[0].get_text())
            username = a_tag[0].get("href")
            data.username_list.append(username[1:])
            data.type_list.append("Watcher")
    while forkers_link is not None:
        forkers_html = requests.get(forkers_link).text
        soup2 = BeautifulSoup(forkers_html, "lxml")
        follow_names = soup2.findAll("div", {"class": "repo"})
        # The members page has no "Next" link, so a pass counter is used as
        # the stop condition instead.
        i = i + 1
        if i == len(follow_names):
            break
        for name in follow_names:
            a_tag = name.findAll("a")
            data.name_list.append(a_tag[1].get_text())
            username = a_tag[1].get("href")
            data.username_list.append(username[1:])
            data.type_list.append("Forker")
    count = 1
    pos = 0
    print(colors.red + "{0}".format("-") * 75, colors.green, end="\n\n")
    while count <= len(data.username_list):
        starer_url = "https://github.com/" + data.username_list[pos]
        user_html = requests.get(starer_url).text
        soup3 = BeautifulSoup(user_html, "lxml")
        repo_data = requests.get(
            "https://github.com/{}?tab=repositories&type=source"
            .format(data.username_list[pos])).text
        repo_soup = BeautifulSoup(repo_data, "lxml")
        a_tags = repo_soup.findAll("a")
        repositories_list = []
        for a_tag in a_tags:
            if a_tag.get("itemprop") == "name codeRepository":
                repositories_list.append(a_tag.get_text().strip())
        if len(repositories_list) > 0:
            email = get_latest_commit(
                repositories_list[0],
                data.username_list[pos])  # Getting the user's e-mail
            data.email_list.append(str(email))
        else:
            data.email_list.append("Not enough information.")
        if user_html is not None:
            users_name = soup3.findAll("span", {"class": "p-name"})
            if users_name:
                data.realname_list.append(users_name[0].get_text())
            else:
                data.realname_list.append("Nothing")
            user_url = soup3.findAll("a", {"rel": "nofollow me"})
            if user_url:
                data.url_list.append(user_url[1].get_text())
            else:
                data.url_list.append("Nothing.")
            location = soup3.findAll("span", {"class": "p-label"})
            if location:
                data.location_list.append(location[0].get_text())
            else:
                data.location_list.append("Nothing.")
            items = soup3.findAll("a", {"class": "UnderlineNav-item"})
            for item in items[1:]:
                # Getting total repositories of the user
                if item.get("href").endswith("repositories"):
                    a_tag = item.findAll("span")
                    repo_count = a_tag[0].get_text()
                    data.repo_list.append(repo_count)
                # Getting total stars given by the user
                elif item.get("href").endswith("stars"):
                    a_tag = item.findAll("span")
                    star_count = a_tag[0].get_text()
                    data.star_list.append(star_count)
                # Getting total followers of the user
                elif item.get("href").endswith("followers"):
                    a_tag = item.findAll("span")
                    followers_count = a_tag[0].get_text()
                    data.followers_list.append(followers_count)
                # Getting the following count of the user
                elif item.get("href").endswith("following"):
                    a_tag = item.findAll("span")
                    following_count = a_tag[0].get_text()
                    data.following_list.append(following_count)
        count += 1
        pos += 1
    save_info()
if __name__ == '__main__':
    HOST_ADDRESS = "127.0.0.1"
    HOST_PORT = 8000
    class RequestHandler(BaseHTTPRequestHandler):
        """ Our custom, example request handler """
        def send_response(self, code, message=None):
            """ override to customize headers """
            self.log_request(code)
            self.send_response_only(code)
            self.send_header('Server', 'python3 http.server Development Server')
            self.send_header('Date', self.date_time_string())
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.end_headers()
        def do_GET(self):
            """ response for a GET request """
            self.send_response(200)
            self.wfile.write(b'<head><style>p, button {font-size: 1em}</style></head>')
            self.wfile.write(b'<body>')
            self.wfile.write(b'<form method="POST" enctype="application/x-www-form-urlencoded">')
            self.wfile.write(b'<span>Enter the repository URL:</span>'
                             b'<input name="test"> '
                             b'<button style="color:blue">Submit</button>')
            self.wfile.write(b'</form>')
            self.wfile.write(b'</body>')
        def do_POST(self):
            try:
                content_length = int(self.headers['Content-Length'])
                (field, value) = self.rfile.read(content_length).decode('utf-8').split('=', 1)
                value = urllib.parse.unquote_plus(value)
                stardox(value)
                self.send_response(200)
                self.wfile.write(b'Successful')
            except Exception:
                self.send_response(200)
                self.wfile.write(b'Scraping failed.')
    def run(server_class=HTTPServer, handler_class=RequestHandler):
        server_address = (HOST_ADDRESS, HOST_PORT)
        httpd = server_class(server_address, handler_class)
        httpd.serve_forever()
    run()
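# With the server running, the form can also be exercised from a shell
# (hypothetical invocation; "test" is the form field name defined in do_GET):
#   curl -X POST http://127.0.0.1:8000 \
#        --data 'test=https://github.com/username/repository_name'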