Last active
April 21, 2021 06:24
-
-
Save xiconet/fd1ae39b26b0698868531487e7e2aa6a to your computer and use it in GitHub Desktop.
pycurl quickstart
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!C:/Python36/python | |
# | |
# from http://pycurl.io/docs/latest/quickstart.html#retrieving-a-network-resource | |
from io import BytesIO | |
from urllib.parse import urlencode | |
import posixpath | |
import re | |
import json | |
import pycurl | |
# Response headers collected by header_function(), keyed by lower-cased name.
headers = {}


def header_function(header_line):
    """Record one raw response header line in the module-level ``headers``.

    ``header_line`` is a bytes object holding a single line of the response
    head, including its trailing newline. It is decoded as iso-8859-1 (the
    encoding the HTTP standard specifies for headers; the decode step is
    required on Python 3).

    Lines without a colon -- such as the initial ``HTTP/1.x ...`` status
    line -- are ignored. Headers folded over several lines are not
    reassembled, and a repeated header name overwrites the earlier value.
    """
    text = header_line.decode('iso-8859-1')

    # Split on the first colon only; the value may itself contain colons.
    key, colon, val = text.partition(':')
    if not colon:
        return

    # Header names are case insensitive, so store them lower-cased, with the
    # whitespace around the colon and the trailing newline stripped.
    headers[key.strip().lower()] = val.strip()
def download(uri, outfile=None):
    """Download ``uri`` into a local file.

    Response headers are passed to ``header_function`` while the transfer
    runs.

    Args:
        uri: URL of the resource to retrieve.
        outfile: Path of the file to write. When omitted or empty, the last
            segment of the URL *path* is used (query string and fragment are
            not part of the name); if that segment is empty too, the file is
            named ``pcurl_out.tmp``.

    Raises:
        OSError: if ``outfile`` cannot be opened for writing.
        pycurl.error: presumably raised by ``perform()`` when the transfer
            fails -- see the PycURL documentation.
    """
    if not outfile:
        # Imported locally so the module-level import block stays untouched.
        from urllib.parse import urlsplit

        # Use only the path component: "a.txt?x=1" is not a usable file name
        # on every platform.
        outfile = posixpath.basename(urlsplit(uri).path) or 'pcurl_out.tmp'

    with open(outfile, 'wb') as f:
        c = pycurl.Curl()
        try:
            c.setopt(c.URL, uri)
            c.setopt(c.WRITEDATA, f)
            c.setopt(c.HEADERFUNCTION, header_function)
            c.perform()
        finally:
            # Release the curl handle even when the transfer raises.
            c.close()
def upload(url, file=None):
    """POST a file to ``url`` as the multipart form field ``fileupload``.

    Args:
        url: URL that receives the multipart/form-data POST.
        file: Path of the file to upload. When omitted, this script itself
            is uploaded under the name ``helloworld.py`` with content type
            ``application/x-python``. When given, the file is sent under its
            own base name, with a content type guessed from that name
            (``application/octet-stream`` if it cannot be guessed).

    Raises:
        pycurl.error: presumably raised by ``perform()`` when the transfer
            fails -- see the PycURL documentation.
    """
    if file:
        # Imported locally so the module-level import block stays untouched.
        import mimetypes
        import os

        path = file
        upload_name = os.path.basename(file)
        content_type = mimetypes.guess_type(file)[0] or 'application/octet-stream'
    else:
        path = __file__
        upload_name = 'helloworld.py'
        content_type = 'application/x-python'

    c = pycurl.Curl()
    try:
        c.setopt(c.URL, url)
        c.setopt(c.HTTPPOST, [
            ('fileupload', (
                # upload the contents of this file
                c.FORM_FILE, path,
                # file name reported to the server
                c.FORM_FILENAME, upload_name,
                # content type reported to the server
                c.FORM_CONTENTTYPE, content_type,
            )),
        ])
        c.perform()
    finally:
        # Release the curl handle even when the transfer raises.
        c.close()
def request(url, data=None):
    """Fetch ``url`` and return the response body as bytes.

    Response headers are passed to ``header_function`` while the transfer
    runs. The HTTP status code and the total transfer time are printed to
    standard output.

    Args:
        url: URL to request.
        data: Optional form fields (anything ``urlencode`` accepts, e.g.
            ``{'field': 'value'}``). When non-empty it is urlencoded and sent
            as the request body, which makes the request a POST with
            Content-Type ``application/x-www-form-urlencoded``.

    Returns:
        The response body as a byte string.

    Raises:
        pycurl.error: presumably raised by ``perform()`` when the transfer
            fails -- see the PycURL documentation.
    """
    buffer = BytesIO()
    c = pycurl.Curl()
    try:
        c.setopt(c.URL, url)
        c.setopt(c.WRITEDATA, buffer)
        if data:
            # Form data must be provided already urlencoded.
            c.setopt(c.POSTFIELDS, urlencode(data))
        c.setopt(c.HEADERFUNCTION, header_function)
        c.perform()
        # HTTP response code, e.g. 200.
        print('Status code: %d' % c.getinfo(c.RESPONSE_CODE))
        # Elapsed time for the transfer.
        print('Transfer time: %f' % c.getinfo(c.TOTAL_TIME))
    finally:
        # Release the curl handle even when the transfer raises.
        c.close()
    return buffer.getvalue()
def parse_form_data(pairs):
    """Build a form dict from ``key=value`` command-line items.

    Each item is split on its first ``=`` so that values may themselves
    contain ``=``; items without any ``=`` are ignored. ``pairs`` may be
    None or empty, in which case an empty dict is returned.
    """
    form = {}
    for item in pairs or ():
        key, sep, value = item.partition('=')
        if sep:
            form[key] = value
    return form


def parse_content_type(value):
    """Split a Content-Type header value into ``(media_type, charset)``.

    Both parts are returned lower-cased; ``charset`` is None when the header
    does not declare one. Quotes around the charset value are dropped.
    """
    media_type, _, params = value.partition(';')
    found = re.search(r'charset\s*=\s*"?([^\s;"]+)', params, re.IGNORECASE)
    charset = found.group(1).lower() if found else None
    return media_type.strip().lower(), charset


if __name__ == '__main__':
    import sys
    from argparse import ArgumentParser

    DEF_URL = 'http://httpbin.org/ip'
    DEF_UPLOAD_URL = 'http://pycurl.io/tests/testfileupload.php'

    parser = ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true', help='switch verbose/debug mode on')
    parser.add_argument('url', nargs='?', default=DEF_URL, help='target url')
    parser.add_argument('-d', '--data', nargs='*', help='post form data')
    parser.add_argument('-o', '--outfile', nargs='?', const='default', help='download to OUTFILE')
    parser.add_argument('-u', '--upload', nargs='?', metavar="file", const='default', help='upload FILE')
    args = parser.parse_args()

    uri = args.url

    # Download mode: "-o" alone lets download() derive the file name.
    if args.outfile:
        if args.outfile == "default":
            download(uri)
        else:
            download(uri, args.outfile)
        print(headers)
        sys.exit()

    # Upload mode: "-u" alone uploads the default file; when no URL was
    # given, the upload test endpoint replaces the default URL.
    if args.upload:
        if uri == DEF_URL:
            uri = DEF_UPLOAD_URL
        if args.upload == 'default':
            upload(uri)
        else:
            upload(uri, args.upload)
        sys.exit()

    # Plain request mode (a POST when form data was supplied).
    form_data = parse_form_data(args.data)
    if args.verbose:
        print('form data:', form_data)

    res = request(uri, data=form_data)

    encoding = None
    is_json = False
    if 'content-type' in headers:
        # Compare the bare media type so that parameters such as
        # "; charset=utf-8" do not defeat JSON detection.
        media_type, encoding = parse_content_type(headers['content-type'])
        is_json = media_type == 'application/json'
        if encoding:
            print('Decoding using %s' % encoding)
    if encoding is None:
        # JSON is UTF-8 unless declared otherwise; for everything else keep
        # iso-8859-1, the default encoding for HTML.
        encoding = 'utf-8' if is_json else 'iso-8859-1'
        print('Assuming encoding is %s' % encoding)

    if res:
        if args.verbose:
            print("response headers:", headers)
            print("-"*40)
        try:
            text = res.decode(encoding)
        except (LookupError, UnicodeDecodeError):
            # Unknown or wrong charset: iso-8859-1 maps every byte, so this
            # fallback cannot fail.
            text = res.decode('iso-8859-1')
        print(text)
        if is_json:
            try:
                js = json.loads(text)
            except ValueError as err:
                print('Invalid JSON body: %s' % err)
            else:
                if isinstance(js, dict):
                    for k, v in js.items():
                        print("{}: {}".format(k, v))
                else:
                    # Arrays and scalars have no key/value pairs to list.
                    print(js)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
See the PycURL quickstart documentation at http://pycurl.io/docs/latest/quickstart.html