Last active
November 12, 2018 18:45
-
-
Save p3t3r67x0/81658b81653553f0f79902317f905675 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import uuid | |
import requests | |
from lxml import html | |
from sqlalchemy import create_engine, Column, Integer, String, UniqueConstraint | |
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.exc import IntegrityError | |
from sqlalchemy.orm import sessionmaker | |
from fake_useragent import UserAgent | |
from urlparse import urljoin | |
from bs4 import BeautifulSoup | |
# Declarative base class that the ORM model below derives from.
base = declarative_base()


class Pharmacy(base):
    """ORM model for one row of the ``pharmacy`` table.

    The pair (name, adr) is covered by the ``uix_1`` unique constraint, so
    the database rejects a second row with the same name and address.
    """

    __tablename__ = 'pharmacy'
    # ``__table_args__`` has to be a tuple that *contains* the constraint.
    # ``tuple(UniqueConstraint(...))`` iterates over the constraint object
    # instead of wrapping it, so the constraint itself never reached the
    # table definition.
    __table_args__ = (UniqueConstraint('name', 'adr', name='uix_1'),)

    # Primary key, at most 20 characters.
    id = Column('id', String(20), primary_key=True)
    # Pharmacy name.
    name = Column('name', String(250))
    # Address line.
    adr = Column('adr', String(250))
    # Postal code.
    # NOTE(review): an Integer column drops leading zeros of postal codes;
    # left unchanged here because altering the type changes the schema.
    zip = Column('zip', Integer)
    # City name.
    city = Column('city', String(80))
    # Telephone number as text.
    tel = Column('tel', String(80))
def connect_database(url='postgresql://<user>:<password>@localhost:5432/apotheken'):
    """Create the SQLAlchemy engine used by the scraper.

    Args:
        url: Database URL handed to ``create_engine``.  The default is the
            placeholder URL of the local ``apotheken`` database; pass a real
            URL instead of editing the source.

    Returns:
        The engine returned by ``create_engine``.
    """
    return create_engine(url)
# Named HTML entities for U+00C0..U+00FF plus the acute accent (U+00B4),
# paired with the character each one stands for.  Entities are plain ASCII
# and the characters are written as escapes, so this table does not depend
# on the encoding the source file is saved in.
_HTML_ENTITIES = (
    ('&Agrave;', u'\xc0'), ('&Aacute;', u'\xc1'), ('&Acirc;', u'\xc2'), ('&Atilde;', u'\xc3'),
    ('&Auml;', u'\xc4'), ('&Aring;', u'\xc5'), ('&AElig;', u'\xc6'), ('&Ccedil;', u'\xc7'),
    ('&Egrave;', u'\xc8'), ('&Eacute;', u'\xc9'), ('&Ecirc;', u'\xca'), ('&Euml;', u'\xcb'),
    ('&Igrave;', u'\xcc'), ('&Iacute;', u'\xcd'), ('&Icirc;', u'\xce'), ('&Iuml;', u'\xcf'),
    ('&ETH;', u'\xd0'), ('&Ntilde;', u'\xd1'), ('&Ograve;', u'\xd2'), ('&Oacute;', u'\xd3'),
    ('&Ocirc;', u'\xd4'), ('&Otilde;', u'\xd5'), ('&Ouml;', u'\xd6'), ('&times;', u'\xd7'),
    ('&Oslash;', u'\xd8'), ('&Ugrave;', u'\xd9'), ('&Uacute;', u'\xda'), ('&Ucirc;', u'\xdb'),
    ('&Uuml;', u'\xdc'), ('&Yacute;', u'\xdd'), ('&THORN;', u'\xde'), ('&szlig;', u'\xdf'),
    ('&agrave;', u'\xe0'), ('&aacute;', u'\xe1'), ('&acirc;', u'\xe2'), ('&atilde;', u'\xe3'),
    ('&auml;', u'\xe4'), ('&aring;', u'\xe5'), ('&aelig;', u'\xe6'), ('&ccedil;', u'\xe7'),
    ('&egrave;', u'\xe8'), ('&eacute;', u'\xe9'), ('&ecirc;', u'\xea'), ('&euml;', u'\xeb'),
    ('&igrave;', u'\xec'), ('&iacute;', u'\xed'), ('&icirc;', u'\xee'), ('&iuml;', u'\xef'),
    ('&eth;', u'\xf0'), ('&ntilde;', u'\xf1'), ('&ograve;', u'\xf2'), ('&oacute;', u'\xf3'),
    ('&ocirc;', u'\xf4'), ('&otilde;', u'\xf5'), ('&ouml;', u'\xf6'), ('&divide;', u'\xf7'),
    ('&oslash;', u'\xf8'), ('&ugrave;', u'\xf9'), ('&uacute;', u'\xfa'), ('&ucirc;', u'\xfb'),
    ('&uuml;', u'\xfc'), ('&yacute;', u'\xfd'), ('&thorn;', u'\xfe'), ('&yuml;', u'\xff'),
    ('&acute;', u'\xb4'),
)


