Skip to content

Instantly share code, notes, and snippets.

@muhammedfurkan
Last active April 30, 2022 12:08
Show Gist options
  • Save muhammedfurkan/67e57b65c838047cacfa37ec0dc36ed9 to your computer and use it in GitHub Desktop.
Save muhammedfurkan/67e57b65c838047cacfa37ec0dc36ed9 to your computer and use it in GitHub Desktop.
StreamTapeAPI: a fully asynchronous Streamtape API client for Python
  • Install the required libraries:

    pip3 install aiohttp pyonize

  • Streamtape API Key / API Login required.

Example Usage:

from pyonize import pyonize


async def main():  # sourcery skip: use-next
    """Look up a folder by name and queue a remote upload into it.

    Returns:
        The decoded JSON response of the remote-upload request.
    """
    login = ''  # Streamtape API login
    key = ''    # Streamtape API key / password

    streamtape = StreamTapeAPI(login, key)

    # The client stores the credentials, so the API methods do not take them
    # again; passing them would raise a TypeError.
    listing = await streamtape.folder_content()
    # NOTE(review): pyonize presumably wraps the JSON dict so that its keys
    # can be read as attributes -- confirm against the pyonize docs.
    folders = pyonize(listing).result.folders
    my_upload_folder_id = ""
    for entry in folders:
        if entry.name == 'my_folder_name':
            my_upload_folder_id = entry.id
            break
    url = 'https://t-link.herokuapp.com/dl/5/video.mp4'
    # Positional order is (file_url, folder_id, name).
    remoteupload = await streamtape.add_remote_upload(
        url, my_upload_folder_id, 'video.mp4')
    return remoteupload


if __name__ == '__main__':
    # Imported here so the example snippet is runnable on its own.
    import asyncio

    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards, replacing the deprecated
    # get_event_loop() / run_until_complete() pairing.
    asyncio.run(main())
