Skip to content

Instantly share code, notes, and snippets.

@sounishnath003
Last active February 19, 2025 12:34
Show Gist options
  • Save sounishnath003/e5210b84b62fcf1d00c5b9dde66b4c94 to your computer and use it in GitHub Desktop.
Save sounishnath003/e5210b84b62fcf1d00c5b9dde66b4c94 to your computer and use it in GitHub Desktop.
Easy and Simple way to generate the Access token on behalf of GSA using golang
// Description: Google OAuth2.0 Authentication
// Test the Google OAuth2.0 Authentication
// Get the Access y29 OAuth token
// Documentation: https://developers.google.com/identity/protocols/oauth2/service-account#:~:text=Preparing%20to%20make%20a%20delegated%20API%20call
package main
/**
=======================================================
===== Google OAuth2.0 Authentication =====
==== Test the Google OAuth2.0 Authentication ====
=======================================================
# Get the Access y29 OAuth token
curl -d 'grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&assertion=<JWT_TOKEN>' https://oauth2.googleapis.com/token
# Validate the ID Token on the Google OAuth2.0 server
curl "https://oauth2.googleapis.com/tokeninfo?id_token=<ID_TOKEN>"
# Validate the Access Token on the Google OAuth2.0 server
curl "https://oauth2.googleapis.com/tokeninfo?access_token=<ACCESS_TOKEN>"
# Get the list of buckets
curl -X GET -H "Authorization: Bearer <ACCESS_TOKEN>" \
"https://storage.googleapis.com/storage/v1/b?project=sounish-cloud-workstation"
=======================================================
**/
import (
"bytes"
"context"
"crypto/rsa"
"crypto/x509"
"encoding/json"
"encoding/pem"
"fmt"
"io"
"log"
"net/http"
"os"
"time"
"github.com/golang-jwt/jwt/v5"
)
// GoogleServiceAccount holds service account information.
type GoogleServiceAccount struct {
Type string `json:"type"`
AuthURI string `json:"auth_uri"`
PrivateKey string `json:"private_key"`
ClientEmail string `json:"client_email"`
TokenURI string `json:"token_uri"`
ProjectID string `json:"project_id"`
}
// LoadGoogleServiceAccount loads the service account JSON file.
func LoadGoogleServiceAccount(filename string) (*GoogleServiceAccount, error) {
data, err := os.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
var serviceAccount GoogleServiceAccount
if err := json.Unmarshal(data, &serviceAccount); err != nil {
return nil, fmt.Errorf("failed to parse service account JSON: %w", err)
}
log.Println("Google Service Account loaded successfully")
return &serviceAccount, nil
}
// CreateJWT creates a signed JWT for the service account.
func (sa *GoogleServiceAccount) CreateJWT(audience string) (string, error) {
block, _ := pem.Decode([]byte(sa.PrivateKey))
if block == nil {
return "", fmt.Errorf("failed to parse PEM block containing the private key")
}
privateKey, err := x509.ParsePKCS8PrivateKey(block.Bytes)
if err != nil {
return "", fmt.Errorf("failed to parse private key: %w", err)
}
claims := jwt.MapClaims{
"iss": sa.ClientEmail,
"sub": sa.ClientEmail,
"aud": sa.TokenURI,
"exp": time.Now().Add(time.Hour).Unix(),
"iat": time.Now().Unix(),
// When generating the AccessToken Use Scope as it will be within the GCP services
"scope": "https://www.googleapis.com/auth/cloud-platform",
// When you are interacting with Deployed Services on GCP using SA Authentication use target_audience
"target_audience": audience, // Due to this, the ID token will be generated.
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
signedToken, err := token.SignedString(privateKey.(*rsa.PrivateKey))
if err != nil {
return "", fmt.Errorf("failed to sign JWT: %w", err)
}
return signedToken, nil
}
// GetIdentityToken exchanges the JWT for an identity token.
func (sa *GoogleServiceAccount) GetIdentityToken(audience string) (string, error) {
jwtToken, err := sa.CreateJWT(audience)
if err != nil {
return "", fmt.Errorf("failed to create JWT: %w", err)
}
body := map[string]string{
"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
"assertion": jwtToken,
}
bodyBytes, _ := json.Marshal(body)
req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, sa.TokenURI, bytes.NewReader(bodyBytes))
if err != nil {
return "", fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("failed to send HTTP request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
respBody, _ := io.ReadAll(resp.Body)
return "", fmt.Errorf("failed to get identity token: %s", string(respBody))
}
var result struct {
IDToken string `json:"id_token"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return "", fmt.Errorf("failed to parse response: %w", err)
}
log.Println("Identity token retrieved successfully")
return result.IDToken, nil
}
// CallCloudRun calls the Cloud Run service using the identity token.
func CallCloudRun(url, idToken string) error {
req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, url, bytes.NewReader([]byte("{}")))
if err != nil {
return fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+idToken)
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to send HTTP request: %w", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
log.Printf("Cloud Run Response: %d\n%s", resp.StatusCode, string(body))
return nil
}
func main() {
filename := "/Users/sounishnath/sounish-cloud-workstation-ac143dfffa26.json"
cloudRunURL := "https://hello-797087556919.asia-south1.run.app"
// Load service account
sa, err := LoadGoogleServiceAccount(filename)
if err != nil {
log.Fatalf("Error loading service account: %v", err)
}
// Get identity token
idToken, err := sa.GetIdentityToken(cloudRunURL)
if err != nil {
log.Fatalf("Error getting identity token: %v", err)
}
log.Println("Identity Token: ", idToken)
}
@sounishnath003
Copy link
Author

sounishnath003 commented Jan 25, 2025

Validate the Access Token Output using googleapis.com:

image

Get the cloud storage buckets using the Access Token:

image

@sounishnath003
Copy link
Author

Hope it helps ๐Ÿ‘ ๐Ÿ˜„ !

@sounishnath003
Copy link
Author

Generating Access Token for Google Service Account

Step-by-Step Guide

1. Create a Google Service Account:

  1. Go to the Google Cloud Console.
  2. Create a new project or select an existing one.
  3. Navigate to "IAM & Admin" > "Service Accounts".
  4. Create a new service account and download the JSON key file.

2. Install Required Libraries:

Depending on your programming language, install the necessary libraries to handle JWT and HTTP requests.

3. Load the Service Account JSON:

Read the JSON file containing the service account credentials.

4. Create a JWT Token:

  1. Use the service account's private key to sign a JWT token.
  2. Include the required claims such as iss, scope, aud, exp, and iat.

5. Request an Access Token:

  1. Make an HTTP POST request to https://oauth2.googleapis.com/token with the JWT token.
  2. Include the grant_type and assertion parameters in the request body.

6. Validate the Access Token:

Optionally, validate the access token by making a GET request to https://oauth2.googleapis.com/tokeninfo?access_token=<ACCESS_TOKEN>.

7. Use the Access Token:

Use the access token to authenticate API requests to Google Cloud services.

Example commands:

# Get the Access Token
curl -d 'grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&assertion=<JWT_TOKEN>' https://oauth2.googleapis.com/token

# Validate the Access Token
curl "https://oauth2.googleapis.com/tokeninfo?access_token=<ACCESS_TOKEN>"

# Get the list of buckets
curl -X GET -H "Authorization: Bearer <ACCESS_TOKEN>" \
  "https://storage.googleapis.com/storage/v1/b?project=<PROJECT_ID>"

@sounishnath003
Copy link
Author

sounishnath003 commented Feb 19, 2025

Perform The IDENTITY TOKEN generation using Python 3.11

"""
@author: github.com/sounishnath003
"""

import json
import jwt
from datetime import datetime, timedelta
import requests

from dataclasses import dataclass

def LOG(*data):
    print("[LOG]:", *data)

@dataclass
class GoogleServiceAccount:
    type: str
    project_id: str
    private_key_id: str
    private_key: str
    client_email: str
    client_id: str
    auth_uri: str
    token_uri: str
    auth_provider_x509_cert_url: str
    client_x509_cert_url: str
    universe_domain: str


def read_google_service_account(filepath) -> GoogleServiceAccount:
    obj=json.loads(open(filepath, "rb").read())
    return GoogleServiceAccount(**obj)
    

def create_jwt_token(google_service_account:GoogleServiceAccount, target_audience:str):
    current_time = datetime.now()
    claims = {
        "iss": google_service_account.client_email,
        "sub": google_service_account.client_email,
        "aud": google_service_account.token_uri,
        "exp":  datetime.timestamp(current_time + timedelta(hours=1)),
        "iat": datetime.timestamp(current_time),
        "target_audience": target_audience, # use to generate the IDENTITY token
        # "scope": "https://www.googleapis.com/auth/cloud-platform", # use to generate the ACCESS token
    }
    LOG(claims)

    jwt_token=jwt.encode(claims, key=google_service_account.private_key, algorithm=jwt.get_algorithm_by_name("RS256"), headers={"alg": "RS256", "typ": "jwt"})

    return jwt_token


def generate_id_token(jwt_token:str):
    # curl -d 'grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&assertion=<JWT_TOKEN>' https://oauth2.googleapis.com/token

    body_payload={
        "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
        "assertion": jwt_token,
    }
    resp = requests.post("https://oauth2.googleapis.com/token", data=body_payload, timeout=60)
    resp.raise_for_status()

    return resp.json()

def call_google_cloud_run_service(service_url:str, authorization_token:str):
    headers = {
        "Authorization": f"Bearer {authorization_token}",
        "Content-Type": "data/html"
    }
    resp=requests.get(url=service_url, headers=headers)
    resp.raise_for_status()

    return resp.text


if __name__ == "__main__":
    GOOGLE_CLOUDRUN_DEPLOYED_SRVC_URL="https://hello-797087556919.asia-south1.run.app/"
    LOG(10 * "+==========+")
    LOG("GOOGLE_CLOUDRUN_DEPLOYED_SRVC_URL", GOOGLE_CLOUDRUN_DEPLOYED_SRVC_URL)
    LOG(10 * "+==========+")

    service_account=read_google_service_account("/Users/sounishnath/sounish-cloud-workstation-ac143dfffa26.json")
    
    token=create_jwt_token(service_account, target_audience=GOOGLE_CLOUDRUN_DEPLOYED_SRVC_URL)
    LOG(10 * "+==========+")
    LOG("jwt_token", token)
    LOG(10 * "+==========+")

    access_token=generate_id_token(token)
    LOG(10 * "+==========+")
    LOG("access_token", access_token)
    LOG(10 * "+==========+")


    LOG(10 * "+==========+")
    data=call_google_cloud_run_service(service_url=GOOGLE_CLOUDRUN_DEPLOYED_SRVC_URL, authorization_token=access_token["id_token"])
    LOG("resp_data", data)
    LOG(10 * "+==========+")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment