I am trying to create a python script that will crawl Azure DevOps project for a file, and download it locally. However, I'm running into an issue where making the request to download the file isn't working due the request being "dangerous".
Error:
Failed to clone file 'mkdocs.yml' from repository 'crawl-ado'
Response: 400
{"$id":"1","innerException":null,"message":"A potentially dangerous Request.Path value
was detected from the client (:).","typeName":"System.Web.HttpException,
System.Web","typeKey":"HttpException","errorCode":0,"eventId":0}
My current strategy is to pull a list of repos through an Azure PAT token and make a json request to see if those repos contain the file. If they do, I want to create a new file and download them to my local environment. Is there something off with my logic here, or am I trying to do something like this the wrong way? Thank you in advance!
crawl.py:
import os
import requests
import base64
from azure.devops.connection import Connection
from msrest.authentication import BasicAuthentication
# Replace these variables with your Azure DevOps organization, project, and personal access token (PAT)
organization = "https://dev.azure.com/MYORG"
project = os.getenv('PROJECT')
pat = os.getenv('PAT')
file_path = "mkdocs.yml" # Replace with the path to the specific file you want to clone
# Check to see if PROJECT & PAT are set.
if not project:
print("Please set the PROJECT environment variable!")
exit(1)
if not pat:
print("Please set the PAT environment variable!")
exit(1)
else:
print("PAT is set!")
print("***************************************")
# Create a connection to the Azure DevOps organization
credentials = BasicAuthentication('', pat)
connection = Connection(base_url=organization, creds=credentials)
# Get a client for the Git service
git_client = connection.clients.get_git_client()
# Get a list of repositories in the project
repos = git_client.get_repositories(project=project)
# Store the repository names in an array
repo_names = [repo.name for repo in repos]
# Display the repository names and download the file from each repository
print("Repositories in project '{}':".format(project))
for repo_name in repo_names:
print(repo_name)
url = f'https://dev.azure.com/{organization}/{project}/_apis/git/repositories/{repo_name}/items?path={file_path}&api-version=7.1'
headers = {
'Authorization': f'Basic {base64.b64encode(f":{pat}".encode()).decode()}'
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
file_content = response.text
with open(file_path, 'w') as file:
file.write(file_content)
print(f"File '{file_path}' cloned successfully from repository '{repo_name}'")
else:
print(f"Failed to clone file '{file_path}' from repository '{repo_name}'")
print(f"Response: {response.status_code}")
print(response.text)
Me and a coworker have found a solution. Instead of using the ADO URL we have changed it to a git URL to clone the repos instead. This will clone any file that is inside an organization and is not limited to one project.
import os
import requests
import json
import base64
import yaml
import subprocess
import urllib.parse
# Define variables
organization = os.getenv('ORG')
file_path = os.getenv('FILE_NAME')
ado_access_token = os.getenv('PAT')
# Check if the environment variable is set
if not ado_access_token:
raise EnvironmentError("The environment variable 'PAT' is not set. Please set it to your Azure DevOps personal access token.")
exit(1)
if not file_path:
raise EnvironmentError("The environment variable 'FILE_NAME' is not set. Please set it to the file path of the file you want to search for.")
exit(1)
# Construct the URL
alm_url = f'https://almsearch.dev.azure.com/{organization}/_apis/search/codesearchresults?api-version=7.1'
# Set up the headers
headers = {
'Content-Type': 'application/json',
'Authorization': f'Basic {base64.b64encode(f":{ado_access_token}".encode()).decode()}'
}
# Create the request body
body = {
"searchText": file_path,
"$top": 1000 # Set the limit for the number of files to be found.
}
# Make the POST request
response = requests.post(alm_url, headers=headers, data=json.dumps(body))
# Initialize the results dictionary
results_dict = {}
# Check the response
if response.status_code == 200:
search_results = response.json()
print(f'Found {search_results.get("count", 0)} results...')
print(f'Only using {file_path} files in the root of the repository...')
for result in search_results.get('results',[]):
# Get the project name, repo name, and file path
project_name = result.get('project', {}).get('name')
repo_name = result.get('repository', {}).get('name')
result_file_path = result.get('path', 'N/A')
if os.path.basename(result_file_path) != file_path:
print(f'Skipping file {result_file_path} in repository {repo_name}...')
continue
print(f'Processing {project_name}/{repo_name}/{result_file_path}...')
# Create git clone URL
encoded_project_name = urllib.parse.quote(project_name)
git_clone_url = f'https://{ado_access_token}@dev.azure.com/{organization}/{encoded_project_name}/_git/{repo_name}'
if project_name not in results_dict:
results_dict[project_name] = {}
results_dict[project_name][repo_name] = git_clone_url
# Define the target directory for cloning
clone_dir = os.path.join(project_name, repo_name)
os.makedirs(clone_dir, exist_ok=True)
# Run the git clone command
clone_command = ['git', 'clone', git_clone_url, clone_dir]
subprocess.run(clone_command, check=True)
# Move mkdocs.yml to the root of the repo folder
destination_root = os.path.join(project_name, repo_name)
destination_doc = os.path.join(destination_root, 'docs')
os.makedirs(destination_root, exist_ok=True)
source_mkdocs = os.path.join(clone_dir, result_file_path)
if os.path.exists(source_mkdocs):
os.rename(source_mkdocs, os.path.join(destination_root, file_path))
# Move all .md files to docs folder in the repo folder
os.makedirs(os.path.join(destination_root), exist_ok=True)
# Define the staging directory
staging_directory = destination_root
# Write results to a .yml file
with open(os.path.join(staging_directory, 'repos.yml'), 'w') as yaml_file:
yaml.dump(results_dict, yaml_file, default_flow_style=False)
else:
print(f'Error: {response.status_code}')
print(response.text)