def replace_htmlchars(data):
    """Replace named Latin-1 HTML entities in *data* with their characters.

    The previous chain of ``replace`` calls searched for each accented
    character and substituted the very same character, which left the text
    untouched.  The search strings are now the entity names (``&Auml;``,
    ``&szlig;`` and so on).

    Args:
        data: Text that may contain named entities.

    Returns:
        *data* with every entity listed in ``_HTML_ENTITIES`` replaced.
        Anything else, including other entities, is left as it is.
    """
    for entity, char in _HTML_ENTITIES:
        data = data.replace(entity, char)
    return data
def load_document(url, query, ua, db):
    """POST a full-text pharmacy search and hand the result page on.

    Args:
        url: Address the search form is posted to.
        query: Search term, sent as the ``suchbegriffe`` form field.
        ua: User agent provider; its ``google`` attribute is sent as the
            ``User-Agent`` header.
        db: Database engine passed through to ``extract_address``.

    A request that times out is reported on stdout and otherwise ignored.
    Responses with a status other than 200 are skipped.
    """
    headers = {
        'User-Agent': ua.google,
    }
    payload = {
        'q_apotheke': 'on',
        'q_ort': 'on',
        'q_plz': 'on',
        'q_strasse': 'on',
        'suchbegriffe': query,
        'suchmodus': 'volltext',
    }

    try:
        r = requests.post(url, data=payload, timeout=3, headers=headers)
    except requests.exceptions.Timeout as e:
        # ``timeout=3`` applies to connecting as well as reading, so both
        # ConnectTimeout and ReadTimeout are handled here.
        print(e)
        return

    if r.status_code == 200:
        # ``encoding`` only affects ``r.text``; ``r.content`` is the raw,
        # undecoded body.  Pass the decoded text so the override is applied.
        r.encoding = 'ISO-8859-1'
        extract_address(r.text, db)
def extract_address(doc, db):
    """Parse a search result page, then print and store every pharmacy on it.

    Each ``div`` with the classes ``apoinfo apoinfo-hover`` is treated as one
    pharmacy entry.  Name, address, postal code, city and telephone number
    are pulled out of it, printed, and passed to ``insert_one``.

    Args:
        doc: HTML of the result page.
        db: Database engine passed through to ``insert_one``.

    Raises:
        IndexError: If an entry lacks one of the expected text nodes or
            comma-separated fields.
    """
    soup = BeautifulSoup(doc, 'html.parser')

    for entry in soup.find_all('div', class_='apoinfo apoinfo-hover'):
        d = html.document_fromstring(entry.prettify())
        tel_data = d.xpath('//div[@class="slider"]/div[2]/p/text()')
        apo_data = d.xpath('//div[@class="apotheke"]/div/p/strong/text()')
        adr_data = d.xpath('//div[@class="apotheke"]/div/p/text()')

        apo = apo_data[0].replace('\n', '').strip()

        # The second text node is comma separated: field 1 is the address,
        # field 2 is "<postal code> <city>".  Split it once and reuse it.
        fields = adr_data[1].split(',')
        adr = fields[1].replace('\n', '').strip()

        # Split only at the first run of whitespace so that city names made
        # of several words are kept whole instead of cut to the first word.
        locality = fields[2].replace('\n', '').strip().split(None, 1)
        zip_code = locality[0]
        city = locality[1]

        tel = tel_data[0].replace('\n', '').strip().replace('Tel.: ', '')

        print(u'{}\n{}\n{} {}\n{}\n'.format(apo, adr, zip_code, city, tel))
        insert_one(apo, adr, zip_code, city, tel, db)
