python oauth oauth-2.0 deviantart-api

OAuth and redirect_uri in offline Python script


I'm currently trying to write a Python script that will use DeviantArt's API to automatically shuffle my favourites. To do that, I first need to log in from my script. DeviantArt uses OAuth2 authentication, which requires a redirect_uri, which, as I understand it, is supposed to point to the server where my application is running.

However I'm running the script locally on my computer (not on a server) and just sending http requests via Python's Requests library. How do I then authenticate, when the OAuth procedure sends the code required for the authentication token as a parameter of a GET call to the redirect_uri, which points to nowhere in particular for me? Is there no way to authenticate without running a server?

EDIT

My problem is still that I'm running a simple offline script, and I'm not sure how to authenticate from it.

This is my authentication code so far:

import binascii, os, requests

def auth():
    """Send a GET request to DeviantArt's authorisation page and print the response.

    The query asks for the implicit grant (``response_type=token``) with the
    ``user`` scope and a freshly generated random ``state`` value.
    """
    authorize_url = 'https://www.deviantart.com/oauth2/authorize'
    # 160 random bytes, hex-encoded, used as the OAuth2 ``state`` parameter.
    nonce = binascii.hexlify(os.urandom(160))
    query = dict(
        response_type='token',
        client_id=1,
        redirect_uri='https://localhost:8080',
        state=nonce,
        scope='user',
    )
    response = requests.get(authorize_url, query)
    print(response)

