I have a couple of custom provider components in my projects and I've written a function to return the credentials of the current Pulumi login context. Here is my function:
    from typing import Optional, Tuple

    from pulumi_azure_native import authorization


    def get_access_token(endpoint: Optional[str] = None) -> Tuple[str, str, str]:
        # Token and client configuration of the current Pulumi login context
        access_token = authorization.get_client_token(endpoint=endpoint).token
        client_config = authorization.get_client_config()
        tenant_id = client_config.tenant_id
        subscription_id = client_config.subscription_id
        return tenant_id, subscription_id, access_token
However, I get the following error during pulumi up, which is directly caused by the above function:
    C:\Workspace\vsts-agent-win-x64-2.191.1\_work\39\s\Infrastructure\venv\lib\site-packages\grpc\_server.py:457: RuntimeWarning: coroutine 'invoke.<locals>.do_rpc' was never awaited
      return None, False
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback

    pulumi-python:dynamic/keyvault:AccessPolicy (kv-access-policy-pulumi-developer):
      error: Exception calling application: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

    pulumi-python:dynamic/keyvault:Certificate (kv-certificate-auto-ssl-certificate):
      error: Exception calling application: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
I expected the pulumi_azure_native.authorization methods to work as described in their documentation. However, I can't find an easy way to access the current Pulumi login context from within my code.
Since get_client_token returns an Awaitable, I should have made my function (get_access_token) async and awaited the calls:
    async def get_access_token(endpoint: Optional[str] = None) -> Tuple[str, str, str]:
        access_token = (await authorization.get_client_token(endpoint=endpoint)).token
        client_config = await authorization.get_client_config()
        tenant_id = client_config.tenant_id
        subscription_id = client_config.subscription_id
        return tenant_id, subscription_id, access_token
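One thing to keep in mind is that an async function still has to be driven by an event loop, and the error message above suggests the worker thread that calls the provider does not have one. The sketch below shows the general asyncio pattern only. It does not call Pulumi at all: fake_get_client_token, fake_get_client_config, FakeClientConfig, get_access_token_blocking and call_from_worker_thread are hypothetical names made up for illustration, and I have not verified that the same approach works with the real authorization calls inside a dynamic provider.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Tuple


class FakeClientConfig:
    """Hypothetical stand-in for the client config result (not a Pulumi type)."""
    tenant_id = "tenant-0000"
    subscription_id = "sub-0000"


async def fake_get_client_token(endpoint: Optional[str] = None) -> str:
    # Hypothetical stand-in for an awaitable token lookup.
    await asyncio.sleep(0)
    return f"token-for-{endpoint or 'default'}"


async def fake_get_client_config() -> FakeClientConfig:
    # Hypothetical stand-in for an awaitable client config lookup.
    await asyncio.sleep(0)
    return FakeClientConfig()


async def get_access_token(endpoint: Optional[str] = None) -> Tuple[str, str, str]:
    # Same shape as the async function above, but built on the stand-ins.
    access_token = await fake_get_client_token(endpoint=endpoint)
    client_config = await fake_get_client_config()
    return client_config.tenant_id, client_config.subscription_id, access_token


def get_access_token_blocking(endpoint: Optional[str] = None) -> Tuple[str, str, str]:
    # asyncio.run creates a fresh event loop for this call and closes it
    # afterwards, so it can be used in a thread that has no loop of its own.
    return asyncio.run(get_access_token(endpoint))


def call_from_worker_thread(endpoint: Optional[str] = None) -> Tuple[str, str, str]:
    # Mimics synchronous code being invoked on a ThreadPoolExecutor worker.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(get_access_token_blocking, endpoint).result()


if __name__ == "__main__":
    print(call_from_worker_thread("https://vault.azure.net"))
```

The point of the blocking wrapper is that synchronous callers (such as a provider's create or update method) cannot use await directly, so something has to start a loop on their behalf. Note that asyncio.run raises an error if a loop is already running in the same thread, so this only applies to threads without one.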