python, parallel-processing, multiprocessing, large-language-model, anthropic

Run Anthropic API in parallel


I successfully ran OpenAI GPT-4o in parallel with multiprocessing:

def llm_query(chunk):
    context, query = get_prompt_synonyms()
    input1, output1 = get_example()
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": context},
            {"role": "user", "content": f'Input data is {input1}' + ' ' + query},
            {"role": "assistant", "content": output1},
            {"role": "user", "content": f'Input data is {chunk}' + ' ' + query}
        ],
        temperature=0,
        max_tokens=4090
    )
    reply = response.choices[0].message.content
    return reply

def description_from_llm(chunk):
    reply = llm_query(chunk)
    df_synonyms_codes = get_frame_llm(reply) # reply to dataframe
    return df_synonyms_codes



if __name__ == '__main__':
    # some stuff
    freeze_support()
    with Pool(processes=cpu_count()) as pool:
        dfs_syn = pool.map(description_from_llm, list_chunks)
    df_final = pd.concat(dfs_syn)

It runs really fast locally without any issues. However, when I try to do the same thing with Anthropic Claude 3.5 (I made sure I imported all the needed, up-to-date packages, have a valid key, etc.):

def llm_query(chunk, temperature=0, max_tokens=4096):
    model = "claude-3-5-sonnet-20240620"
    data = "Input data for analysis and enrichment: {x}".format(x=list_benefits_chunk)
    context, query = get_query()
    examples = get_few_shot_learning()
    messages = get_messages(context, data, query, examples)
    response = client.messages.create(
        model=model,
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens
    )
    return response

It fails with this exception:

TypeError: APIStatusError.__init__() missing 2 required keyword-only arguments: 'response' and 'body'

It does work in a plain loop:

df_all = pd.DataFrame()
for chunk in list_chunks:
    df = llm_query(chunk)
    df_all = pd.concat([df_all, df], axis=0)

But it is far too slow!

Is there a way to parallelize calls to the Anthropic API, or another solution that will cut the runtime by a factor of 7-10x (as multiprocessing does with GPT-4o)?


Solution

  • I usually use ThreadPoolExecutor for this. API calls are I/O-bound, so threads give you the same speed-up as separate processes, and you avoid multiprocessing having to pickle the SDK's exception objects between processes, which is most likely where the APIStatusError TypeError comes from.

    Minimal example

    from anthropic import Anthropic
    from concurrent.futures import ThreadPoolExecutor
    
    TEMPERATURE = 0.5
    CLAUDE_SYSTEM_MESSAGE = "You are a helpful AI assistant."
    
    anthropic_client = Anthropic(api_key=ANTHROPIC_API_KEY)
    
    def call_anthropic(
        prompt,
        model_id="claude-3-haiku-20240307",
        temperature=TEMPERATURE,
        system=CLAUDE_SYSTEM_MESSAGE,
    ):
        try:
            message = anthropic_client.messages.create(
                model=model_id,
                temperature=temperature,
                max_tokens=4096,
                system=system,
                messages=[
                    {
                        "role": "user",
                        "content": prompt,
                    }
                ],
            )
            return message.content[0].text
    
        except Exception as e:
            print(f"Error: {e}")
            return None
    
    BASE_PROMPT = "What is the capital of {country}?"
    COUNTRIES = ["Switzerland", "Sweden", "Sri Lanka", "Spain"]
    
    prompts = [BASE_PROMPT.format(country=country) for country in COUNTRIES]
    with ThreadPoolExecutor(max_workers=4) as executor:
        responses = list(executor.map(call_anthropic, prompts))
    
    print(responses)
    

    Output

    ['The capital of Switzerland is Bern.',
     'The capital of Sweden is Stockholm.',
     'The capital of Sri Lanka is Colombo.',
     'The capital of Spain is Madrid.']
    

    Adjust max_workers to what your tier's rate limits allow in order to speed up the parallel processing. The right value depends on the token count of your prompts and will probably need a bit of experimentation to avoid hitting the API limits.
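
    To plug this back into your own pipeline, you can keep llm_query exactly as it is and map a thin wrapper over the chunks with threads instead of processes. Here is a minimal sketch, assuming your existing llm_query, get_frame_llm and list_chunks; the retry-on-rate-limit loop is my own addition, not something your code already has:

    import time
    from concurrent.futures import ThreadPoolExecutor

    import pandas as pd
    from anthropic import RateLimitError

    def description_from_llm(chunk, retries=3):
        # Back off and retry a few times if the tier's rate limit is hit.
        for attempt in range(retries):
            try:
                response = llm_query(chunk)       # your Anthropic call from the question
                reply = response.content[0].text  # plain text of the first content block
                return get_frame_llm(reply)       # reply -> DataFrame, as in your GPT-4o version
            except RateLimitError:
                time.sleep(2 ** attempt)
        return pd.DataFrame()  # give up on this chunk after exhausting the retries

    with ThreadPoolExecutor(max_workers=4) as executor:
        dfs_syn = list(executor.map(description_from_llm, list_chunks))

    df_final = pd.concat(dfs_syn, ignore_index=True)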