The documentation for Analytics Engine provides a link for generating IAM access tokens using the CLI, but I need to generate the token with an API call. This is the CLI approach:
bx api https://api.ng.bluemix.net
bx login
<enter your credentials>
<If you are part of multiple IBM Cloud accounts, you'll be asked to choose an account for the current session. Also, you'll need to choose an organization and space in IBM Cloud.>
bx iam oauth-tokens
The documentation also states that the Cloud Foundry API is deprecated. How can I generate the IAM access tokens with an API call instead of the CLI?
Here is the code I created in the end ...
Some utility classes for logging and exceptions:
import requests
import json
from datetime import datetime, timedelta
import logging
import os
class Logger:
    """Small factory that configures root logging once and hands out named loggers.

    The level of every logger returned by :meth:`get_logger` is taken from the
    ``LOG_LEVEL`` environment variable (default: ``logging.INFO``).
    """

    def __init__(self):
        # Named log_format (not "format") so the builtin is not shadowed.
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        # basicConfig is a no-op when the root logger already has handlers,
        # so creating several Logger instances is harmless.
        logging.basicConfig(format=log_format)
        # This handler is created but never attached to any logger in this
        # class; the attribute is kept so existing users of ``.ch`` still work.
        self.ch = logging.StreamHandler()

    def get_logger(self, clazz):
        """Return the logger named *clazz* with its level set from ``LOG_LEVEL``.

        Raises:
            ValueError: if ``LOG_LEVEL`` is set to a name the logging module
                does not recognise.
        """
        logger = logging.getLogger(clazz)
        logger.setLevel(self._configured_level())
        return logger

    @staticmethod
    def _configured_level():
        """Translate the ``LOG_LEVEL`` environment variable into a logging level.

        Accepts level names in any case (``debug``, ``DEBUG``) and numeric
        strings (``"10"``). An unset or blank variable means ``logging.INFO``.
        """
        raw = os.getenv("LOG_LEVEL")
        if raw is None or not raw.strip():
            return logging.INFO
        raw = raw.strip()
        if raw.isdigit():
            # Environment values are always strings; setLevel needs an int here.
            return int(raw)
        # setLevel only recognises upper-case level names.
        return raw.upper()
class CloudFoundryException(Exception):
    """Error raised for failed Cloud Foundry API interactions.

    The first argument is also exposed as the ``message`` attribute; it and
    any extra positional arguments are forwarded to ``Exception`` and so
    appear in ``args``.
    """

    def __init__(self, message, *args):
        forwarded = (message,) + args
        super(CloudFoundryException, self).__init__(*forwarded)
        self.message = message
Then a class to do the main work:
class CloudFoundryAPI(object):
    """Minimal client that obtains UAA and IAM tokens from an IBM Cloud API key.

    Typical use::

        cf = CloudFoundryAPI(api_key="...")   # or api_key_filename="key.json"
        cf.get_auth_token()                   # UAA token (dict)
        cf.get_oidc_token()                   # IAM token (dict)

    Tokens are cached on the instance and fetched again once they are about
    to expire.
    """

    # URL that oidc_token() posts the API key to. Override on a subclass or an
    # instance to target a different IAM host.
    IAM_TOKEN_URL = 'https://iam.bluemix.net/identity/token'

    # A cached token is treated as expired this many seconds before the
    # lifetime reported by the server, so a token that is about to lapse is
    # not handed to a caller.
    TOKEN_EXPIRY_LEEWAY_SECS = 60

    def __init__(self, api_key=None, api_key_filename=None, api_endpoint='https://api.ng.bluemix.net', provision_poll_timeout_mins=30):
        """Resolve the API key and fetch the endpoint's ``/v2/info`` document.

        Args:
            api_key: the API key itself.
            api_key_filename: path to a JSON file holding the key under
                ``apikey`` (or the older ``apiKey``). Takes precedence over
                *api_key* when both are given.
            api_endpoint: base URL of the Cloud Foundry API.
            provision_poll_timeout_mins: stored on the instance; not used by
                the methods of this class.

        Raises:
            ValueError: if neither *api_key* nor *api_key_filename* is given.
            CloudFoundryException: if the ``/v2/info`` request fails.

        Note: construction performs an HTTP GET (see ``_get_info``).
        """
        # allow tests to override the api_key_filename parameter by setting
        # CloudFoundryAPI.api_key_filename; a missing or None attribute is ignored.
        override = getattr(CloudFoundryAPI, 'api_key_filename', None)
        if override is not None:
            api_key_filename = override

        # Explicit check instead of `assert`, which is stripped under -O.
        if api_key is None and api_key_filename is None:
            raise ValueError("You must provide a value for api_key or for api_key_filename")

        self.log = Logger().get_logger(self.__class__.__name__)
        self.provision_poll_timeout_mins = provision_poll_timeout_mins

        if api_key_filename is not None:
            self.api_key = self._read_api_key(api_key_filename)
        else:
            self.api_key = api_key

        self.api_endpoint = api_endpoint
        self.info = self._get_info()

    def _read_api_key(self, api_key_filename):
        """Return the API key stored in the JSON file *api_key_filename*.

        Any failure (unreadable file, invalid JSON, missing key) is logged and
        re-raised unchanged.
        """
        try:
            with open(api_key_filename, 'r') as api_file:
                d = json.load(api_file)
            try:
                return d['apikey']
            except KeyError:
                # The attribute name used to be "apiKey"
                return d['apiKey']
        except Exception:
            self.log.error('Error retrieving "apiKey" from file {}'.format(api_key_filename))
            raise

    def _fetch_token(self, url, data, headers, failure_prefix):
        """POST form *data* to *url* and return the decoded JSON token.

        On failure the response body (or, when no response was received, the
        exception text) is logged with *failure_prefix* and the original
        ``requests`` exception is re-raised.
        """
        try:
            response = requests.post(url, headers=headers, data=data)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # Use the response attached to the exception: the local name is
            # unbound when the POST itself failed (e.g. connection error).
            # Compare with None explicitly; an error Response is falsy.
            failed = getattr(e, 'response', None)
            detail = failed.text if failed is not None else str(e)
            self.log.error(failure_prefix + detail)
            raise
        return response.json()

    def _expiry_for(self, token):
        """Return the local time at which *token* should be considered expired.

        ``expires_in`` is a lifetime in seconds; the leeway is subtracted so
        the token is refreshed slightly early.
        """
        lifetime_secs = max(0, token['expires_in'] - self.TOKEN_EXPIRY_LEEWAY_SECS)
        return datetime.now() + timedelta(seconds=lifetime_secs)

    def auth(self):
        """Fetch a fresh UAA token and cache it in ``auth_token`` / ``expires_at``.

        Raises:
            requests.exceptions.RequestException: if the token request fails.
        """
        self.log.debug('Authenticating to CloudFoundry')
        url = self.info['authorization_endpoint'] + '/oauth/token'
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded;charset=utf-8',
            'Accept': 'application/x-www-form-urlencoded;charset=utf-8',
            # "Y2Y6" is base64 for "cf:" (presumably the "cf" client id with
            # an empty secret -- confirm against the UAA documentation).
            'Authorization': 'Basic Y2Y6'
        }
        # A dict lets requests form-encode the values, so an API key
        # containing reserved characters cannot corrupt the body.
        data = {
            'grant_type': 'password',
            'username': 'apikey',
            'password': self.api_key
        }
        self.auth_token = self._fetch_token(url, data, headers, 'Cloud Foundry Auth Response: ')
        self.expires_at = self._expiry_for(self.auth_token)
        self.log.debug('Authenticated to CloudFoundry')

    def oidc_token(self):
        """Fetch a fresh IAM token, cache it and return it.

        The token is cached in ``_oidc_token`` rather than ``oidc_token`` so
        that the attribute does not shadow this method.

        Raises:
            requests.exceptions.RequestException: if the token request fails.
        """
        self.log.debug('Retrieving IAM token')
        data = {
            'grant_type': 'urn:ibm:params:oauth:grant-type:apikey',
            'apikey': self.api_key
        }
        self._oidc_token = self._fetch_token(self.IAM_TOKEN_URL, data, None, 'IAM token response: ')
        self.oidc_expires_at = self._expiry_for(self._oidc_token)
        self.log.debug('Retrieved IAM token')
        return self._oidc_token

    def get_auth_token(self):
        """Return the cached UAA token, fetching a new one if missing or expired."""
        expires_at = getattr(self, 'expires_at', None)
        if getattr(self, 'auth_token', None) is None or expires_at is None or datetime.now() > expires_at:
            self.auth()
        return self.auth_token

    def get_oidc_token(self):
        """Return the cached IAM token, fetching a new one if missing or expired."""
        expires_at = getattr(self, 'oidc_expires_at', None)
        if getattr(self, '_oidc_token', None) is None or expires_at is None or datetime.now() > expires_at:
            self.oidc_token()
        return self._oidc_token

    def _request_headers(self):
        """Build JSON request headers carrying the current UAA token."""
        auth_token = self.get_auth_token()
        access_token = auth_token['access_token']
        token_type = auth_token['token_type']
        headers = {
            'accept': 'application/json',
            'authorization': '{} {}'.format(token_type, access_token),
            'cache-control': 'no-cache',
            'content-type': 'application/json'
        }
        return headers

    def _request(self, url, http_method='get', data=None, description='', create_auth_headers=True):
        """Issue an HTTP request and return the ``requests`` response.

        Args:
            url: absolute URL to call.
            http_method: ``'get'``, ``'post'`` or ``'delete'``.
            data: object serialised with ``json.dumps`` as the body of a POST;
                ignored for the other methods.
            description: label used in log messages.
            create_auth_headers: when true, send the headers built by
                ``_request_headers``; otherwise send no extra headers.

        Raises:
            ValueError: if *http_method* is not supported.
            CloudFoundryException: if the request fails or returns an error
                status; the message is the response body, or the underlying
                error text when no response was received.
        """
        if http_method not in ('get', 'post', 'delete'):
            raise ValueError('Unsupported http_method: {}'.format(http_method))

        if create_auth_headers:
            headers = self._request_headers()
        else:
            headers = {}

        try:
            if http_method == 'get':
                response = requests.get(url, headers=headers)
            elif http_method == 'post':
                response = requests.post(url, headers=headers, data=json.dumps(data))
            else:
                response = requests.delete(url, headers=headers)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # The local name is unbound when the call itself failed, so take
            # the response (if any) from the exception. An error Response is
            # falsy, hence the explicit None comparison.
            failed = getattr(e, 'response', None)
            if failed is not None:
                status_code, text = failed.status_code, failed.text
            else:
                status_code, text = None, str(e)
            self.log.error('{} : {} {} : {} {}'.format(description, http_method, url, status_code, text))
            raise CloudFoundryException(message=text)

        # Log the body as compact JSON when it parses, raw text otherwise.
        try:
            body = json.dumps(response.json())
        except ValueError:
            body = response.text
        self.log.debug('{} : {} {} : {} {}'.format(description, http_method, url, response.status_code, body))
        return response

    def _get_info(self):
        """GET ``<api_endpoint>/v2/info`` without auth headers and return the parsed JSON."""
        url = '{}/v2/info'.format(self.api_endpoint)
        response = self._request(url=url, http_method='get', description='_get_info', create_auth_headers=False)
        return response.json()
You can then use it like so:
# Build the client from an API key; alternatively pass api_key_filename
# pointing at a JSON file that contains the key.
cf = CloudFoundryAPI(api_key="xxxx") # or pass api_key_filename
# Each getter returns the token; the results are discarded in this example.
cf.get_auth_token() # get UAA token
cf.get_oidc_token() # get IAM token