The printed response is simply a 200 HTTP code, rather than an access token (obviously, since the username and password haven't been entered anywhere). The request is sent to DA's authorisation page, but since the page itself doesn't actually open in my script, I can't enter my username and password to log in anywhere. And I can't directly send the username and password in the GET request to authenticate that way either (again obviously, since it would be a terrible idea to send the password like that).

Preferably I'd like a way to simply have the user (me) prompted for the username and password in the console that the script is running in and then have the script continue executing after the user has successfully logged in.

Alternatively, if the above is not possible, the script should open the authorisation webpage in a browser, and then continue execution once the user logs in.

How would I go about realising either of these two solutions in Python?


Solution

  • Per request, I'm updating this question with the code I ended up using for my script's authentication in the hope that it helps somebody.

    import webbrowser
    import requests
    import urllib.parse
    import binascii
    import os
    import time
    from http.server import HTTPServer, BaseHTTPRequestHandler
    
    # DeviantArt OAuth2 endpoints: the authorisation page and the token exchange.
    AUTH = 'https://www.deviantart.com/oauth2/authorize'
    TOKEN = 'https://www.deviantart.com/oauth2/token'

    # Authorisation code; overwritten by RequestHandler.do_GET once the redirect arrives.
    code = ''
    # 20 random bytes rendered as 40 hex characters, sent as the OAuth2 ``state``
    # parameter and compared against the value echoed back in the redirect.
    state = os.urandom(20).hex()
    
    
    class Communicator:
        """Small DeviantArt API client for a script running on the local machine.

        ``auth()`` performs the OAuth2 authorisation-code flow by opening the
        authorisation page in a browser and serving exactly one request on
        ``http://localhost:8080`` to receive the redirect. ``get()`` and
        ``post()`` then send throttled requests carrying the access token.
        """

        def __init__(self):
            self.client_id = '<insert-actual-id>'  # You get these two from the DA developer API page
            self.client_secret = '<insert-actual-secret>'  # but it's safer if you store them in a separate file
            self.server, self.port = 'localhost', 8080
            self._redirect_uri = f'http://{self.server}:{self.port}'
            # time.time() of the most recent request; 0 means "no request yet".
            self._last_request_time = 0

        def auth(self, *args):
            """Authorise this client for the given scopes and fetch its tokens.

            Args:
                *args: scope names (e.g. ``'browse'``, ``'collection'``); they are
                    joined with spaces into the ``scope`` parameter.

            Blocks until the local callback server has handled one request, then
            exchanges the received code for tokens via ``_get_token``.
            """
            scope = ' '.join(args)

            params = {
                'response_type': 'code',
                'client_id': self.client_id,
                'redirect_uri': self._redirect_uri,
                'scope': scope,
                'state': state
            }
            # ``params`` has to be passed by keyword: the third positional
            # parameter of requests.Request is ``headers``, not ``params``.
            request = requests.Request('GET', AUTH, params=params).prepare()
            # Bind the callback socket *before* opening the browser so that the
            # redirect cannot arrive while nothing is listening, and close the
            # socket again as soon as the single callback has been served.
            with HTTPServer((self.server, self.port), RequestHandler) as server:
                webbrowser.open(request.url)
                server.handle_request()

            params = {
                'client_id': self.client_id,
                'client_secret': self.client_secret,
                'grant_type': 'authorization_code',
                'code': code,  # module-level value set by RequestHandler.do_GET
                'redirect_uri': self._redirect_uri
            }
            self._get_token(params)

        def _get_token(self, params):
            """Call the token endpoint and store the access and refresh tokens.

            Raises:
                KeyError: if the JSON reply lacks ``access_token`` or
                    ``refresh_token``.
            """
            r = requests.get(TOKEN, params).json()
            self.token = r['access_token']
            self.refresh_token = r['refresh_token']

        def _refresh_token(self):
            """Obtain a new access token using the stored refresh token."""
            params = {
                'client_id': self.client_id,
                'client_secret': self.client_secret,
                'grant_type': 'refresh_token',
                'refresh_token': self.refresh_token
            }
            self._get_token(params)

        def _request(self, func, url, params, sleep=5, cooldown=600):
            """Send one API request, retrying on rate limits and expired tokens.

            Args:
                func: callable invoked as ``func(url, params)``; its result must
                    provide ``.json()``.
                url: endpoint to call.
                params: request parameters; the caller's mapping is copied, not
                    modified. ``None`` is treated as empty.
                sleep: minimum number of seconds between two requests, and the
                    starting value of the exponential back-off.
                cooldown: seconds to wait once the back-off is exhausted or the
                    API reports ``user_api_threshold``.

            Returns:
                The decoded JSON reply of the first response that is not one of
                the handled error replies.
            """
            # Throttle: keep at least ``sleep`` seconds between requests.
            remaining = sleep - (time.time() - self._last_request_time)
            if remaining > 0:
                time.sleep(remaining)
            # Record the moment the request really goes out (after any wait).
            self._last_request_time = time.time()

            backoff = sleep
            max_sleep = 16 * sleep

            # Work on a copy so the caller's dict does not gain an access_token.
            params = dict(params or {})
            params['access_token'] = self.token
            while True:
                try:
                    r = func(url, params).json()
                    if 'error_code' in r and r['error_code'] == 429:
                        backoff *= 2
                        time.sleep(backoff)
                        if backoff > max_sleep:
                            raise ConnectionError("Request timed out - server is busy.")
                    elif 'error' in r and r['error'] == 'user_api_threshold':
                        raise ConnectionError("Too many requests")
                    elif 'error' in r and r['error'] == 'invalid_token':
                        print("Refreshing token.")
                        self._refresh_token()
                        params['access_token'] = self.token
                    else:
                        return r
                except ConnectionError:
                    print(f"Request limit reached - waiting {cooldown // 60} minutes before retrying...")
                    time.sleep(cooldown)
                    # Start the back-off afresh; otherwise every later 429 would
                    # exceed max_sleep immediately and trigger another cooldown.
                    backoff = sleep

        def get(self, url, params):
            """Send ``params`` to ``url`` with ``requests.get`` via ``_request``."""
            return self._request(requests.get, url, params)

        def post(self, url, params):
            """Send ``params`` to ``url`` with ``requests.post`` via ``_request``."""
            return self._request(requests.post, url, params)
    
    
    class RequestHandler(BaseHTTPRequestHandler):
        """Handle the OAuth2 redirect and publish the authorisation code.

        On success the code from the query string is stored in the module-level
        ``code`` variable as a plain string.
        """

        def do_GET(self):
            """Validate ``state``, store ``code`` and answer the browser.

            Raises:
                RuntimeError: if ``state`` is absent or differs from the value
                    this script generated, or if no ``code`` was supplied.
                    NOTE(review): socketserver appears to report handler
                    exceptions through ``handle_error()`` instead of
                    propagating them to the ``handle_request()`` caller -
                    confirm before relying on this exception upstream.
            """
            global code
            self.close_connection = True
            query = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
            # parse_qs leaves absent parameters out of the dict, so use .get()
            # rather than indexing to avoid a KeyError on a missing ``state``.
            if query.get('state', [None])[0] != state:
                self._reply(400, "Authorisation failed: state argument missing or invalid.")
                raise RuntimeError("state argument missing or invalid")
            if 'code' not in query:
                # Surface whatever explanation the redirect carries, if any.
                reason = (query.get('error_description') or query.get('error')
                          or ['no authorisation code received'])[0]
                self._reply(400, f"Authorisation failed: {reason}")
                raise RuntimeError(f"authorisation code missing: {reason}")
            # parse_qs maps each key to a list of values; keep the value itself.
            code = query['code'][0]
            self._reply(200, "Authorisation complete - you can close this window.")

        def _reply(self, status, message):
            """Send a short plain-text response so the browser tab does not hang."""
            body = message.encode('utf-8')
            self.send_response(status)
            self.send_header('Content-Type', 'text/plain; charset=utf-8')
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)
    
    
    # Scope names that can be passed to Communicator.auth(), which joins them
    # with spaces into the ``scope`` parameter of the authorisation request.
    BROWSE = 'browse'
    BROWSE_MORE_LIKE_THIS = 'browse.mlt'
    COLLECTION = 'collection'
    COMMENT = 'comment.post'
    FEED = 'feed'
    GALLERY = 'gallery'
    MESSAGE = 'message'
    NOTE = 'note'
    PUBLISH = 'publish'
    STASH = 'stash'
    USER = 'user'
    USER_MANAGE = 'user.manage'
    
    # Example usage, executed only when the file is run as a script:
    # create the client, authorise it for two scopes, then issue API calls.
    if __name__ == '__main__':
        com = Communicator()
        com.auth(BROWSE, COLLECTION)  # request specific permissions
        ...  # do stuff with com.get() and com.post() requests