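# Scrape future delivery stock information from the SegenSolar reseller portal
# and merge it into the portal's price-list CSV export.
# Third-party packages used below: requests, beautifulsoup4 (with the lxml parser), tqdm.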
import os
import re
import sys
import csv
import shutil
import requests
import threading
import concurrent.futures
from glob import glob
from tqdm import tqdm
from bs4 import BeautifulSoup
from urllib.parse import quote_plus
from configparser import RawConfigParser
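
# Read portal credentials and the output CSV path from settings.cfg,
# expected to look like this (illustrative values):
#   [General]
#   login = your_portal_login
#   password = your_portal_password
#   output_filepath = prices.csv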
config = RawConfigParser()
config.read('settings.cfg')
login = config.get('General', 'login').strip()
password = config.get('General', 'password').strip()
output_file = config.get('General', 'output_filepath').strip()
assert login and password and output_file
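
# Form fields for the portal's login request.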
post_data = {
    'Username': login,
    'Password': password,
    'TOSAccepted': 'true',
    'RememberMe': 'false',
    'WindowWidth': '1263',
    'JavascriptEnabled': 'true'
}
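
# Product list pages to scrape (storage systems, modules, inverters);
# PageSize=All puts every item on a single page.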
list_urls = [
    'https://portal.segensolar.de/nav/pv/StorageSystems?SortOrder=Lo&Display=List&PageSize=All',
    'https://portal.segensolar.de/nav/pv/Module?SortOrder=Lo&Display=List&PageSize=All',
    'https://portal.segensolar.de/nav/pv/Inverters?SortOrder=Lo&Display=List&PageSize=All'
]
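
# Recreate a clean temp directory next to this script.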
temp_dir_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'temp')
if os.path.isdir(temp_dir_path):
    shutil.rmtree(temp_dir_path)
os.mkdir(temp_dir_path)
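
# Log in once; a successful login returns a page that contains 'LOGOUT'.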
print('Logging in...')
session = requests.Session()
response = session.post('https://portal.segensolar.de/Home/Login', data=post_data)
if 'LOGOUT' not in response.text:
    print('Unable to login.')
    sys.exit(1)
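
# Download the price-list CSV export and the three product list pages into temp/.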
print('Downloading lists...')
with open(os.path.join(temp_dir_path, 'temp.csv'), 'w', encoding='utf-8') as f:
    f.write(session.get('https://portal.segensolar.de/reseller/price/ExportData').text)
for list_number, list_url in enumerate(tqdm(list_urls), start=1):
    with open(os.path.join(temp_dir_path, f'list{list_number:07d}.html'), 'w', encoding='utf-8') as f:
        f.write(session.get(list_url).text)
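
# Collect the item detail URL from every product row (tr.dd-product) on the saved list pages.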
print('Processing lists...')
item_urls = []
item_number = 0
for list_number in range(1, len(list_urls) + 1):
    with open(os.path.join(temp_dir_path, f'list{list_number:07d}.html'), encoding='utf-8') as f:
        for soup_row in BeautifulSoup(f, 'lxml').find_all('tr', class_='dd-product'):
            item_number += 1
            item_urls.append((item_number, soup_row.a.get('href')))
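
# Give each worker thread its own requests.Session, since Session objects are
# not guaranteed to be safe to share across threads.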
thread_local = threading.local()

def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session
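
# Download a single item page: logging in with ReturnUrl makes the portal
# respond with the requested item page, which is then saved to temp/.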
def download_item_page(item_url):
    session = get_session()
    response = session.post('https://portal.segensolar.de/Home/Login?ReturnUrl={}'.format(quote_plus(item_url[1])), data=post_data)
    with open(os.path.join(temp_dir_path, f'item{item_url[0]:07d}.html'), 'w', encoding='utf-8', newline='\n') as f:
        f.write(response.text)
    progress_bar.update()
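
# Fetch all item pages concurrently; progress_bar is updated from the worker threads.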
print('Downloading items...')
progress_bar = tqdm(total=len(item_urls))
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
    executor.map(download_item_page, item_urls)
progress_bar.close()
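
# Parse each saved item page: map the article number ('Artikel-Nr.') to its
# list of future-delivery entries (div.dd-p-StockDue texts).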
print('Processing items...')
data = {}
for file_path in tqdm(sorted(glob(os.path.join(temp_dir_path, 'item*.html')))):
    with open(file_path, encoding='utf-8') as f:
        soup = BeautifulSoup(f, 'lxml')
    soup_divs = soup.find_all('div', class_='dd-p-StockDue grid')
    if len(soup_divs) > 0:
        key = soup.find('dd', class_='first').text.strip().replace('Artikel-Nr.: ', '')
        data[key] = [soup_div.text.strip() for soup_div in soup_divs]
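
# Merge the delivery data into the price export: keep only rows whose category is
# Module, Wechselrichter (inverters) or Speichersysteme (storage systems), and append
# up to two future delivery date/stock pairs per article number.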
with open(os.path.join(temp_dir_path, 'temp.csv'), encoding='utf-8') as f_in, open(output_file, 'w', encoding='utf-8', newline='') as f_out:
    reader = csv.reader(f_in)
    writer = csv.writer(f_out)
    header = next(reader)
    header += ['future_delivery_date_1', 'future_delivery_stock_1', 'future_delivery_date_2', 'future_delivery_stock_2']
    writer.writerow(header)
    for row in reader:
        if row[5] not in ('Module', 'Wechselrichter', 'Speichersysteme'):
            continue
        values = row + ['', '', '', '']
        if values[1] in data:
            # Each delivery entry is expected to look like '<quantity><separator text><date>',
            # where the quantity may contain thousands separators (e.g. '1,000').
            m = re.fullmatch(r'^([\d,]+)[^\d]+(.+)$', data[values[1]][0])
            values[-4] = m[2]
            values[-3] = m[1].replace(',', '')
            if len(data[values[1]]) > 1:
                m = re.fullmatch(r'^([\d,]+)[^\d]+(.+)$', data[values[1]][1])
                values[-2] = m[2]
                values[-1] = m[1].replace(',', '')
        writer.writerow(values)

shutil.rmtree(temp_dir_path)
print('Success!')