Created
October 8, 2013 16:22
-
-
Save initbrain/6887311 to your computer and use it in GitHub Desktop.
Script permettant d'effectuer des recherches d'images sur TinEye
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Pour les tests : | |
# python tineye_pic_search.py -d -l "http://upload.wikimedia.org/wikipedia/commons/c/c1/Glider_(clarince63).png" | |
# python tineye_pic_search.py -d -i /home/initbrain/Images/glider.png | |
# Importations | |
import sys | |
import os | |
import re | |
import urllib2 | |
import mimetypes | |
import json | |
import lxml.html as lh | |
from urllib import urlencode | |
from poster.encode import multipart_encode | |
from poster.streaminghttp import register_openers | |
#import getpass | |
# Workaround for an output-buffering problem seen with mingw32: on any
# platform whose name does not contain "linux", refuse to start unless the
# PYTHONUNBUFFERED environment variable is set to a non-empty value.
if "linux" not in sys.platform and not os.getenv("PYTHONUNBUFFERED"):
    print("[!] La variable d'environnement PYTHONUNBUFFERED doit etre declaree avec une valeure non vide !!!")
    sys.exit()
def get_content_type(filename):
    """Return the MIME type guessed from *filename*.

    Falls back to "application/octet-stream" when ``mimetypes`` cannot
    work out a type from the name.
    """
    guessed_type, _encoding = mimetypes.guess_type(filename)
    if guessed_type:
        return guessed_type
    return "application/octet-stream"
def tineyeSearchPic(url, search_type, target, http_proxy=None):
    """Submit an image to TinEye and return the raw result page.

    Args:
        url: search endpoint the request is sent to.
        search_type: 1 to search from an image URL, 2 to upload a local file.
        target: the image URL (search_type 1) or the path of the file to
            upload (search_type 2).
        http_proxy: optional dict with "server" and "port" keys and optional
            "user" / "passwd" keys. When given, HTTP and HTTPS requests are
            routed through that proxy.

    Returns:
        The page body returned by ``req``, or 0 when the request failed.

    Raises:
        ValueError: if search_type is neither 1 nor 2.
    """
    upload = None
    opener = None
    if search_type == 1:  # URL
        data = urlencode({"url": target})
        headers = {}
    elif search_type == 2:  # File
        # Register the streaming http handlers with urllib2
        opener = register_openers()
        upload = open(target, "rb")
        # headers contains the necessary Content-Type and Content-Length
        # data is a generator object that yields the encoded parameters
        data, headers = multipart_encode({"image": upload})
    else:
        raise ValueError("unknown search_type: %r" % (search_type,))

    try:
        headers.update({"User-Agent": "Mozilla/5.0 (Windows NT 5.1; rv:16.0) Gecko/20100101 Firefox/16.0",
                        "Cache-Control": "no-cache",
                        "Pragma": "no-cache",
                        "Referer": "http://www.tineye.com/"})

        if http_proxy:
            # Local import: only needed to escape the proxy credentials.
            from urllib import quote

            # Build http://user:password@server:port, leaving out the parts
            # that are empty. Credentials are percent-encoded so characters
            # such as '@', ':' or '/' cannot break the URL; urllib2 unquotes
            # them again before building the Proxy-Authorization header.
            credentials = ""
            user = http_proxy.get("user")
            if user:
                credentials = quote(user, safe="")
                password = http_proxy.get("passwd")
                if password:
                    credentials += ":" + quote(password, safe="")
                credentials += "@"
            proxy_url = "http://%s%s:%s" % (credentials,
                                            http_proxy["server"],
                                            http_proxy["port"])
            proxy_handler = urllib2.ProxyHandler({"http": proxy_url,
                                                  "https": proxy_url})
            if opener is None:
                opener = urllib2.build_opener(proxy_handler)
                urllib2.install_opener(opener)
            else:
                # File upload: keep the streaming opener and add the proxy to it.
                opener.add_handler(proxy_handler)

        response = req(url, data, headers)
    finally:
        # The multipart body is read lazily from the file while the request
        # is sent, so the handle can only be closed once req() is done.
        if upload is not None:
            upload.close()

    return response if response else 0