def insert_one(apo, adr, zip, city, tel, db):
    """Insert one pharmacy row, printing the error if the database rejects it.

    Args:
        apo: Pharmacy name.
        adr: Address line.
        zip: Postal code.
        city: City name.
        tel: Telephone number.
        db: Database engine the session is bound to.

    An ``IntegrityError`` (for example a duplicate name/address pair) is
    printed and the transaction is rolled back; it is not re-raised.
    """
    session = sessionmaker(db)()
    # The primary key is the last group of a random UUID (12 hex characters).
    row_id = str(uuid.uuid4()).split('-')[4]

    try:
        session.add(Pharmacy(id=row_id, name=apo, adr=adr, zip=zip, city=city, tel=tel))
        session.commit()
    except IntegrityError as e:
        # Leave the failed transaction cleanly before reporting it.
        session.rollback()
        print(e)
    finally:
        # A session is opened per call, so it must be released per call;
        # otherwise its connection stays checked out.
        session.close()
def main():
    """Create the tables and run one pharmacy search from the command line.

    The first command line argument is the search term.  Without it the
    script exits with a usage message instead of an ``IndexError``.
    """
    if len(sys.argv) < 2:
        sys.exit('usage: {} <search term>'.format(sys.argv[0]))

    db = connect_database()
    # Create the tables of all models registered on ``base`` if missing.
    base.metadata.create_all(db)

    ua = UserAgent()
    load_document('https://www.aknr.de/service/apotheken.php', sys.argv[1], ua, db)


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import requests | |
from lxml import html | |
from fake_useragent import UserAgent | |
from urlparse import urljoin | |
# Named HTML entities for U+00C0..U+00FF plus the acute accent (U+00B4),
# paired with the character each one stands for.  Entities are plain ASCII
# and the characters are written as escapes, so this table does not depend
# on the encoding the source file is saved in.
_HTML_ENTITIES = (
    ('&Agrave;', u'\xc0'), ('&Aacute;', u'\xc1'), ('&Acirc;', u'\xc2'), ('&Atilde;', u'\xc3'),
    ('&Auml;', u'\xc4'), ('&Aring;', u'\xc5'), ('&AElig;', u'\xc6'), ('&Ccedil;', u'\xc7'),
    ('&Egrave;', u'\xc8'), ('&Eacute;', u'\xc9'), ('&Ecirc;', u'\xca'), ('&Euml;', u'\xcb'),
    ('&Igrave;', u'\xcc'), ('&Iacute;', u'\xcd'), ('&Icirc;', u'\xce'), ('&Iuml;', u'\xcf'),
    ('&ETH;', u'\xd0'), ('&Ntilde;', u'\xd1'), ('&Ograve;', u'\xd2'), ('&Oacute;', u'\xd3'),
    ('&Ocirc;', u'\xd4'), ('&Otilde;', u'\xd5'), ('&Ouml;', u'\xd6'), ('&times;', u'\xd7'),
    ('&Oslash;', u'\xd8'), ('&Ugrave;', u'\xd9'), ('&Uacute;', u'\xda'), ('&Ucirc;', u'\xdb'),
    ('&Uuml;', u'\xdc'), ('&Yacute;', u'\xdd'), ('&THORN;', u'\xde'), ('&szlig;', u'\xdf'),
    ('&agrave;', u'\xe0'), ('&aacute;', u'\xe1'), ('&acirc;', u'\xe2'), ('&atilde;', u'\xe3'),
    ('&auml;', u'\xe4'), ('&aring;', u'\xe5'), ('&aelig;', u'\xe6'), ('&ccedil;', u'\xe7'),
    ('&egrave;', u'\xe8'), ('&eacute;', u'\xe9'), ('&ecirc;', u'\xea'), ('&euml;', u'\xeb'),
    ('&igrave;', u'\xec'), ('&iacute;', u'\xed'), ('&icirc;', u'\xee'), ('&iuml;', u'\xef'),
    ('&eth;', u'\xf0'), ('&ntilde;', u'\xf1'), ('&ograve;', u'\xf2'), ('&oacute;', u'\xf3'),
    ('&ocirc;', u'\xf4'), ('&otilde;', u'\xf5'), ('&ouml;', u'\xf6'), ('&divide;', u'\xf7'),
    ('&oslash;', u'\xf8'), ('&ugrave;', u'\xf9'), ('&uacute;', u'\xfa'), ('&ucirc;', u'\xfb'),
    ('&uuml;', u'\xfc'), ('&yacute;', u'\xfd'), ('&thorn;', u'\xfe'), ('&yuml;', u'\xff'),
    ('&acute;', u'\xb4'),
)


