python, opensearch, elasticsearch-py, aws-data-wrangler, amazon-opensearch

How can I bulk upload JSON records to an AWS OpenSearch index using a Python client library?


I have a dataset large enough that I would like to bulk index its JSON objects into AWS OpenSearch.

I cannot see how to achieve this using any of: boto3, awswrangler, opensearch-py, elasticsearch, elasticsearch-py.

Is there a way to do this without making a raw HTTP request (PUT/POST) from Python directly?

Note that this is not for Elasticsearch or AWS Elasticsearch.

Many thanks!


Solution

  • I finally found a way to do it using opensearch-py, as follows.

    First establish the client,

    # First fetch credentials from environment defaults
    # If you can get this far you probably know how to tailor them
    # for your particular situation. Otherwise SO is a safe bet :)
    import boto3
    from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

    credentials = boto3.Session().get_credentials()
    region = 'eu-west-2'  # for example

    # Now set up the AWS SigV4 request signer
    auth = AWSV4SignerAuth(credentials, region)

    # And finally the OpenSearch client
    host = f"...{region}.es.amazonaws.com"  # fill in your hostname (minus the https://) here
    client = OpenSearch(
        hosts=[{'host': host, 'port': 443}],
        http_auth=auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )
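
    As a quick sanity check, client.info() hits the cluster root endpoint and fails fast if your endpoint or auth is wrong (the version fields below assume the usual OpenSearch cluster-info response shape):

    # Smoke test: raises if the domain is unreachable or credentials are bad
    info = client.info()
    print(f"Connected to {info['version']['distribution']} {info['version']['number']}")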
    
    

    Phew! Let's create the data now:

    # Spot the deliberate mistake(s) :D
    document1 = {
        "title": "Moneyball",
        "director": "Bennett Miller",
        "year": "2011"
    }
    
    document2 = {
        "title": "Apollo 13",
        "director": "Richie Cunningham",
        "year": "1994"
    }
    
    data = [document1, document2]
    

    TIP! Create the index if you need to -

    my_index = 'my_index'
    
    try:
        response = client.indices.create(index=my_index)
        print('\nCreating index:')
        print(response)
    except Exception as e:
        # If, for example, my_index already exists, don't do much!
        print(e)
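
    If you'd rather check than catch, here's a minimal alternative sketch using the client's indices.exists call:

    # Only create the index when it doesn't already exist
    if not client.indices.exists(index=my_index):
        client.indices.create(index=my_index)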
    

    This is where things go a bit nutty. I hadn't realised that every single bulk operation needs an, er, action e.g. "index", "create", "delete" etc. - so let's define that now

    action={
        "index": {
            "_index": my_index
        }
    }
    

    You can read all about the available actions in the OpenSearch bulk REST API documentation.

    The next quirk is that the OpenSearch bulk API requires Newline Delimited JSON (see https://www.ndjson.org), which is basically one JSON object serialized per line, with the lines separated by newlines. Someone wrote on SO that this "bizarre" API looked like one designed by a data scientist - far from taking offence, I think that rocks. (I agree ndjson is weird though.)
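
    For the two documents above, the payload we're about to build looks like this - one action line, then one document line, each terminated by a newline:

    {"index": {"_index": "my_index"}}
    {"title": "Moneyball", "director": "Bennett Miller", "year": "2011"}
    {"index": {"_index": "my_index"}}
    {"title": "Apollo 13", "director": "Richie Cunningham", "year": "1994"}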

    Now, somewhat hideously, let's build up the full payload string, combining the actions and the data. A helper function is at hand!

    import json

    def payload_constructor(data, action):
        # "All my own work"
        # Serialize the action once; it precedes every document line
        action_string = json.dumps(action) + "\n"

        payload_string = ""
        for datum in data:
            payload_string += action_string
            payload_string += json.dumps(datum) + "\n"

        return payload_string
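
    A quick usage check - printing the helper's output should reproduce the NDJSON shown above:

    print(payload_constructor(data, action))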
    
    

    OK so now we can finally invoke the bulk API. I suppose you could mix in all sorts of actions (out of scope here) - go for it!

    response = client.bulk(body=payload_constructor(data, action), index=my_index)
    

    That's probably the most boring punchline ever but there you have it.
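
    One caveat worth a sketch: .bulk() won't raise an exception for per-document failures, so it's worth inspecting the response yourself (this assumes the standard bulk response shape, with a top-level "errors" flag and per-item results):

    if response.get("errors"):
        # Each item echoes its action ("index" here) with status and error details
        for item in response["items"]:
            result = item["index"]
            if "error" in result:
                print(f"Failed with status {result['status']}: {result['error']}")
    else:
        print(f"Indexed {len(response['items'])} documents")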

    You can also get (geddit?) .bulk() to rely on the index= argument alone by setting the action to:

    action={"index": {}}
    

    Hey presto!

    Now, choose your poison - the other solution looks crazily shorter and neater.

    PS The well-hidden opensearch-py documentation on this is located here.