def req(url, data=None, headers=None, timeout=10, retry=2):
    """Fetch *url* and return the response body, retrying on failure.

    Args:
        url: address to request.
        data: optional request body, handed as-is to ``urllib2.Request``.
        headers: optional dict of HTTP headers; when empty or omitted, a
            browser-like default set with a TinEye referer is used.
        timeout: per-attempt timeout in seconds passed to ``urlopen``.
        retry: number of additional attempts made after the first one.

    Returns:
        The non-empty response body, or None when every attempt raised,
        answered with a status other than 200, or produced an empty body.
    """
    if not headers:
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 5.1; rv:16.0) Gecko/20100101 Firefox/16.0",
                   "Cache-Control": "no-cache",
                   "Pragma": "no-cache",
                   "Referer": "http://www.tineye.com/"}
    request = urllib2.Request(url, data, headers)

    for _attempt in range(retry + 1):
        try:
            connection = urllib2.urlopen(request, timeout=timeout)
        except Exception:
            # Connection error (timeout etc.). Catching Exception rather than
            # using a bare except lets Ctrl-C and sys.exit() go through.
            print("ERREUR : " + url + " ne repond pas ...")
            continue

        try:
            if connection.getcode() != 200:
                print("ERREUR : la page " + url + " n'existe plus ...")
                continue
            source = connection.read()
        finally:
            # Always release the connection, including on a non-200 answer.
            connection.close()

        if source:
            return source
        print("ERREUR : le code source de " + url + " est vide ...")

    return None
def tineyeResultParser(source):
    """Parse the results listed on a TinEye.com result page.

    Example output:
    {"http://www.toto.fr/image.jpg": ["http://www.toto.fr/page1.html", "http://www.toto.fr/page2.html"],
     "http://www.tata.fr/image.jpg": ["http://www.tata.fr/page1.html"]}

    Args:
        source: HTML of a result page; may be empty or None.

    Returns:
        A dict mapping the first link of each "search-results-location"
        block to the list of the remaining links of that block. An empty
        dict is returned when *source* is empty.
    """
    results = {}
    if not source:
        # Nothing to parse (e.g. the page could not be downloaded).
        return results

    show_all_prefix = "http://www.tineye.com/search/show_all/"
    location_xpath = "//div[@class='search-results-location']"

    root = lh.fromstring(source)
    for div in root.xpath(location_xpath):
        links = div.xpath('p/a/@href')

        # When many pages of one site use the image, TinEye lists them on a
        # separate page: "http://www.tineye.com/search/show_all/..."
        show_all_links = [link for link in links if show_all_prefix in link]
        if show_all_links:
            # As before, the last matching link is the one that is followed.
            sub_source = req(show_all_links[-1])
            if sub_source:
                links = lh.fromstring(sub_source).xpath(location_xpath + "/p/a/@href")
            else:
                # The detail page is unavailable: keep what this block shows,
                # without the TinEye-internal "show all" link.
                links = [link for link in links if show_all_prefix not in link]

        if links:  # skip blocks without any link instead of raising IndexError
            results[links[0]] = links[1:]
    return results
def progressbar(i, count, prefix="", size=60):
    """Draw a one-line text progress bar on standard output.

    Args:
        i: current page number.
        count: total number of pages.
        prefix: text written before the bar.
        size: width of the bar in characters.

    The line ends with a carriage return so that the next call overwrites
    it; a newline is added once *i* equals *count*.
    """
    filled = int(size * i / count)
    done = "#" * filled
    todo = "." * (size - filled)
    out = sys.stdout
    out.write("%s[%s%s] page %i/%i\r" % (prefix, done, todo, i, count))
    if i == count:
        # Move the cursor to the next line
        out.write("\n")
    out.flush()
def _parse_args(argv, usage):
    """Validate the command-line parameters (program name excluded).

    Prints *usage* and exits with status 0 as soon as a help flag is met.

    Returns:
        A ``(search_type, target, http_proxy, error)`` tuple: search_type is
        0 (unset), 1 (URL) or 2 (file); target is the URL / file path or
        False; http_proxy is None, or an empty dict when a proxy was
        requested; error holds every problem found, one message per line.
    """
    type_flags = {"--lien": 1, "-lien": 1, "-l": 1,
                  "--image": 2, "-image": 2, "-i": 2}
    error = ""
    search_type = 0
    target = False
    http_proxy = None

    for param in argv:
        if param in ("--help", "-help", "-h"):
            print(usage)
            sys.exit(0)
        elif param in ("--proxy", "-proxy", "-p"):
            # Empty dict: the connection details are asked for later on.
            # Hard-coding proxy credentials here would be a bad idea.
            http_proxy = {}
        elif param in ("--verbose", "-verbose", "-v"):
            # Accepted on the command line but it has no effect yet.
            continue
        elif param in type_flags:
            # Both search-type flags obey the same rules: only one of them,
            # and it must come before the target.
            if search_type:
                error += "ERREUR : le type de recherche (url/fichier) a deja ete renseigne\n"
            elif target:
                error += "ERREUR : le type de recherche (url/fichier) doit etre precise en premier\n"
            else:
                search_type = type_flags[param]
        elif param.startswith(("http://", "https://")):
            if search_type == 2:
                error += "ERREUR : parametre invalide '" + param + "'\n"
            else:
                target = param
        elif search_type == 2:
            if not os.path.isfile(param):
                error += "ERREUR : le fichier '" + param + "' est introuvable\n"
            else:
                # The file must be an image in one of the formats supported
                # by the search engine (TinEye).
                image_format = get_content_type(param)
                if image_format not in ("image/png", "image/gif", "image/jpeg"):
                    error += "ERREUR : format '" + image_format + "' non pris en charge\n"
                else:
                    target = param
        else:
            error += "ERREUR : parametre invalide '" + param + "'\n"

    if not search_type:
        error += "ERREUR : veuillez preciser le type de recherche\n"
    if not target:
        error += "ERREUR : veuillez indiquer l'image a rechercher\n"
    return search_type, target, http_proxy, error