def replace_htmlchars(data):
    """Replace named Latin-1 HTML entities in *data* with their characters.

    The previous chain of ``replace`` calls searched for each accented
    character and substituted the very same character, which left the text
    untouched.  The search strings are now the entity names (``&Auml;``,
    ``&szlig;`` and so on).

    Args:
        data: Text that may contain named entities.

    Returns:
        *data* with every entity listed in ``_HTML_ENTITIES`` replaced.
        Anything else, including other entities, is left as it is.
    """
    for entity, char in _HTML_ENTITIES:
        data = data.replace(entity, char)
    return data
def load_document(url, ua, visited=None):
    """Fetch *url*, print its addresses and follow its category links.

    Args:
        url: Page to request.
        ua: User agent provider; its ``google`` attribute is sent as the
            ``User-Agent`` header.
        visited: Set of URLs already requested during this crawl.  Callers
            normally omit it; it is created on the first call and shared
            with the recursive calls made through ``extract_links``.

    A URL that is already in *visited* is not requested again.  Without
    this record, pages that link to each other made ``load_document`` and
    ``extract_links`` call one another without end.
    """
    if visited is None:
        visited = set()
    if url in visited:
        return
    visited.add(url)

    headers = {
        'User-Agent': ua.google,
    }
    r = requests.post(url, headers=headers)

    if r.status_code == 200:
        extract_address(r.content)
        extract_links(url, r.content, ua, visited)


def extract_links(url, doc, ua, visited=None):
    """Load every category link found in *doc*.

    Args:
        url: Address *doc* was fetched from; relative links are resolved
            against it.
        doc: HTML of the page.
        ua: User agent provider passed through to ``load_document``.
        visited: Set of URLs already requested, passed through to
            ``load_document``.  A fresh set is used when omitted.
    """
    if visited is None:
        visited = set()

    d = html.fromstring(doc)
    hrefs = d.xpath('//li[@class="category"]/a/@href')

    # A set drops links that occur more than once on the same page.
    for href in set(hrefs):
        # Leading dashes are removed from the link before it is resolved.
        target = urljoin(url, href.lstrip('-'))
        load_document(target, ua, visited)
def extract_address(doc):
    """Print name, two address lines and phone number of each address box.

    Every ``div`` whose class is ``box_adress`` is read as one entry: the
    name comes from its ``h2``, the address lines from the paragraph with
    class ``adress`` and the phone number from the paragraph with class
    ``contact``.

    Args:
        doc: HTML of the page to scan.
    """
    tree = html.fromstring(doc)
    template = u'{}\n{}\n{}\n{}\n'

    for box in tree.xpath('//div[@class="box_adress"]'):
        name = box.xpath('./h2/text()')[0]
        address_lines = box.xpath('./p[@class="adress"]/text()')

        # Trim the contact text, then drop the label, blanks and newlines.
        phone = box.xpath('./p[@class="contact"]/text()')[0].strip()
        for unwanted in ('Tel.:', ' ', '\n'):
            phone = phone.replace(unwanted, '')

        print(template.format(name, address_lines[0], address_lines[1], phone))
def main():
    """Start the crawl at the URL given as first command line argument.

    Without that argument the script exits with a usage message instead
    of an ``IndexError``.
    """
    if len(sys.argv) < 2:
        sys.exit('usage: {} <start url>'.format(sys.argv[0]))

    ua = UserAgent()
    load_document(sys.argv[1], ua)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment