Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save spyderman4g63/dcd364c354d3f0ff722877336dbb70b7 to your computer and use it in GitHub Desktop.
Save spyderman4g63/dcd364c354d3f0ff722877336dbb70b7 to your computer and use it in GitHub Desktop.
OpenAI Sora 2 Video Generation API Example
import os
from openai import OpenAI
from dotenv import load_dotenv
import time
import requests
# Load environment variables (python-dotenv's load_dotenv(); presumably reads a
# local .env file so OPENAI_API_KEY is visible to os.getenv below).
load_dotenv()
# Initialize OpenAI client from the OPENAI_API_KEY environment variable.
# NOTE(review): none of the functions visible in this file reference `client`;
# they call the REST endpoints directly through `requests`.
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
def create_video_job(prompt, size="720x1280", seconds=4, model="sora-2", timeout=30):
    """
    Create a video generation job using OpenAI's Sora model.

    Sends a POST to the /v1/videos endpoint, authenticated with the
    OPENAI_API_KEY environment variable, and prints progress to stdout.

    Args:
        prompt (str): The text description of the video to generate
        size (str): Video resolution. Supported values:
            '720x1280' (portrait 9:16), '1280x720' (landscape 16:9),
            '1024x1792' (tall portrait), '1792x1024' (wide landscape)
            (default: "720x1280")
        seconds (int): Duration of the video in seconds (default: 4)
        model (str): Model to use (default: "sora-2")
        timeout (float): Seconds to wait for the HTTP request before
            giving up (default: 30)

    Returns:
        dict: The decoded JSON response; the code reads its 'id' and
            'status' keys.

    Raises:
        requests.exceptions.HTTPError: If the API answers with an error status.
        Exception: Any other error raised while sending the request or
            decoding the response is printed and re-raised unchanged.
    """
    api_url = "https://api.openai.com/v1/videos"
    headers = {
        "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "prompt": prompt,
    }

    # Optional parameters are omitted when falsy (None, "", 0) so that the
    # request carries only values the caller actually supplied.
    if size:
        payload["size"] = size
    if seconds:
        # The duration is serialized as a string in the request body.
        payload["seconds"] = str(seconds)

    print("Creating video generation job...")
    print(f"Prompt: {prompt}")
    print(f"Resolution: {size}, Duration: {seconds}s")

    try:
        # A timeout keeps the call from blocking forever on a stalled connection.
        response = requests.post(api_url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        job_data = response.json()
        print("✓ Job created successfully!")
        print(f"Job ID: {job_data.get('id')}")
        print(f"Status: {job_data.get('status')}")
        return job_data
    except requests.exceptions.HTTPError as e:
        print(f"❌ HTTP Error: {e}")
        # e.response always exists as an attribute but may be None.
        body = e.response.text if e.response is not None else 'No response'
        print(f"Response: {body}")
        raise
    except Exception as e:
        print(f"❌ Error creating video job: {str(e)}")
        raise
def poll_job_status(video_id, poll_interval=5, max_wait=300, request_timeout=30):
    """
    Poll the job status until it's completed or failed.

    Repeatedly GETs /v1/videos/{video_id} and prints a one-line progress
    indicator to stdout.

    Args:
        video_id (str): The video ID to poll
        poll_interval (int): Seconds between each poll (default: 5)
        max_wait (int): Maximum seconds to wait (default: 300)
        request_timeout (float): Seconds to wait for each individual HTTP
            request (default: 30)

    Returns:
        dict: Final job data when the status is "completed" or "succeeded"

    Raises:
        TimeoutError: If the job has not reached a terminal status after
            max_wait seconds (checked once per poll, for any non-terminal
            status including unrecognized ones).
        Exception: If the job reports status "failed" or "error".
        requests.exceptions.HTTPError: If a status request returns an
            error status code.
    """
    api_url = f"https://api.openai.com/v1/videos/{video_id}"
    headers = {
        "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}"
    }
    print(f"\nPolling job status (checking every {poll_interval}s)...")
    start_time = time.time()

    while True:
        # Only the network call is guarded, so the failure/timeout exceptions
        # raised below are not mistaken for polling errors.
        try:
            response = requests.get(api_url, headers=headers, timeout=request_timeout)
            response.raise_for_status()
            job_data = response.json()
        except requests.exceptions.HTTPError as e:
            print(f"\n❌ HTTP Error: {e}")
            # e.response always exists as an attribute but may be None.
            body = e.response.text if e.response is not None else 'No response'
            print(f"Response: {body}")
            raise
        except Exception as e:
            print(f"\n❌ Error polling job status: {str(e)}")
            raise

        status = job_data.get('status')
        progress = job_data.get('progress', 0)
        elapsed_time = time.time() - start_time
        print(f"Status: {status} | Progress: {progress}% | Elapsed: {int(elapsed_time)}s", end='\r')

        if status in ("completed", "succeeded"):
            print("\n✓ Video generation completed!")
            return job_data

        if status in ("failed", "error"):
            print("\n❌ Video generation failed!")
            error_msg = job_data.get('error', 'Unknown error')
            print(f"Error: {error_msg}")
            raise Exception(f"Video generation failed: {error_msg}")

        # Enforce the deadline for every non-terminal status, including
        # unrecognized ones, so the loop can never spin forever.
        if elapsed_time > max_wait:
            raise TimeoutError(f"Job did not complete within {max_wait} seconds")

        if status not in ("queued", "processing", "running", "in_progress"):
            print(f"\n⚠️ Unknown status: {status}")
            time.sleep(poll_interval)
        elif progress == 100 and status == "in_progress":
            # Progress is 100% but the status has not flipped yet: check more frequently.
            print("\n⏳ Progress at 100%, waiting for final status change...", end='\r')
            time.sleep(2)
        else:
            time.sleep(poll_interval)
def download_video_from_url(video_url, output_path="output.mp4", timeout=60):
    """
    Download video from a direct URL.

    Streams the response body to disk in 8 KiB chunks and prints a
    percentage indicator when the server sends a Content-Length header.
    No Authorization header is sent.

    Args:
        video_url (str): Direct URL to the video file
        output_path (str): Path where to save the video
        timeout (float): Seconds to wait for the connection and for each
            read from the socket (default: 60)

    Returns:
        str: Path to the downloaded video file

    Raises:
        requests.exceptions.HTTPError: If the server returns an error status.
        Exception: Any other network or file error is printed and re-raised.
    """
    print(f"\nDownloading video from URL to {output_path}...")
    try:
        # The context manager closes the streamed response (and releases its
        # connection) even if writing the file fails part-way through.
        with requests.get(video_url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    downloaded += len(chunk)
                    # Without a Content-Length there is nothing to compare against.
                    if total_size > 0:
                        percent = (downloaded / total_size) * 100
                        print(f"Progress: {percent:.1f}%", end='\r')
        print(f"\n✓ Video saved successfully to {output_path}")
        return output_path
    except requests.exceptions.HTTPError as e:
        print(f"\n❌ HTTP Error: {e}")
        raise
    except Exception as e:
        print(f"\n❌ Error downloading video: {str(e)}")
        raise
def download_video(video_id, output_path="output.mp4", timeout=60):
    """
    Download the generated video to a local file.

    Streams /v1/videos/{video_id}/content to disk in 8 KiB chunks,
    authenticated with the OPENAI_API_KEY environment variable, and prints
    a percentage indicator when the server sends a Content-Length header.

    Args:
        video_id (str): The video ID from the completed job
        output_path (str): Path where to save the video
        timeout (float): Seconds to wait for the connection and for each
            read from the socket (default: 60)

    Returns:
        str: Path to the downloaded video file

    Raises:
        requests.exceptions.HTTPError: If the API returns an error status.
        Exception: Any other network or file error is printed and re-raised.
    """
    # Use the /content endpoint to get the actual video file
    api_url = f"https://api.openai.com/v1/videos/{video_id}/content"
    headers = {
        "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}"
    }
    print(f"\nDownloading video to {output_path}...")
    try:
        # The context manager closes the streamed response (and releases its
        # connection) even if writing the file fails part-way through.
        with requests.get(api_url, headers=headers, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    downloaded += len(chunk)
                    # Without a Content-Length there is nothing to compare against.
                    if total_size > 0:
                        percent = (downloaded / total_size) * 100
                        print(f"Progress: {percent:.1f}%", end='\r')
        print(f"\n✓ Video saved successfully to {output_path}")
        return output_path
    except requests.exceptions.HTTPError as e:
        print(f"\n❌ HTTP Error: {e}")
        # e.response always exists as an attribute but may be None.
        body = e.response.text if e.response is not None else 'No response'
        print(f"Response: {body}")
        raise
    except Exception as e:
        print(f"\n❌ Error downloading video: {str(e)}")
        raise
def generate_video(prompt, size="720x1280", seconds=8, model="sora-2", output_path=None, poll_interval=5, max_wait=1000, post_completion_delay=10):
    """
    Complete workflow to generate and download a video.

    Creates the job, polls it until it finishes, optionally pauses, then
    downloads the result either from a URL found in the job data or from
    the video content endpoint.

    Args:
        prompt (str): The text description of the video to generate
        size (str): Video resolution. Supported: '720x1280' (9:16), '1280x720' (16:9), '1024x1792', '1792x1024'
        seconds (int): Duration of the video in seconds, supported: 4, 8, 12 (default: 8)
        model (str): Model to use, supported: sora-2, sora-2-pro (default: "sora-2")
        output_path (str): Path where to save the video (auto-generated
            as sora_video_<timestamp>.mp4 if None or empty)
        poll_interval (int): Seconds between status checks (default: 5)
        max_wait (int): Maximum seconds to wait for completion (default: 1000)
        post_completion_delay (int): Seconds to wait after completion before
            downloading; None or a non-positive value skips the wait (default: 10)

    Returns:
        dict: Contains job_data, video_id, video_path, and prompt

    Raises:
        Exception: If the create response carries no video ID, plus anything
            raised by the create, poll, or download steps.
    """
    # Local import: json is not among the imports at the top of this file.
    import json

    # Step 1: Create the job
    job_data = create_video_job(prompt, size, seconds, model)
    video_id = job_data.get('id')
    if not video_id:
        raise Exception("No video ID returned from API")
    print(f"Video ID: {video_id}")

    # Step 2: Poll until completed
    completed_job = poll_job_status(video_id, poll_interval, max_wait)

    # Print the completed job data to see what's available
    print("\n📋 Completed job data:")
    print(json.dumps(completed_job, indent=2))

    # Step 2.5: Wait for video file to be fully available at download endpoint
    if post_completion_delay and post_completion_delay > 0:
        print(f"\n⏳ Waiting {post_completion_delay} seconds for video file to be ready...")
        time.sleep(post_completion_delay)

    # Step 3: Download the video
    if not output_path:
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        output_path = f"sora_video_{timestamp}.mp4"

    # Prefer a direct URL from the response. A key that is present but
    # empty/null is treated as absent so the content endpoint is still tried.
    direct_url = completed_job.get('url')
    download_url = completed_job.get('download_url')
    if direct_url:
        print(f"\n✓ Found video URL in response: {direct_url}")
        video_path = download_video_from_url(direct_url, output_path)
    elif download_url:
        print(f"\n✓ Found download URL in response: {download_url}")
        video_path = download_video_from_url(download_url, output_path)
    else:
        # Try the download endpoint
        video_path = download_video(video_id, output_path)

    return {
        'job_data': completed_job,
        'video_id': video_id,
        'video_path': video_path,
        'prompt': prompt,
    }
def main():
    """
    Run a single demo generation with Sora-2 and print a summary.

    Any exception raised by the workflow is reported on stdout together
    with its traceback instead of propagating to the caller.
    """
    banner = "=" * 70

    # Demo request settings.
    demo_prompt = "A golden retriever surfing a teal wave at sunrise, cinematic, slow camera dolly."
    # Resolution options: '720x1280' (portrait 9:16), '1280x720' (landscape 16:9),
    # '1024x1792' (tall portrait), '1792x1024' (wide landscape)
    resolution = "720x1280"
    # Duration options: 4, 8, 12
    duration = 4
    # Model options: sora-2, sora-2-pro
    model_name = "sora-2"

    print(banner)
    print("OpenAI Sora-2 Video Generation")
    print(banner)

    try:
        # Run the full create / poll / download workflow.
        outcome = generate_video(
            prompt=demo_prompt,
            size=resolution,
            seconds=duration,
            model=model_name,
        )
        print("\n" + banner)
        print("=== Video Generation Complete ===")
        print(banner)
        print(f"Prompt: {outcome['prompt']}")
        print(f"Video ID: {outcome['video_id']}")
        print(f"Video saved to: {outcome['video_path']}")
        print(banner)
    except Exception as e:
        print(f"\n❌ Failed to generate video: {str(e)}")
        import traceback
        traceback.print_exc()


# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment