Last active
February 19, 2025 12:34
-
-
Save sounishnath003/e5210b84b62fcf1d00c5b9dde66b4c94 to your computer and use it in GitHub Desktop.
Easy and Simple way to generate the Access token on behalf of GSA using golang
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Description: Google OAuth 2.0 authentication with a service account.
// Tests the Google OAuth 2.0 service-account flow.
// Gets the OAuth access token (Google access tokens start with "ya29").
// Documentation: https://developers.google.com/identity/protocols/oauth2/service-account#:~:text=Preparing%20to%20make%20a%20delegated%20API%20call
package main | |
/**
=======================================================
=====      Google OAuth2.0 Authentication         =====
====   Test the Google OAuth2.0 Authentication     ====
=======================================================
# Get the access (ya29) OAuth token
curl -d 'grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&assertion=<JWT_TOKEN>' https://oauth2.googleapis.com/token
# Validate the ID token on the Google OAuth2.0 server
curl "https://oauth2.googleapis.com/tokeninfo?id_token=<ID_TOKEN>"
# Validate the access token on the Google OAuth2.0 server
curl "https://oauth2.googleapis.com/tokeninfo?access_token=<ACCESS_TOKEN>"
# Get the list of buckets
curl -X GET -H "Authorization: Bearer <ACCESS_TOKEN>" \
"https://storage.googleapis.com/storage/v1/b?project=sounish-cloud-workstation"
=======================================================
**/
import ( | |
"bytes" | |
"context" | |
"crypto/rsa" | |
"crypto/x509" | |
"encoding/json" | |
"encoding/pem" | |
"fmt" | |
"io" | |
"log" | |
"net/http" | |
"os" | |
"time" | |
"github.com/golang-jwt/jwt/v5" | |
) | |
// GoogleServiceAccount holds the subset of a Google service account JSON key
// file that this program decodes. Keys in the file that have no matching
// field here are ignored by encoding/json.
type GoogleServiceAccount struct {
	// Type is the value of the "type" key.
	Type string `json:"type"`
	// AuthURI is the value of the "auth_uri" key.
	AuthURI string `json:"auth_uri"`
	// PrivateKey is the PEM text from "private_key"; CreateJWT decodes it
	// and parses it as a PKCS#8 key to sign the assertion.
	PrivateKey string `json:"private_key"`
	// ClientEmail is used by CreateJWT as both the "iss" and "sub" claims.
	ClientEmail string `json:"client_email"`
	// TokenURI is used as the "aud" claim and as the URL that
	// GetIdentityToken posts the assertion to.
	TokenURI string `json:"token_uri"`
	// ProjectID is the value of the "project_id" key.
	ProjectID string `json:"project_id"`
}
// LoadGoogleServiceAccount loads the service account JSON file. | |
func LoadGoogleServiceAccount(filename string) (*GoogleServiceAccount, error) { | |
data, err := os.ReadFile(filename) | |
if err != nil { | |
return nil, fmt.Errorf("failed to read file: %w", err) | |
} | |
var serviceAccount GoogleServiceAccount | |
if err := json.Unmarshal(data, &serviceAccount); err != nil { | |
return nil, fmt.Errorf("failed to parse service account JSON: %w", err) | |
} | |
log.Println("Google Service Account loaded successfully") | |
return &serviceAccount, nil | |
} | |
// CreateJWT creates a signed JWT for the service account. | |
func (sa *GoogleServiceAccount) CreateJWT(audience string) (string, error) { | |
block, _ := pem.Decode([]byte(sa.PrivateKey)) | |
if block == nil { | |
return "", fmt.Errorf("failed to parse PEM block containing the private key") | |
} | |
privateKey, err := x509.ParsePKCS8PrivateKey(block.Bytes) | |
if err != nil { | |
return "", fmt.Errorf("failed to parse private key: %w", err) | |
} | |
claims := jwt.MapClaims{ | |
"iss": sa.ClientEmail, | |
"sub": sa.ClientEmail, | |
"aud": sa.TokenURI, | |
"exp": time.Now().Add(time.Hour).Unix(), | |
"iat": time.Now().Unix(), | |
// When generating the AccessToken Use Scope as it will be within the GCP services | |
"scope": "https://www.googleapis.com/auth/cloud-platform", | |
// When you are interacting with Deployed Services on GCP using SA Authentication use target_audience | |
"target_audience": audience, // Due to this, the ID token will be generated. | |
} | |
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims) | |
signedToken, err := token.SignedString(privateKey.(*rsa.PrivateKey)) | |
if err != nil { | |
return "", fmt.Errorf("failed to sign JWT: %w", err) | |
} | |
return signedToken, nil | |
} | |
// GetIdentityToken exchanges the JWT for an identity token. | |
func (sa *GoogleServiceAccount) GetIdentityToken(audience string) (string, error) { | |
jwtToken, err := sa.CreateJWT(audience) | |
if err != nil { | |
return "", fmt.Errorf("failed to create JWT: %w", err) | |
} | |
body := map[string]string{ | |
"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", | |
"assertion": jwtToken, | |
} | |
bodyBytes, _ := json.Marshal(body) | |
req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, sa.TokenURI, bytes.NewReader(bodyBytes)) | |
if err != nil { | |
return "", fmt.Errorf("failed to create HTTP request: %w", err) | |
} | |
req.Header.Set("Content-Type", "application/json") | |
client := &http.Client{} | |
resp, err := client.Do(req) | |
if err != nil { | |
return "", fmt.Errorf("failed to send HTTP request: %w", err) | |
} | |
defer resp.Body.Close() | |
if resp.StatusCode != http.StatusOK { | |
respBody, _ := io.ReadAll(resp.Body) | |
return "", fmt.Errorf("failed to get identity token: %s", string(respBody)) | |
} | |
var result struct { | |
IDToken string `json:"id_token"` | |
} | |
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { | |
return "", fmt.Errorf("failed to parse response: %w", err) | |
} | |
log.Println("Identity token retrieved successfully") | |
return result.IDToken, nil | |
} | |
// CallCloudRun POSTs an empty JSON object ("{}") to url, authenticating with
// idToken as a Bearer token, and logs the response status and body.
//
// The call is bounded by a 30 second timeout. An error is returned when the
// request cannot be built or sent, or when the response body cannot be read.
// The HTTP status code is only logged: a non-2xx response still returns nil.
func CallCloudRun(url, idToken string) error {
	const requestTimeout = 30 * time.Second

	// The deadline also covers reading the body below.
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader([]byte("{}")))
	if err != nil {
		return fmt.Errorf("failed to create HTTP request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+idToken)
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send HTTP request: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		// Do not log a truncated body as if it were the full response.
		return fmt.Errorf("failed to read response body: %w", err)
	}

	log.Printf("Cloud Run Response: %d\n%s", resp.StatusCode, string(body))
	return nil
}
func main() { | |
filename := "/Users/sounishnath/sounish-cloud-workstation-ac143dfffa26.json" | |
cloudRunURL := "https://hello-797087556919.asia-south1.run.app" | |
// Load service account | |
sa, err := LoadGoogleServiceAccount(filename) | |
if err != nil { | |
log.Fatalf("Error loading service account: %v", err) | |
} | |
// Get identity token | |
idToken, err := sa.GetIdentityToken(cloudRunURL) | |
if err != nil { | |
log.Fatalf("Error getting identity token: %v", err) | |
} | |
log.Println("Identity Token: ", idToken) | |
} |
Hope it helps!
Generating Access Token for Google Service Account
Step-by-Step Guide
1. Create a Google Service Account:
- Go to the Google Cloud Console.
- Create a new project or select an existing one.
- Navigate to "IAM & Admin" > "Service Accounts".
- Create a new service account and download the JSON key file.
2. Install Required Libraries:
Depending on your programming language, install the necessary libraries to handle JWT and HTTP requests.
3. Load the Service Account JSON:
Read the JSON file containing the service account credentials.
4. Create a JWT Token:
- Use the service account's private key to sign a JWT token.
- Include the required claims such as `iss`, `scope`, `aud`, `exp`, and `iat`.
5. Request an Access Token:
- Make an HTTP POST request to https://oauth2.googleapis.com/token with the JWT token.
- Include the `grant_type` and `assertion` parameters in the request body.
6. Validate the Access Token:
Optionally, validate the access token by making a GET request to https://oauth2.googleapis.com/tokeninfo?access_token=<ACCESS_TOKEN>.
7. Use the Access Token:
Use the access token to authenticate API requests to Google Cloud services.
Example commands:
# Get the Access Token
curl -d 'grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&assertion=<JWT_TOKEN>' https://oauth2.googleapis.com/token
# Validate the Access Token
curl "https://oauth2.googleapis.com/tokeninfo?access_token=<ACCESS_TOKEN>"
# Get the list of buckets
curl -X GET -H "Authorization: Bearer <ACCESS_TOKEN>" \
"https://storage.googleapis.com/storage/v1/b?project=<PROJECT_ID>"
Generating the identity token using Python 3.11
"""
@author: github.com/sounishnath003
"""
import json
import jwt
from datetime import datetime, timedelta
import requests
from dataclasses import dataclass
def LOG(*data):
    """Print the given values to stdout on one line, prefixed with "[LOG]:"."""
    print("[LOG]:", *data)
