Created
August 19, 2013 02:21
-
-
Save moorepants/6265289 to your computer and use it in GitHub Desktop.
Image uploader for localwikis.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import getpass
import os
import shutil

import slumber

from upload_tagged_photos import ImageUploader


class TestUploadWiki(object):
    """Integration tests for ``ImageUploader``.

    These tests talk to the API at ``api_url`` with real credentials, so
    they create and delete an actual page ("Upload Test Page") on that
    wiki. The ``setup``/``teardown`` names follow the nose-style fixture
    convention used by the original file.

    """

    api_url = "http://clevelandwiki.org/api/"
    user_name = None
    api_key = None

    @classmethod
    def setup_class(cls):
        """Prompts for the credentials once, before any test runs.

        Prompting here instead of in the class body keeps a plain import
        of this module from blocking on standard input.

        """
        cls.user_name = input("What is your username?")
        cls.api_key = getpass.getpass("What is your api_key?")

    def setup(self):
        """Creates the test page, attaches an image to it and builds the
        uploader under test."""
        self.api = slumber.API(self.api_url,
                               auth=(self.user_name, self.api_key))

        # TODO: create two directories with some images, some should have
        # correct tags

        # create test page
        page_dict = {
            "content": "<p>The Upload Test Page.</p>",
            "name": "Upload Test Page",
        }
        self.api.page.post(page_dict)
        # Fetch the page by name, the same way ImageUploader looks pages
        # up, rather than relying on the shape of the POST response.
        test_page_slug = self.api.page(page_dict['name']).get()['slug']

        # add an image to it; image data must be read as bytes
        with open('image.jpg', 'rb') as f:
            self.api.file.post(name='image.jpg', slug=test_page_slug,
                               files={'file': f})

        self.uploader = ImageUploader(self.api_url,
                                      user_name=self.user_name,
                                      api_key=self.api_key)

    def test_init(self):
        """The uploader's API client carries the URL and credentials."""
        uploader = ImageUploader(self.api_url, user_name=self.user_name,
                                 api_key=self.api_key)

        # NOTE(review): ``_store`` is a private slumber attribute; its keys
        # are taken from the original test and should be confirmed against
        # the installed slumber version.
        assert uploader.api._store['base_url'] == self.api_url
        assert uploader.api._store['format'] == 'json'
        assert uploader.api._store['session'].auth[0] == self.user_name
        assert uploader.api._store['session'].auth[1] == self.api_key

    def test_remove_tmp_dirs(self):
        """``remove_tmp_dirs`` deletes the scratch directory beside each
        image path it is given."""
        directories = [os.path.join('/tmp', name)
                       for name in ('localwikidir1', 'localwikidir2')]
        file_names = ['file1.jpg', 'file2.jpg']
        tmp_dir_name = self.uploader._tmp_dir_name

        image_paths = []
        try:
            for directory, file_name in zip(directories, file_names):
                tmp_dir = os.path.join(directory, tmp_dir_name)
                # makedirs: the parent directory does not exist yet either
                os.makedirs(tmp_dir)
                # an empty stand-in for a rotated copy of the image
                rotated_copy = os.path.join(tmp_dir, file_name)
                with open(rotated_copy, 'w'):
                    pass
                assert os.path.exists(rotated_copy)
                # remove_tmp_dirs expects the path of the *original* image;
                # it derives the scratch directory from the image's folder.
                image_paths.append(os.path.join(directory, file_name))

            self.uploader.remove_tmp_dirs(image_paths)

            for directory, file_name in zip(directories, file_names):
                tmp_dir = os.path.join(directory, tmp_dir_name)
                assert not os.path.exists(tmp_dir)
                assert not os.path.exists(os.path.join(tmp_dir, file_name))
        finally:
            for directory in directories:
                shutil.rmtree(directory, ignore_errors=True)

    def test_find_files_in_page(self):
        """Unknown pages give ``None``; the test page lists its image."""
        missing = self.uploader.find_files_in_page(
            'this will never be the name of a page')
        assert missing is None

        files_in_page = self.uploader.find_files_in_page('Upload Test Page')

        assert files_in_page[0]['name'] == 'image.jpg'
        assert files_in_page[0]['slug'] == 'upload test page'

    def test_find_wiki_images(self):
        """A directory without any image files yields no wiki images."""
        # NOTE(review): the original body was an unfinished sketch calling
        # helpers that are not defined in the visible source
        # (upload_wiki_images, find_wiki_images, check_if_file_exists).
        # Only the part that can be expressed with ImageUploader's real
        # methods, and that needs no tagged fixture images, is asserted.
        directory = '/tmp/wiki-images'
        os.mkdir(directory)
        try:
            self.uploader.main_keyword = 'cleveland wiki'
            self.uploader.page_keyword_prefix = 'page:'
            self.uploader.directories = [directory]

            found = self.uploader.find_localwiki_images_in_directory(
                directory)
            assert found == {}
            assert self.uploader.find_localwiki_images() == {}
        finally:
            shutil.rmtree(directory)

        # TODO: add fixture images tagged with the main keyword and a
        # "page:Test Page" keyword, then assert the returned mapping for
        # them and that the uploaded file exists on the server.

    def teardown(self):
        """Removes the files attached to the test page, then the page."""
        test_page = self.api.page('Upload Test Page').get()

        attached = self.api.file.get(slug=test_page['slug'])['objects']
        for file_dict in attached:
            self.api.file(file_dict['id']).delete()

        # TODO: delete all versions of the test page and the tmp image
        # directories
        self.api.page('Upload Test Page').delete()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import getpass
import os
import shutil
import subprocess

import slumber
from gi.repository import GExiv2
class ImageUploader(object):
    """Uploads keyword-tagged local images to a localwiki site.

    An image is picked up when its ``Iptc.Application2.Keywords`` metadata
    contains the main keyword passed to ``upload``. Every keyword that
    starts with the page prefix (``"page:"`` by default) names a wiki page
    the image is attached to and embedded in.

    """

    # Scratch directory created beside an image to hold its rotated copy.
    _tmp_dir_name = '.localwiki'
    # Lower-case file name extensions that are treated as images.
    _file_extensions = ['.png', '.jpg', '.gif']
    _keywords_tag = 'Iptc.Application2.Keywords'
    _orientation_tag = 'Exif.Image.Orientation'
    # Positional fields: image file name, caption.
    _embed_template = """
<p>
  <span class="image_frame image_frame_border">
    <img src="_files/{}" style="width: 300px; height: 225px;" />
    <span class="image_caption" style="width: 300px;">
      {}
    </span>
  </span>
</p>
"""

    def __init__(self, api_url, user_name=None, api_key=None):
        """Initializes the uploader.

        Parameters
        ==========
        api_url : string
            This should point to your API and have a trailing slash, e.g.
            'http://clevelandwiki.org/api/'.
        user_name : string, optional, default=None
            The user name for your API key. If you don't provide this
            here, then you will be prompted to enter it on initialization.
        api_key : string, optional, default=None
            The api_key for this user. If you don't provide this here, then
            you will be prompted to enter it on initialization.

        """
        if user_name is None:
            user_name = input("What is your username?\n")
        if api_key is None:
            api_key = getpass.getpass("What is your api_key?\n")

        self.api = slumber.API(api_url, auth=(user_name, api_key))

        # Search settings; ``upload`` overwrites them. The defaults let the
        # find_* methods be called before ``upload`` without an
        # AttributeError.
        self.directories = []
        self.main_keyword = None
        self.page_keyword_prefix = "page:"

    def upload(self, main_keyword, *directories, page_keyword_prefix="page:"):
        """Uploads all the new files in the specified directories with the
        proper tags to localwiki and creates new pages if needed.

        Parameters
        ==========
        main_keyword : string
            The keyword embedded in Iptc.Application2.Keywords that
            identifies your image as one that belongs on your localwiki
            site, e.g. 'cleveland wiki'.
        directories : str
            The directories to search for images.
        page_keyword_prefix : string, optional, default="page:"
            This is the prefix for the keyword embedded in your image which
            contains the page name where the image belongs, e.g. if your
            image belongs on the front page then your keyword should look
            like "page:Front Page".

        """
        self.directories = list(directories)
        self.main_keyword = main_keyword
        self.page_keyword_prefix = page_keyword_prefix

        wiki_images = self.find_localwiki_images()

        for file_path, page_names in wiki_images.items():
            file_name = os.path.basename(file_path)
            for page_name in page_names:
                page = self.create_page(page_name)
                # NOTE(review): existence is checked by file name across
                # the whole wiki, not per page, so an image tagged for
                # several pages is only attached to the first one.
                if self.file_exists_on_server(file_name):
                    print("{} already exists on the localwiki.".format(
                        file_path))
                    continue
                self.upload_image(page, file_path)
                self.embed_image(page_name, file_name)

        print('Cleaning up image rotations.')
        self.remove_tmp_dirs(wiki_images.keys())
        print('Done.')

    def remove_tmp_dirs(self, file_paths):
        """Removes any of the temporary directories used to rotate images.

        Parameters
        ==========
        file_paths : iterable of strings
            Paths to the original images. The scratch directory that sits
            in the same directory as each image is removed if it exists.

        """
        visited_directories = {os.path.dirname(path) for path in file_paths}
        for directory in visited_directories:
            tmp_dir = os.path.join(directory, self._tmp_dir_name)
            if os.path.isdir(tmp_dir):
                shutil.rmtree(tmp_dir)

    def find_localwiki_images(self):
        """Returns a dictionary mapping local image paths to local wiki page
        names for all images in the provided directories that have the
        correct tags."""
        wiki_images = {}
        for directory in self.directories:
            wiki_images.update(
                self.find_localwiki_images_in_directory(directory))
        return wiki_images

    def find_localwiki_images_in_directory(self, directory):
        """Returns a dictionary mapping local image paths to local wiki page
        names.

        Only files directly inside the directory are examined, and only
        those whose name ends with one of the known image extensions
        (compared case-insensitively).

        Parameters
        ==========
        directory : string
            The path to the directory that should be scanned for localwiki
            images.

        Returns
        =======
        wiki_images : dictionary
            Maps the path of each image carrying the main keyword to the
            list of page names found in its keywords.

        """
        extensions = tuple(self._file_extensions)
        wiki_images = {}
        for file_name in os.listdir(directory):
            if not file_name.lower().endswith(extensions):
                continue
            # os.listdir returns bare names; the metadata reader needs the
            # full path to open the file.
            file_path = os.path.join(directory, file_name)
            metadata = GExiv2.Metadata(file_path)
            keywords = metadata.get_tag_multiple(self._keywords_tag)
            if self.main_keyword in keywords:
                wiki_images[file_path] = self._page_names(keywords)
        return wiki_images

    def _page_names(self, keywords):
        """Returns the page names held by the keywords that start with the
        page keyword prefix.

        The prefix is sliced off rather than split on ':' so that page
        names which themselves contain a colon are kept whole.

        """
        prefix = self.page_keyword_prefix
        return [keyword[len(prefix):] for keyword in keywords
                if keyword.startswith(prefix)]

    def create_page(self, page_name):
        """Creates a new blank page on the server with the provided page
        name and returns the data received from the post, unless it already
        exists, in which case it returns the existing page.

        Parameters
        ==========
        page_name : string
            The name of the page to create.

        Returns
        =======
        page_dict : dictionary
            The existing page, or the response from the post.

        """
        try:
            return self.api.page(page_name).get()
        except slumber.exceptions.HttpClientError:
            # Any 4xx response is treated as "the page does not exist".
            print("Creating the new page: {}".format(page_name))
            page_dict = {
                "content":
                    "<p>Please add some content to help describe this "
                    "page.</p>",
                "name": page_name,
            }
            return self.api.page.post(page_dict)

    def find_files_in_page(self, page_name):
        """Returns a list of dictionaries, one for each file, attached to a
        page and returns None if the page doesn't exist.

        Parameters
        ==========
        page_name : string
            The name of the page to look up.

        Returns
        =======
        files : list of dictionaries or None
            The response dictionaries, one for each file associated with
            the page. None if the page doesn't exist.

        """
        try:
            slug = self.api.page(page_name).get()['slug']
        except slumber.exceptions.HttpClientError:
            return None
        # NOTE(review): the original queried ``files`` here but ``file``
        # everywhere else; ``file`` is used for consistency - confirm the
        # endpoint name against the wiki's API.
        return self.api.file.get(slug=slug)['objects']

    def file_exists_on_server(self, file_name):
        """Returns true if the file already exists on the server.

        Parameters
        ==========
        file_name : string
            The name of the file, as stored in localwiki.

        """
        # NOTE(review): only the 'objects' of a single response are
        # inspected; if the API paginates, later pages are not checked.
        file_list = self.api.file.get()['objects']
        return any(file_dict['name'] == file_name for file_dict in file_list)

    def rotate_image(self, file_path):
        """Creates a temporary directory beside the file, copies the file
        into the directory, and rotates it based on the EXIF orientation
        data.

        The rotation is done by the external ``jhead`` program. If it
        cannot be run or reports a failure, a message is printed and the
        copy is left as it is.

        Parameters
        ==========
        file_path : string
            The path to the image file.

        Returns
        =======
        tmp_file_path : string
            The path to the copy inside the temporary directory.

        """
        directory, file_name = os.path.split(file_path)
        tmp_dir = os.path.join(directory, self._tmp_dir_name)
        # Several images may share a directory, so the scratch directory
        # can already be there from an earlier rotation.
        if not os.path.isdir(tmp_dir):
            os.mkdir(tmp_dir)
        tmp_file_path = os.path.join(tmp_dir, file_name)
        shutil.copyfile(file_path, tmp_file_path)

        # An argument list (no shell) keeps paths with spaces or shell
        # metacharacters intact.
        try:
            status = subprocess.call(
                ["jhead", "-ft", "-autorot", tmp_file_path])
        except OSError:
            status = None
        if status != 0:
            print("jhead could not rotate {}; leaving it as is.".format(
                tmp_file_path))
        return tmp_file_path

    def upload_image(self, page, file_path):
        """Uploads the image to the server.

        When the EXIF orientation tag is present and is not '1', the
        rotated copy from the temporary directory is uploaded (it is
        created first if it is not there yet); otherwise the original file
        is uploaded.

        Parameters
        ==========
        page : dictionary
            The response dictionary for a page.
        file_path : string
            The path to the image file.

        """
        directory, file_name = os.path.split(file_path)
        # Must match the location rotate_image writes to.
        tmp_file_path = os.path.join(directory, self._tmp_dir_name,
                                     file_name)

        metadata = GExiv2.Metadata(file_path)
        needs_rotation = (
            metadata.has_tag(self._orientation_tag) and
            metadata.get_tag_string(self._orientation_tag) != '1')

        if needs_rotation:
            if not os.path.isfile(tmp_file_path):
                self.rotate_image(file_path)
            upload_path = tmp_file_path
        else:
            upload_path = file_path

        print('Uploading {} to {}'.format(file_name, page['name']))
        # NOTE(review): the original author was unsure whether name/slug
        # should instead be posted as a data dictionary, i.e.
        # post({'name': ..., 'slug': ...}, files=...) - confirm against
        # the API.
        with open(upload_path, 'rb') as image:
            self.api.file.post(name=file_name, slug=page['slug'],
                               files={'file': image})
        print('Done.')

    def embed_image(self, page_name, image_name, caption='Caption me!'):
        """Appends HTML to the page that embeds the attached image.

        Parameters
        ==========
        page_name : string
            The name of the page to edit.
        image_name : string
            The name of an image that is attached to the page.
        caption : string, optional, default='Caption me!'
            The caption that is displayed under the image.

        Returns
        =======
        page_response : dictionary
            The response from the patch.

        """
        current_content = self.api.page(page_name).get()['content']
        html = self._embed_template.format(image_name, caption)
        return self.api.page(page_name).patch(
            {'content': current_content + html})
if __name__ == "__main__":
    # Command line entry point: parse the API url, the main keyword and
    # the directories to scan, then run a single upload pass.
    import argparse

    parser = argparse.ArgumentParser(
        description='Upload images to local wiki.')

    parser.add_argument(
        'url', type=str,
        help="The API url with a trailing slash, e.g "
             "http://clevelandwiki.org/api/")

    parser.add_argument(
        'keyword', type=str,
        help="The main keyword to look for, e.g. 'cleveland wiki'.")

    # The argparse keyword is ``nargs``.
    parser.add_argument(
        'directories', type=str, nargs='*',
        help="The directories to search.")

    parser.add_argument(
        '--prefix', type=str,
        help="The keyword page name prefix, the default is 'page:'.")

    args = parser.parse_args()

    # No credentials are passed, so ImageUploader prompts for them.
    uploader = ImageUploader(args.url)

    # Only forward the prefix when one was given so that upload() keeps
    # its own default.
    if args.prefix:
        kwargs = {'page_keyword_prefix': args.prefix}
    else:
        kwargs = {}

    uploader.upload(args.keyword, *args.directories, **kwargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment