I'm creating an automation script for Google Workspace where it fetches an direct children of some organization unit and then fetches children of all these OUs at the same time. While searching the web for the answer whether multiprocessing, threading or asynchronous processing will work best for me I understood that asyncio is going to help me with this issue. I have created a class Google Tenant
which holds the connection to google api and the fetched users.
However, my problem right now is that the script still is not asynchronous but it works in sequence rather than making the calls asynchronously
from google.oauth2 import service_account
from googleapiclient.discovery import build
import logging
import asyncio
class GoogleTenant:
def __init__(self, api: str, version: str):
config: ScriptConfig = ScriptConfig()
credentials = service_account.Credentials.from_service_account_file(config["gcloud"]["keypath"],
scopes=SCOPES)
delegated_credentials = credentials.with_subject(config["gcloud"]["subject"])
self.service = build(api, version, credentials=delegated_credentials)
self.users_list = []
def fetch_users(self):
users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute()
ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]]
asyncio.run(self._crawl_ous(ous_list))
async def _crawl_ous(self, ous: list):
crawling_result = await asyncio.gather(*[asyncio.create_task(self._fetch_users_from_ou(ou)) for ou in ous])
for result in crawling_result:
logging.info(f"Crawling result of ou {result[0]["organizations"][0]["department"]}: {len(result)}")
self.users_list.extend(result)
async def _fetch_users_from_ou(self, ou):
call_parameters = {
"customer": "my_customer",
"maxResults": 500,
"projection": "basic",
"query": f"orgUnitPath='{str(ou)}'",
"fields": "users/id,users/name,users/primaryEmail,users/suspended,users/emails,users/organizations/primary,users/organizations/department,users/recoveryEmail,nextPageToken"
}
logger.debug(f"Fetching users from {ou}")
users_from_ou = self.service.users().list(**call_parameters).execute()
user_fetching_result: list = users_from_ou["users"]
logger.debug(f"Initial fetch from {ou}: {len(users_from_ou["users"])}")
if "nextPageToken" in users_from_ou:
next_page_token = users_from_ou["nextPageToken"]
else:
return user_fetching_result
while True:
users_from_ou = self.service.users().list(**call_parameters, pageToken=next_page_token).execute()
logger.debug(f"Next fetch from {ou}: {len(users_from_ou["users"])}")
user_fetching_result.extend(users_from_ou["users"])
if "nextPageToken" in users_from_ou:
next_page_token = users_from_ou["nextPageToken"]
else:
return user_fetching_result
if __name__ == '__main__':
google_tenant = GoogleTenant("admin", "directory_v1")
google_tenant.fetch_users()
The execution result of the following:
DEBUG:root:Fetching users from /example/child1
DEBUG:root:Initial fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 258
DEBUG:root:Fetching users from /example/child2
DEBUG:root:Initial fetch from /example/child2: 500
DEBUG:root:Next fetch from /example/child2: 500
DEBUG:root:Next fetch from /example/child2: 500
...
I've tried to enter the await statement in places however I seem to misunderstand how it should as per my understanding the await statement makes the function wait for the result before continuing function execution. How can I make python execute these concurrently?
I reformatted parts of the code as per @Michael Butscher suggestion and also added my imports in the previous block
def fetch_users(self):
users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute()
ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]]
logger.debug(f"Fetched and sanitized ous: {pprint.pformat(ous_list)}")
asyncio.run(self._crawl_ous(ous_list))
async def _crawl_ous(self, ous: list):
tasks = [self._crawler_proxy(ou) for ou in ous]
crawling_result = await asyncio.gather(*tasks)
for result in crawling_result:
logger.info(f"Crawling result: {len(result)}")
self.users_list.extend(result)
async def _crawler_proxy(self, *args, **kwargs):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._fetch_users_from_ou(*args, **kwargs))
Once again thank you @Juris and @Michael Butscher for the provided suggestions and help. In the end I used different approach. The simplest solution in this case turned out to be multiprocessing
. I used the multiprocessing.Pool().map()
function. Below is the modified code.
def fetch_users(self) -> None:
config = ScriptConfig()
users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute()
ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]]
logger.debug(f"Fetched and sanitized ous: {pprint.pformat(ous_list)}")
self._crawl_ous(ous_list)
logger.info(f"Fetched users: {len(self.users_list)}")
def _crawl_ous(self, ous: list[str]) -> None:
users_list = []
with multiprocessing.Pool(processes=30) as pool:
crawling_results = pool.map(self._fetch_users_from_ou, ous)
for result in crawling_results:
logger.info(f"Crawling result: {len(result)}")
self.users_list.extend(result)
def _fetch_users_from_ou(self, ou):
call_parameters = {
"customer": "my_customer",
"maxResults": 500,
"projection": "basic",
"query": f"orgUnitPath='{str(ou)}'",
"fields": "users/id,users/name,users/primaryEmail,users/suspended,users/emails,users/organizations/primary,users/organizations/department,users/recoveryEmail,nextPageToken"
}
logger.info(f"Fetching users from {ou}")
users_from_ou: dict = self.service.users().list(**call_parameters).execute()
logger.debug(f"Initial fetch from {ou}: {len(users_from_ou["users"])}")
user_fetching_result: list = users_from_ou["users"]
if "nextPageToken" in users_from_ou:
next_page_token: str = users_from_ou["nextPageToken"]
else:
return user_fetching_result
while True:
users_from_ou: dict = self.service.users().list(**call_parameters, pageToken=next_page_token).execute()
logger.debug(f"Next fetch from {ou}: {len(users_from_ou["users"])}")
user_fetching_result.extend(users_from_ou["users"])
if "nextPageToken" in users_from_ou:
next_page_token: str = users_from_ou["nextPageToken"]
else:
return user_fetching_result