import json
from lxml import html
import pandas as pd
import random
import re
import requests as req
import time
base_url = 'https://lc-pro.ru'
catalog_url = base_url
CATEGORIES_BLACKLIST = []
agents = [
    'Mozilla/5.0 (Linux; Android 6.0.1; RedMi Note 5 Build/RB3N5C; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/68.0.3440.91 Mobile Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 YaBrowser/17.1.0.2034 Yowser/2.5 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 YaBrowser/18.3.1.1232 Yowser/2.5 Safari/537.36',
]
session = req.Session()

def compose_headers() -> dict:
    '''
    Composing headers with a random User-Agent
    Returns:
        Headers dict
    '''
    headers = {
        'User-Agent': random.choice(agents)
    }
    return headers

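# For illustration, compose_headers() returns something like
#   {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) ...'}
# with the agent drawn at random from the agents list above.
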
def clean_trash(objects: list) -> list:
    '''
    Removing unwanted symbols from a list of strings
    Args:
        objects: list of strings
    Returns:
        list of cleaned strings
    '''
    return [re.sub(r'[\t\n\xa0\b]', '', value).strip() for value in objects]

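# A quick illustration of clean_trash on typical scraped strings (sample values
# are made up, shown only as a sketch):
#   clean_trash(['\tСветодиодный светильник\n', ' IP65\xa0'])
#   -> ['Светодиодный светильник', 'IP65']
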
def fetch(url: str) -> bytes:
    '''
    Retrieving page content from url.
    Args:
        url: url to make request to
    Returns:
        response content in bytes
    '''
    global session
    while True:
        try:
            res = session.get(url, headers=compose_headers())
        except req.exceptions.ConnectionError as err:
            print(f'Connection error occurred: {err}\nTrying to reconnect...')
            time.sleep(1)
            continue
        print(url, res.status_code)
        if res.status_code == 503:
            # update session in case request gets blocked
            session = req.Session()
        elif res.status_code == 200:
            break
    return res.content

def extract(url: str, xpath: str, content_required: bool = False) -> list:
    '''
    Extracting elements from a page by xpath.
    Args:
        url: page url from which to fetch content
        xpath: str containing an xpath expression
        content_required: flag that makes the function refetch the page until the xpath result is non-empty
    Returns:
        result of xpath extraction -> list of elements
    '''
    while True:
        content = fetch(url)
        tree = html.fromstring(content)
        out = tree.xpath(xpath)
        if content_required and not out:
            continue
        return out

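# Example usage (a sketch, assuming the catalogue markup still matches this xpath):
#   hrefs = extract(base_url, "//div[@class='assortment']//a/@href", content_required=True)
# returns a list of relative category links taken from the href attributes.
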
def get_categories_urls() -> list:
    urls = extract(
        catalog_url,
        "//div[@class='assortment']//a/@href",
        content_required=True
    )
    return urls

def get_series_urls(cat_url: str) -> list:
    urls = extract(
        base_url + cat_url,
        "//div[@class='catalogue_product_block']/a/@href",
        content_required=True
    )
    return urls

def reorder_columns(df: pd.DataFrame) -> pd.DataFrame:
    '''
    Reordering columns in a DataFrame. Puts the list of required columns first.
    '''
    required_columns = [
        'Артикул',
        'Название продукции',
        'Классификация',
        'Ссылка',
        'Изображения',
        'Цена производителя',
    ]
    rest_columns = [col for col in df.columns if col not in required_columns]
    ordered = required_columns + rest_columns
    if 0 in ordered:
        # drop a stray integer column 0, if pandas.read_html produced one
        ordered.remove(0)
    df = df[ordered]
    return df

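# For instance (hypothetical columns), a frame with columns
# ['Мощность', 'Артикул', 'Ссылка', ...] comes back with 'Артикул',
# 'Название продукции', 'Классификация', ... first and the remaining
# columns after them.
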
def get_product_info(url: str) -> list:
    '''
    Extracting product info from url, using pandas.DataFrame
    '''
    full_url = base_url + url
    while True:
        try:
            # extracting essential product info
            content = fetch(full_url)
            tree = html.fromstring(content)
            products_table = tree.xpath(
                "//section[@class='table_wrap']/table"
            )[0]
            detail_table = tree.xpath(
                "//section[contains(@class, 'table_wrap') and contains(@class, 'full')]/table"
            )[0]
            images = tree.xpath(
                "//div[@class='gallery_container']//a[@class='lightboxgallery-gallery-item']/@href"
            )
            breadcrumbs = tree.xpath("//div[@class='breadcrumbs']//a/text()")
            df = pd.read_html(html.tostring(products_table))[0]
            df_detail = pd.read_html(html.tostring(detail_table))[0]
            break
        except IndexError as err:
            print(f'Error occurred: {err}.\nRefetching...')
    # cleaning data
    df['Цена производителя'] = ''
    df['Классификация'] = '|'.join([el for el in breadcrumbs if el])
    df['Изображения'] = '|'.join([base_url + img for img in images])
    df['Ссылка'] = full_url
    df.rename(columns={
        'Название светильника': 'Название продукции',
    }, inplace=True)
    df.drop(columns=['Паспорт', 'Сертификат', 'IES'], inplace=True)
    # adding detailed info
    for idx, row in df_detail.iterrows():
        df[row[0]] = row[1]
    df = reorder_columns(df.copy())
    return df.to_dict(orient='records')

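# Each returned record is a dict keyed by the table headers, e.g. (a sketch,
# values are illustrative only):
#   {'Артикул': '...', 'Название продукции': '...', 'Классификация': '...',
#    'Ссылка': 'https://lc-pro.ru/...', 'Изображения': '...|...',
#    'Цена производителя': '', ...}
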
def save_json(data: list, fname: str):
    with open(fname, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
    print(f'\n\n{len(data)} records successfully dumped.')

def parse():
    out = []
    categories_urls = get_categories_urls()
    print(f'Categories num: {len(categories_urls)}')
    for cat_url in categories_urls:
        if cat_url in CATEGORIES_BLACKLIST:
            continue
        for series_url in get_series_urls(cat_url):
            products_info = get_product_info(series_url)
            out += products_info
    save_json(out, f'lc-pro_data_{time.time()}.json')

if __name__ == '__main__':
    parse()
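
# Running the script (a sketch; the file name below is hypothetical):
#   $ python lc_pro_scraper.py
#   https://lc-pro.ru 200
#   ...
#   N records successfully dumped.
# The results land in lc-pro_data_<unix timestamp>.json in the working directory.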