@dataclass
class GoogleServiceAccount:
    """Fields of a Google service account JSON key file.

    Field names match the JSON keys so a decoded key file can be passed
    straight to the constructor as keyword arguments.
    """

    type: str
    project_id: str
    private_key_id: str
    # PEM text; create_jwt_token passes it to jwt.encode as the signing key.
    private_key: str
    # Used by create_jwt_token as both the "iss" and "sub" claims.
    client_email: str
    client_id: str
    auth_uri: str
    # Used by create_jwt_token as the "aud" claim.
    token_uri: str
    auth_provider_x509_cert_url: str
    client_x509_cert_url: str
    # Key files created before this key existed omit it; default to Google's
    # standard universe so those files still load.
    universe_domain: str = "googleapis.com"
def read_google_service_account(filepath) -> GoogleServiceAccount:
    """Load a service account JSON key file into a GoogleServiceAccount.

    Args:
        filepath: Path to the JSON key file.

    Returns:
        A GoogleServiceAccount built from the keys the dataclass declares.
        Keys in the file that the dataclass does not declare are ignored.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        TypeError: If a field without a default is missing from the file.
    """
    from dataclasses import fields

    # The context manager closes the handle; the original left it open.
    with open(filepath, "rb") as key_file:
        obj = json.load(key_file)

    # Drop unknown keys so a key file with additional entries does not make
    # the constructor raise TypeError.
    known = {f.name for f in fields(GoogleServiceAccount)}
    return GoogleServiceAccount(**{k: v for k, v in obj.items() if k in known})
def create_jwt_token(google_service_account:GoogleServiceAccount, target_audience:str):
    """Build and sign (RS256) the JWT assertion for the service account.

    Args:
        google_service_account: Supplies client_email ("iss" and "sub"),
            token_uri ("aud") and private_key (the signing key).
        target_audience: Value of the "target_audience" claim.

    Returns:
        The encoded JWT as returned by jwt.encode. The token is issued now
        and expires one hour later.
    """
    # JWT NumericDate claims are written as whole seconds; the original
    # emitted floats from datetime.timestamp().
    issued_at = int(datetime.now().timestamp())
    claims = {
        "iss": google_service_account.client_email,
        "sub": google_service_account.client_email,
        "aud": google_service_account.token_uri,
        "exp": issued_at + int(timedelta(hours=1).total_seconds()),
        "iat": issued_at,
        "target_audience": target_audience,  # use to generate the IDENTITY token
        # "scope": "https://www.googleapis.com/auth/cloud-platform",  # use to generate the ACCESS token
    }
    LOG(claims)
    # Pass the algorithm by name. The original passed an Algorithm object and
    # relied on the "alg" entry in `headers` to override it. With the name
    # given directly, PyJWT writes the standard {"alg": "RS256", "typ": "JWT"}
    # header itself.
    jwt_token = jwt.encode(claims, key=google_service_account.private_key, algorithm="RS256")
    return jwt_token
def generate_id_token(jwt_token:str):
    """Exchange a signed JWT assertion at Google's token endpoint.

    Sends a form-encoded POST with the jwt-bearer grant type, equivalent to:
        curl -d 'grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&assertion=<JWT_TOKEN>' https://oauth2.googleapis.com/token

    Args:
        jwt_token: The signed assertion to exchange.

    Returns:
        The decoded JSON response body. The caller in this file reads its
        "id_token" entry.

    Raises:
        requests.HTTPError: If the endpoint answers with a 4xx/5xx status.
    """
    body_payload = {
        "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
        "assertion": jwt_token,
    }
    resp = requests.post("https://oauth2.googleapis.com/token", data=body_payload, timeout=60)
    resp.raise_for_status()
    return resp.json()
def call_google_cloud_run_service(service_url:str, authorization_token:str):
    """GET service_url with the token as a Bearer credential.

    Args:
        service_url: URL of the deployed service.
        authorization_token: Token sent in the Authorization header.

    Returns:
        The response body as text.

    Raises:
        requests.HTTPError: If the service answers with a 4xx/5xx status.
        requests.Timeout: If the service does not answer within 60 seconds.
    """
    headers = {
        "Authorization": f"Bearer {authorization_token}",
        # NOTE(review): "data/html" is not a registered media type and a GET
        # carries no body; kept unchanged so the service receives the same
        # headers as before. Confirm whether "Accept" was intended.
        "Content-Type": "data/html"
    }
    # Same 60 second bound as generate_id_token, so a stalled service cannot
    # hang the script.
    resp = requests.get(url=service_url, headers=headers, timeout=60)
    resp.raise_for_status()
    return resp.text
# Demo driver: load the key file, sign an assertion for the Cloud Run URL,
# exchange it at the token endpoint, then call the service with the id_token.
# NOTE(review): the signed JWT and the token response are printed in full;
# fine for a demo, but these are credentials.
if __name__ == "__main__":
    GOOGLE_CLOUDRUN_DEPLOYED_SRVC_URL="https://hello-797087556919.asia-south1.run.app/"
    LOG(10 * "+==========+")
    LOG("GOOGLE_CLOUDRUN_DEPLOYED_SRVC_URL", GOOGLE_CLOUDRUN_DEPLOYED_SRVC_URL)
    LOG(10 * "+==========+")

    service_account = read_google_service_account("/Users/sounishnath/sounish-cloud-workstation-ac143dfffa26.json")
    token = create_jwt_token(service_account, target_audience=GOOGLE_CLOUDRUN_DEPLOYED_SRVC_URL)
    LOG(10 * "+==========+")
    LOG("jwt_token", token)
    LOG(10 * "+==========+")

    # This is the whole token-endpoint response (a dict), not a bare access
    # token; the printed label is kept as in the original output.
    token_response = generate_id_token(token)
    LOG(10 * "+==========+")
    LOG("access_token", token_response)
    LOG(10 * "+==========+")

    LOG(10 * "+==========+")
    data = call_google_cloud_run_service(service_url=GOOGLE_CLOUDRUN_DEPLOYED_SRVC_URL, authorization_token=token_response["id_token"])
    LOG("resp_data", data)
    LOG(10 * "+==========+")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Validate the Access Token Output using googleapis.com:
Get the cloud storage buckets using the Access Token: