Tags: taint, sast, semgrep

Semgrep sink: how to define a method of a class as a sink only if the instance was initialized in a specific way?


After hours of searching for whether this is even possible, and for examples that do something similar, I decided it was time to reach out for help from people smarter than me.

I am new to Semgrep rules and to SAST tools in general. I am trying to create a Semgrep taint rule where the source is as simple as data returned from input(...). The tricky part of this rule is the sink. I want to treat the invoke method of instances of type AgentExecutor (from langchain.agents.agent) as a sink, but only when the agent was initialized this way: "agent = create_sql_agent(...)". A later call to agent.invoke(...) should then be a sink.

For example, take this code (written in Python):

from langchain_community.agent_toolkits import create_sql_agent
import os
from langchain_community.utilities import SQLDatabase
from langchain_openai import AzureChatOpenAI
from langchain.agents.agent import AgentExecutor

def get_sql_agent() -> AgentExecutor:
    db = SQLDatabase.from_uri('sqlite:///chinook.db')
    
    azure_openai_endpoint = <AZURE_OPENAI_API_ENDPOINT>
    llm = AzureChatOpenAI(azure_deployment=<deployment_name>,
                        openai_api_version="2023-05-15",
                        azure_endpoint=azure_openai_endpoint,
                        temperature=0,
                        model="gpt-35-turbo",
                        model_kwargs={"top_p": 0},
                        max_retries=3)
    agent_executor = create_sql_agent(llm, db=db, agent_type='openai-tools', verbose=True)

    return agent_executor


def langchain_sql_agent_test():
    sql_agent = get_sql_agent()
    prompt = input("What can I assist you with?:")

    sql_agent.invoke(prompt)

def main():
    langchain_sql_agent_test()

if __name__ == "__main__":
    main()

In this case I want to match the line sql_agent.invoke(prompt), because sql_agent.invoke is a sink: sql_agent was returned from create_sql_agent. But no matter what rule I try, I can't figure out how to define a sink under that kind of condition. I tried something like this:

rules:
  - id: taint-example
    languages:
      - python
    message: Found dangerous sql agent usage.
    mode: taint
    options:
      interfile: true
    pattern-sources:
      - pattern: $DATA = input(...)
    pattern-sinks:
      - patterns:
          - pattern: |
              $AGENT = create_sql_agent(...)
              ...
              return $AGENT
          - pattern: $AGENT.invoke(...)
    severity: WARNING

I guess this doesn't work because patterns performs a logical AND over its list of patterns: both patterns match in my code, but not at the same or an overlapping location, so the call is never considered a sink. Either way, I am not sure what I need to change to get the result I want, which is to define $AGENT.invoke(...) as a sink when $AGENT came from the function create_sql_agent found in langchain_community.agent_toolkits.

Note: the reason I am writing this rule at all is that the default SQL agent LangChain creates is vulnerable to prompts that can get it to delete data from the DB, even though the system prompt tries to prevent that. The fix should be to add a layer of query validation before the agent queries the DB. I only added this note to head off answers asking why I am doing this instead of helping with the issue I am having.

Thank you in advance!!


Solution

  • Simple Semgrep taint mode rules work best when the sink you want to match is unconditionally vulnerable.

    When the problem you are trying to solve contains a sink that is only vulnerable when x and y conditions are true, you will want to reach for more advanced taint mode features.

    What I would use here is a feature called taint labels, which is only available in the Semgrep Pro engine (documentation). It lets you label your sources of data. Your sinks can then be configured to match only when some logical combination of labeled sources reaches them.

    In your scenario there are two sources that should enter the sink.

    1. The user input which I will label USER_INPUT.
    2. The Agent initialized with create_sql_agent which I will label SQL_AGENT.

    The sink should only match when:

    1. A method is invoked on the SQL_AGENT.
    2. USER_INPUT is an argument to the method.

    Since both labeled sources must be present for the sink to match, the condition added to the sink should be that USER_INPUT and SQL_AGENT must be true.
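To make the `requires` condition concrete, here is a small conceptual model in Python. It is only a sketch of the boolean logic, not how Semgrep is implemented; the function name `sink_matches` and its two parameters are hypothetical names chosen for illustration.

```python
def sink_matches(receiver_labels: set, argument_labels: set) -> bool:
    """Toy model of `requires: USER_INPUT and SQL_AGENT`.

    receiver_labels: labels carried by the object in front of `.invoke`
    argument_labels: labels carried by the arguments passed to `.invoke`
    (Both parameters are illustrative; Semgrep tracks labels internally.)
    """
    # Collect every label that reaches the sink expression.
    labels = receiver_labels | argument_labels
    # The sink fires only when both labels are present.
    return "USER_INPUT" in labels and "SQL_AGENT" in labels


# Agent from create_sql_agent(...) called with user input: the model returns True.
print(sink_matches({"SQL_AGENT"}, {"USER_INPUT"}))
# The same agent called with a constant string: the model returns False.
print(sink_matches({"SQL_AGENT"}, set()))
# User input passed to some unrelated object: the model returns False.
print(sink_matches(set(), {"USER_INPUT"}))
```

The point of the model is that neither label alone is enough, which is exactly the condition a plain, unlabeled taint rule cannot express.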

    The completed rule looks like this:

    rules:
      - id: bengababy-taint-example
        languages:
          - python
        severity: WARNING
        message: Found dangerous sql agent usage.
        mode: taint
        options:
          interfile: true
        pattern-sources:
          - label: USER_INPUT
            pattern: input(...)
          - label: SQL_AGENT
            pattern: $AGENT = create_sql_agent(...)
        pattern-sinks:
          - requires: USER_INPUT and SQL_AGENT
            pattern: $AGENT.invoke(...)
    

    This rule will now identify the taint even when the initialization of the agent is in a different function or file from the sink.

    def getAgent():
        new_agent = get_sql_agent()
        return new_agent
    
    def agent_test_2():
        agent = getAgent()
        prompt = input("What can I assist you with?")
        #ruleid: bengababy-taint-example
        agent.invoke(prompt)
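It can also help to add negative cases to the test file. The sketch below uses Semgrep's `# ok:` test annotation for lines that I would expect the rule not to report, since only one of the two labels reaches each call. `StubAgent` and the local `create_sql_agent` are hypothetical stand-ins so the file runs without langchain installed; I have not run these cases against the rule, so treat the annotations as expectations to confirm in the playground.

```python
class StubAgent:
    """Hypothetical stand-in for an agent object; not part of langchain."""

    def invoke(self, prompt):
        return f"handled: {prompt}"


def create_sql_agent(*args, **kwargs):
    # Stand-in with the same name as the real factory (assumption: the rule
    # matches on the function name, so a stub is enough for a test file).
    return StubAgent()


def constant_prompt():
    agent = create_sql_agent()
    # Only SQL_AGENT reaches the sink here; there is no USER_INPUT.
    # ok: bengababy-taint-example
    return agent.invoke("List the five longest tracks")


def unrelated_object():
    helper = StubAgent()
    prompt = input("What can I assist you with?")
    # Only USER_INPUT reaches the sink here; helper is not a SQL agent.
    # ok: bengababy-taint-example
    return helper.invoke(prompt)
```

If either of these lines is reported, the `requires` expression or the source patterns probably need tightening.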
    

    The rule and test code can be reviewed in the Semgrep playground (link)