def _prompt_proxy(http_proxy):
    """Ask the user for the proxy settings missing from *http_proxy* (in place)."""
    required = ("user", "passwd", "server", "port")
    if not all(key in http_proxy for key in required):
        http_proxy["server"] = raw_input('[Proxy] Serveur : ')
        http_proxy["port"] = raw_input('[Proxy] Port : ')
        http_proxy["user"] = raw_input('[Proxy] Identifiant (laisser vide si aucun) : ')
        # TODO getpass.getpass() would hide the typed password but breaks the
        # unbuffered output (mingw32...), hence raw_input.
        http_proxy["passwd"] = raw_input("[Proxy] Mot de passe (laisser vide si aucun) : ")
    else:
        print("[!] Attention, des identifiants sont visibles dans les variables d'environnement !")


def _collect_results(source, url_page):
    """Parse the first result page, then follow the "Next" links page by page."""
    results = dict(tineyeResultParser(source))
    page = 1
    while "Next »</a>" in source:
        page += 1
        print("Tineye: Page " + str(page) + " en cours (" + str(len(results)) + " resultats pour l'instant)")
        source = req(url_page + "?page=" + str(page) + "&sort=score&order=asc")
        if not source:
            # req() gave up: stop here and keep what was gathered so far
            # instead of crashing on a missing page.
            print("ERREUR : la page " + str(page) + " n'a pas pu etre recuperee, resultats partiels")
            break
        results.update(tineyeResultParser(source))
    return results


def main():
    """Command-line entry point.

    Reads the options from ``sys.argv``, sends the image (URL or file) to
    TinEye, walks through the result pages and writes the collected results
    as JSON to ``tineye_search_result.txt``. Exits with status 1 on invalid
    arguments or when the search request fails.
    """
    usage = """
Usage :
python """ + sys.argv[0] + """ [Type de recherche] \"URL/FILE_PATH\"
Options :
--lien/-lien/-l : Effectuer une recherche a partir du lien d'une image
--image/-image/-i : Effectuer une recherche a partir d'un fichier image
--proxy/-proxy/-p : Utiliser un proxy
--verbose/-verbose/-v : Afficher les chemins des fichiers enregistres
--help/-help/-h : Afficher ce message d'aide
Exemples :
python """ + sys.argv[0] + """ -l http://www.toto.fr/images/test.jpg
Realiser une recherche a partir d'une url sans passer par un proxy
python """ + sys.argv[0] + """ -p -i images/toto.jpg
Realiser une recherche a partir d'un fichier image en passant par un proxy
"""
    failure_message = "[!] L'envoi de l'image a TinEye a echoue !"

    if len(sys.argv) < 2:
        print("ERREUR : Vous devez renseigner au moins 2 arguments\n" + usage)
        sys.exit(1)

    search_type, target, http_proxy, error = _parse_args(sys.argv[1:], usage)
    if error:
        print(error + usage)
        sys.exit(1)

    # Direct connection unless a proxy was requested
    if http_proxy is not None:
        _prompt_proxy(http_proxy)

    print("Envoi de l'image au moteur de recherche TinEye")
    source = tineyeSearchPic("http://www.tineye.com/search", search_type, target, http_proxy)
    if not source:
        sys.exit(1)

    if "Your image is too simple to create a unique fingerprint." in source:
        print("Aucun resultat (image trop simple) !")
        return
    if "Could not read the image." in source:
        print("Aucun resultat (image corrompue) !")
        return

    result_counts = re.findall(r"<h2><span>(\d+)</span> Results</h2>", source)
    if len(result_counts) != 1:
        print(failure_message)
        return
    # The captured group is a string: convert it before comparing with 0.
    if int(result_counts[0]) == 0:
        print("Aucun resultat !")
        return

    search_hashes = re.findall(r'icon" href="http://www\.tineye\.com/query/(.+?)"',
                               source, re.MULTILINE)
    if not search_hashes:
        # Without the search hash the other result pages cannot be reached.
        print(failure_message)
        return

    url_page = "http://www.tineye.com/search/" + search_hashes[0] + "/"
    final_result = _collect_results(source, url_page)
    print("Recherche terminee (" + str(len(final_result)) + " resultats) !")

    if final_result:
        print("Creation du fichier tineye_search_result.txt")
        with open('tineye_search_result.txt', 'w') as backup_file:
            backup_file.write(json.dumps(final_result, indent=4))
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment