google-cloud-platformgoogle-oauthgoogle-authenticationservice-accounts

How do I create an Access Token from Service Account Credentials using REST API?


I have created a Service Account in Google Cloud Platform and downloaded the Private Key in JSON format. I am trying to create a Compute resource via REST API. For authentication purpose, I need an AccessToken which needs to be set as a Header of create compute resource REST API. Is there a REST API to get the Access Token from the Private Key (Without using SDK or Google Clients)?


Solution

  • The following example shows you several important steps to call Google Cloud APIs without using an SDK in Python. Similar code works in just about any language (c#, java, php, nodejs).

    Change the source code with the filename of your service account Json file, your Google Zone and your Project ID.

    This example will list the instances in one zone for the specified project. From this example you will know the framework to call an API to create GCE instances.

    This code will show you how to:

    1. How to load service account credentials from a Json file.
    2. How to extract the Private Key used to sign requests.
    3. How to create a JWT (Json Web Token) for Google Oauth 2.0.
    4. How to set the Google Scopes (permissions).
    5. How to sign a JWT to create a Signed-JWT (JWS).
    6. How to exchange the Signed-JWT for a Google OAuth 2.0 Access Token.
    7. How to set the expiration time. This program defaults to 3600 seconds (1 Hour).
    8. How to call a Google API and set the Authorization Header.
    9. How to process the returned Json results and display the name of each instance.

    Example program in Python 3.x:

        '''
        This program lists lists the Google Compute Engine Instances in one zone
        '''
        # Author: John Hanley
        # https://www.jhanley.com
        
        import time
        import json
        import jwt
        import requests
        import httplib2
        
        # Project ID for this request.
        project = 'development-123456'
        
        # The name of the zone for this request.
        zone = 'us-west1-a'
        
        # Service Account Credentials, Json format
        json_filename = 'service-account.json'
        
        # Permissions to request for Access Token
        scopes = "https://www.googleapis.com/auth/cloud-platform"
        
        # Set how long this token will be valid in seconds
        expires_in = 3600   # Expires in 1 hour
    
        def load_json_credentials(filename):
            ''' Load the Google Service Account Credentials from Json file '''
        
            with open(filename, 'r') as f:
                data = f.read()
        
            return json.loads(data)
    
        def load_private_key(json_cred):
            ''' Return the private key from the json credentials '''
        
            return json_cred['private_key']
    
        def create_signed_jwt(pkey, pkey_id, email, scope):
            ''' Create a Signed JWT from a service account Json credentials file
            This Signed JWT will later be exchanged for an Access Token '''
        
            # Google Endpoint for creating OAuth 2.0 Access Tokens from Signed-JWT
            auth_url = "https://www.googleapis.com/oauth2/v4/token"
        
            issued = int(time.time())
            expires = issued + expires_in   # expires_in is in seconds
        
            # Note: this token expires and cannot be refreshed. The token must be recreated
        
            # JWT Headers
            additional_headers = {
                    'kid': pkey_id,
                    "alg": "RS256",
                    "typ": "JWT"    # Google uses SHA256withRSA
            }
        
            # JWT Payload
            payload = {
                "iss": email,       # Issuer claim
                "sub": email,       # Issuer claim
                "aud": auth_url,    # Audience claim
                "iat": issued,      # Issued At claim
                "exp": expires,     # Expire time
                "scope": scope      # Permissions
            }
        
            # Encode the headers and payload and sign creating a Signed JWT (JWS)
            sig = jwt.encode(payload, pkey, algorithm="RS256", headers=additional_headers)
        
            return sig
    
        def exchangeJwtForAccessToken(signed_jwt):
            '''
            This function takes a Signed JWT and exchanges it for a Google OAuth Access Token
            '''
        
            auth_url = "https://www.googleapis.com/oauth2/v4/token"
        
            params = {
                "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
                "assertion": signed_jwt
            }
        
            r = requests.post(auth_url, data=params)
        
            if r.ok:
                return(r.json()['access_token'], '')
        
            return None, r.text
        
        def gce_list_instances(accessToken):
            '''
            This functions lists the Google Compute Engine Instances in one zone
            '''
        
            # Endpoint that we will call
            url = "https://www.googleapis.com/compute/v1/projects/" + project + "/zones/" + zone + "/instances"
        
            # One of the headers is "Authorization: Bearer $TOKEN"
            headers = {
                "Host": "www.googleapis.com",
                "Authorization": "Bearer " + accessToken,
                "Content-Type": "application/json"
            }
        
            h = httplib2.Http()
        
            resp, content = h.request(uri=url, method="GET", headers=headers)
        
            status = int(resp.status)
        
            if status < 200 or status >= 300:
                print('Error: HTTP Request failed')
                return
        
            j = json.loads(content.decode('utf-8').replace('\n', ''))
        
            print('Compute instances in zone', zone)
            print('------------------------------------------------------------')
            for item in j['items']:
                print(item['name'])
        
        if __name__ == '__main__':
            cred = load_json_credentials(json_filename)
        
            private_key = load_private_key(cred)
        
            s_jwt = create_signed_jwt(
                    private_key,
                    cred['private_key_id'],
                    cred['client_email'],
                    scopes)
        
            token, err = exchangeJwtForAccessToken(s_jwt)
        
            if token is None:
                print('Error:', err)
                exit(1)
        
            gce_list_instances(token)
    

    For more information visit my blog. I write articles like this and publish the source code to help others understand how to write software for the cloud.

    www.jhanley.com