python numpy eval grammar

Grammar for combinations of Numpy arrays


For a specific application, I am building a GUI to manipulate some data (internally: 1D NumPy arrays) and plot it.

The end-user can choose in the UI to plot various series a, b, c.

Now I also need to allow a "custom combination" of a, b, c. More precisely, the user (who doesn't know Python/NumPy, but can learn a few keywords) should enter a "formula" in a GUI textbox, and my program should then translate it into real NumPy code (probably using eval(...); security is not much of a concern here because the end-user is the only user) and plot the data.

Examples of end-user input:

a * 3 + 1.234 * c - d
a + b.roll(2)
a + b / b.max() * a.max()

For example, the allowed syntax is: basic arithmetic (+ * - / and parentheses), float numbers, a.max(), and a.roll(3) to shift the arrays.

Question: is there a function in NumPy or SciPy that can interpret combinations of arrays written in such a basic arithmetic grammar?


Solution

  • For the algebraic part, the numexpr library can take care of that. For example, the following snippet will work:

    import numpy as np
    import numexpr as ne
    
    a = np.random.rand(10)
    b = np.random.rand(10)
    c = np.random.rand(10)
    d = np.random.rand(10)
    
    ne.evaluate("a * 3 + 1.234 * c - d")
    

    Sadly, the library does not cover the other two cases (a.max() and a.roll(n)) out of the box, but they can be handled with some string parsing. A final version with all your features might look like this:

    import numpy as np
    import numexpr as ne
    import re
    
    a = np.random.rand(10)
    b = np.random.rand(10)
    c = np.random.rand(10)
    d = np.random.rand(10)
    
    def expression_eval(
        expression: str, a: np.ndarray, b: np.ndarray, c: np.ndarray, d: np.ndarray
    ) -> np.ndarray:
    
        #Snippet to manage max values:
        a_max = a.max()
        b_max = b.max()
        c_max = c.max()
        d_max = d.max()
    
        for label in ["a", "b", "c", "d"]:
            expression = expression.replace(f"{label}.max()", f"{label}_max")
    
        #Snippet to manage rolling windows:
        pattern = r'(\w)\.roll\((\d+)\)'
    
        #Each match is an (array name, shift) pair; no match gives an empty list
        roll_results = [
            (name, int(window)) for name, window in re.findall(pattern, expression)
        ]
    
        rolls = {}
    
        for arr, window in roll_results:
            expression = expression.replace(f"{arr}.roll({window})", f"{arr}_roll_{window}")
            rolls[f"{arr}_roll_{window}"] = np.concatenate([
                vars()[arr][window:],
                np.zeros(window)
            ])
    
        return ne.evaluate(expression, global_dict=rolls)
    
    #Evaluation:
    
    expression_1 = "a * 3 + 1.234 * c - d"
    expression_2 = "a + b / b.max() * a.max()"
    expression_3 = "a + b.roll(3) + c.roll(2) + d.roll(4)"
    
    print(f"{expression_1}\n{expression_eval(expression_1, a, b, c, d)}\n")
    print(f"{expression_2}\n{expression_eval(expression_2, a, b, c, d)}\n")
    print(f"{expression_3}\n{expression_eval(expression_3, a, b, c, d)}\n")
    

    Essentially, we replace each method call with a variable holding its precomputed value before evaluating the algebraic expression. For roll, the results are collected in a dictionary, because the combinations of array and shift are not known in advance. As written, x.roll(n) shifts the array left by n positions and pads the end with zeros.
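
As a minimal sketch of the substitution step on its own, the following uses only the standard `re` module to show what the expression string looks like just before it would be handed to numexpr. The helper name `rewrite_methods` is hypothetical and not part of the code above; it only mirrors the `max` and `roll` replacements.

```python
import re


def rewrite_methods(expression: str) -> str:
    """Replace `x.max()` and `x.roll(n)` with plain variable names (hypothetical helper)."""
    # "b.max()" becomes "b_max"
    expression = re.sub(r'\b(\w+)\.max\(\)', r'\1_max', expression)
    # "b.roll(3)" becomes "b_roll_3"
    expression = re.sub(r'\b(\w+)\.roll\((\d+)\)', r'\1_roll_\2', expression)
    return expression


print(rewrite_methods("a + b / b.max() * a.max()"))
print(rewrite_methods("a * 1.234 + b.roll(3) + c.roll(2)"))
```

Float literals such as 1.234 are left alone because the patterns only match a name followed by `.max()` or `.roll(n)`. The values behind the new names (`b_max`, `b_roll_3`, ...) still have to be computed and passed to the evaluator, as the code above does.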

    Update (30-03-2024)

    @cards asked in the comments whether this code can handle nested expressions. It cannot. However, we can extend this basic prototype to handle more complex expressions such as expression_4. The numexpr library already handles nesting in algebraic expressions, and we can allow some extra nesting, such as taking the max, min or roll of a custom expression, by precomputing each nested expression, replacing it with a tag in the final expression, and passing the tag's value to the final evaluation.

    import numpy as np
    import numexpr as ne
    import re
    
    a = np.random.rand(10)
    b = np.random.rand(10)
    c = np.random.rand(10)
    d = np.random.rand(10)
    
    def expression_eval(
        expression: str, a: np.ndarray, b: np.ndarray, c: np.ndarray, d: np.ndarray
    ) -> np.ndarray:
    
        variable_dict = {"a": a, "b": b, "c": c, "d": d}
    
        #Snippet to evaluate inner algebraic expressions:
        #a parenthesized group (with at most one level of nested parentheses)
        #followed by .max(), .min() or .roll(n) is precomputed and replaced
        #by a tag, so "((a+b)**3).min()" becomes "expr_0.min()".
        pattern = r'(\((?:[^()]|\([^()]*\))+\))(?:\.max\(\)|\.min\(\)|\.roll\(\d+\))'
        #Longest groups first, so a group containing another one is replaced whole
        matches = sorted(set(re.findall(pattern, expression)), key=len, reverse=True)
    
        for expr_ind, match in enumerate(matches):
            variable_dict[f"expr_{expr_ind}"] = ne.evaluate(match, local_dict=variable_dict)
            expression = expression.replace(match, f"expr_{expr_ind}")
    
        #Snippet to manage max values (\w+ so that tags like expr_0 match too):
        pattern = r'(\w+)\.max\(\)'
        matches = re.findall(pattern, expression)
    
        for match in matches:
            expression = expression.replace(f"{match}.max()", f"{match}_max")
            variable_dict[f"{match}_max"] = variable_dict[match].max()
    
        #Snippet to manage min values:
        pattern = r'(\w+)\.min\(\)'
        matches = re.findall(pattern, expression)
    
        for match in matches:
            expression = expression.replace(f"{match}.min()", f"{match}_min")
            variable_dict[f"{match}_min"] = variable_dict[match].min()
    
        #Snippet to manage rolling windows:
        pattern = r'(\w+)\.roll\((\d+)\)'
        roll_results = [
            (name, int(window)) for name, window in re.findall(pattern, expression)
        ]
    
        for arr, window in roll_results:
            expression = expression.replace(f"{arr}.roll({window})", f"{arr}_roll_{window}")
            #Look the array up in variable_dict so that expr_N tags work as well
            variable_dict[f"{arr}_roll_{window}"] = np.concatenate([
                variable_dict[arr][window:],
                np.zeros(window)
            ])
    
        return ne.evaluate(expression, local_dict=variable_dict)
    
    #Evaluation:
    
    expression_1 = "a * 3 + 1.234 * c - d"
    expression_2 = "a + b / b.max() * a.max()"
    expression_3 = "a + b.roll(3) + c.roll(2) + d.roll(4)"
    expression_4 = "((a+b)**3).min() + ((c-d)*5).roll(3)"
    
    print(f"{expression_1}\n{expression_eval(expression_1, a, b, c, d)}\n")
    print(f"{expression_2}\n{expression_eval(expression_2, a, b, c, d)}\n")
    print(f"{expression_3}\n{expression_eval(expression_3, a, b, c, d)}\n")
    print(f"{expression_4}\n{expression_eval(expression_4, a, b, c, d)}\n")
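
The regex-based tagging above only reaches a fixed nesting depth. If arbitrary nesting is needed, one possible alternative (a different technique from the numexpr approach above, offered only as a sketch) is to parse the formula with Python's standard `ast` module and walk the tree, accepting only a whitelist of node types. The names `evaluate_formula`, `shift`, `BIN_OPS`, `UNARY_OPS` and `METHODS` are hypothetical, and `roll` is assumed to mean the same left shift with zero padding used above.

```python
import ast
import operator

import numpy as np

# Operators the formula language is assumed to allow.
BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
}
UNARY_OPS = {ast.USub: operator.neg, ast.UAdd: operator.pos}


def shift(arr, n):
    """Shift left by n and pad the end with zeros (same convention as above)."""
    if not isinstance(n, int) or n < 0:
        raise ValueError("roll() expects a non-negative integer")
    arr = np.asarray(arr, dtype=float)
    return np.concatenate([arr[n:], np.zeros(min(n, arr.size))])


# Method names the user may call on any sub-expression.
METHODS = {"max": np.max, "min": np.min, "roll": shift}


def evaluate_formula(formula, variables):
    """Evaluate a restricted arithmetic formula over named arrays."""

    def visit(node):
        if isinstance(node, ast.Expression):
            return visit(node.body)
        # Numeric literals only (bool and str constants are rejected).
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.Name) and node.id in variables:
            return variables[node.id]
        if isinstance(node, ast.BinOp) and type(node.op) in BIN_OPS:
            return BIN_OPS[type(node.op)](visit(node.left), visit(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in UNARY_OPS:
            return UNARY_OPS[type(node.op)](visit(node.operand))
        # Method calls such as x.max() or (a + b).roll(2).
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr in METHODS
            and not node.keywords
        ):
            target = visit(node.func.value)
            args = [visit(arg) for arg in node.args]
            return METHODS[node.func.attr](target, *args)
        raise ValueError(f"unsupported syntax: {ast.dump(node)}")

    return visit(ast.parse(formula.strip(), mode="eval"))


a = np.array([1.0, 2.0, 3.0, 4.0])
b = np.array([4.0, 3.0, 2.0, 1.0])
variables = {"a": a, "b": b}

print(evaluate_formula("a + b.roll(1)", variables))
print(evaluate_formula("((a + b) ** 2).min() + (a - b).roll(2)", variables))
```

Because the tree is evaluated recursively, a method call can be applied to any parenthesized sub-expression without precomputing tags. Anything outside the whitelist (unknown names, other function calls, subscripts) raises `ValueError`. The trade-off is that evaluation goes through plain NumPy operations and does not use numexpr's optimized evaluation.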