import asyncio
import hashlib
import aiohttp
from pyonize import pyonize
class StreamTapeAPI():
    """Asynchronous client for the Streamtape HTTP API.

    Every public coroutine issues one request (``upload`` issues two) and
    returns the decoded JSON body of the response as-is; no status checking
    is performed on the payload.
    """

    BASE_URL = 'https://api.streamtape.com'

    def __init__(self, login, key):
        """Store the API credentials.

        Args:
            login: API-Login, required.
            key: API-Key / API-Password, required.

        Raises:
            ValueError: If ``login`` or ``key`` is ``None``.
        """
        # Validate before assigning so a rejected instance holds no state.
        if login is None or key is None:
            raise ValueError('No login or key provided')
        self.login = login
        self.key = key

    def _query_params(self, extra=None, auth=True):
        """Build the query parameters for one request.

        Args:
            extra: Endpoint-specific parameters; entries whose value is
                ``None`` are dropped so optional parameters can be omitted.
            auth: Whether to include the stored ``login`` and ``key``.

        Returns:
            A ``dict`` mapping parameter names to string values.
        """
        params = {}
        if auth:
            params['login'] = str(self.login)
            params['key'] = str(self.key)
        for name, value in (extra or {}).items():
            if value is not None:
                params[name] = str(value)
        return params

    async def _get(self, endpoint, extra=None, auth=True):
        """GET ``BASE_URL/endpoint`` and return the decoded JSON body.

        The parameters are handed to aiohttp through ``params=`` so that
        values are percent-encoded instead of being spliced raw into the URL.
        """
        url = f'{self.BASE_URL}/{endpoint}'
        params = self._query_params(extra, auth)
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as resp:
                return await resp.json()

    # Account
    async def account_info(self):
        """Return the response of ``account/info``."""
        return await self._get('account/info')

    # Stream
    async def get_download_ticket(self, file_id):
        """Request a download ticket for ``file_id`` (``file/dlticket``)."""
        return await self._get('file/dlticket', {'file': file_id})

    async def get_download_link(self, file_id, download_ticket):
        """Exchange a download ticket for a download link (``file/dl``).

        This endpoint is called without the login/key credentials.
        """
        return await self._get(
            'file/dl',
            {'file': file_id, 'ticket': download_ticket},
            auth=False,
        )

    async def check_file(self, file_id):
        """Return the response of ``file/info`` for ``file_id``."""
        return await self._get('file/info', {'file': file_id})

    # Upload
    async def upload(self, folder_id, file_path):
        """Upload a local file.

        First requests an upload URL from ``file/ul`` (sending the file's
        SHA-256 digest), then POSTs the file to that URL.

        Args:
            folder_id: Destination folder id; ``None`` omits the parameter.
            file_path: Path of the local file to upload.

        Returns:
            The decoded JSON response of the upload POST request.

        Raises:
            OSError: If ``file_path`` cannot be opened.
            KeyError, TypeError: If the ``file/ul`` response carries no
                ``result.url`` entry.
        """
        digest = hashlib.sha256()
        with open(file_path, 'rb') as source:
            for chunk in iter(lambda: source.read(65536), b''):
                digest.update(chunk)

        ticket = await self._get(
            'file/ul',
            {'folder': folder_id, 'sha256': digest.hexdigest()},
        )
        upload_url = ticket['result']['url']

        async with aiohttp.ClientSession() as session:
            # The context manager guarantees the handle is released even when
            # the request fails before the body is sent.
            with open(file_path, 'rb') as payload:
                async with session.post(upload_url,
                                        data={'file': payload}) as resp:
                    return await resp.json()

    # Remote upload
    async def add_remote_upload(self, file_url, folder_id=None, name=None):
        """Queue a remote upload of ``file_url`` (``remotedl/add``).

        Args:
            file_url: URL the service should download from.
            folder_id: Destination folder id; omitted when ``None``.
            name: Name for the uploaded file; omitted when ``None``.
        """
        return await self._get(
            'remotedl/add',
            {'url': file_url, 'folder': folder_id, 'name': name},
        )

    async def remove_remote_upload(self, upload_id):
        """Remove a remote upload (``remotedl/remove``).

        To remove all the uploads, pass ``'all'`` instead of an actual
        upload ID.
        """
        return await self._get('remotedl/remove', {'id': upload_id})

    async def check_remote_upload_status(self, upload_id):
        """Return the response of ``remotedl/status`` for ``upload_id``."""
        return await self._get('remotedl/status', {'id': upload_id})

    # File / folder management
    async def folder_content(self):
        """List the content of the default folder (``file/listfolder``)."""
        return await self._get('file/listfolder')

    async def subfolder_content(self, folder_id):
        """List the content of ``folder_id`` (``file/listfolder``)."""
        return await self._get('file/listfolder', {'folder': folder_id})

    async def create_folder(self, folder_name, parent_folder_id=None):
        """Create a folder (``file/createfolder``).

        Args:
            folder_name: Name of the new folder.
            parent_folder_id: Sent as ``pid``; omitted when ``None``.
        """
        return await self._get(
            'file/createfolder',
            {'name': folder_name, 'pid': parent_folder_id},
        )

    async def rename_folder(self, folder_id, new_name):
        """Rename ``folder_id`` to ``new_name`` (``file/renamefolder``)."""
        return await self._get(
            'file/renamefolder',
            {'folder': folder_id, 'name': new_name},
        )

    async def delete_folder(self, folder_id):
        """Delete ``folder_id`` (``file/deletefolder``)."""
        return await self._get('file/deletefolder', {'folder': folder_id})

    async def rename_file(self, file_id, new_name):
        """Rename ``file_id`` to ``new_name`` (``file/rename``)."""
        return await self._get(
            'file/rename',
            {'file': file_id, 'name': new_name},
        )

    async def move_file(self, file_id, destination_folder_id):
        """Move ``file_id`` into ``destination_folder_id`` (``file/move``)."""
        return await self._get(
            'file/move',
            {'file': file_id, 'folder': destination_folder_id},
        )

    async def delete_file(self, file_id):
        """Delete ``file_id`` (``file/delete``)."""
        return await self._get('file/delete', {'file': file_id})

    # Converting files
    async def running_conversions(self):
        """Return the response of ``file/runningconverts``."""
        return await self._get('file/runningconverts')

    async def failed_conversions(self):
        """Return the response of ``file/failedconverts``."""
        return await self._get('file/failedconverts')

    async def get_thumbnail(self, file_id):
        """Return the response of ``file/getsplash`` for ``file_id``."""
        return await self._get('file/getsplash', {'file': file_id})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment