pythonsqlitecs50

When reading data into a sqlite database using a formatted statement, I can generate what looks to be a proper statement, but it won't execute


This is a web application I intend to submit as the final project in the cs50 online programming course, using:

The goal of the project is to provide on-the-fly grading capability to my faculty (I teach an unrelated course): The faculty log into the application, select which student team they wish to grade, are shown images of the students in that team and have a form via which to submit grades. The input and output are csv spreadsheets.

To make implementation easier (I hoped), I'm using the SQL object provided by that class: https://github.com/cs50/python-cs50/blob/main/src/cs50/sql.py The nature of this SQL object is that it won't interact directly with pandas; dataframe.to_sql won't run. Checking the source code, it appears that the necessary sql functionality isn't present in the SQL object. Thus, I am manually iterating over each row of the csv.

Each session's grades generate four columns: attendence, preparation, participation, and result; the column names are prepended to the lab session (e.g. attendence_lab_1). Once a faculty member submits grades, the data is read back to a temporary csv file, then the temp file is renamed with the name of the original csv, overwriting it. For the next grading session, I recreate the database by reading in from the csv. This means that I can't hardcode my column headers; I need to read the headers from the csv, generate a list from that, then create an INSET INTO statement from that list of headers. Using the answers at Python sqlite3, create table columns from a list with help of .format as a model, I've been able to get my headers into the table. But when I iterate over the rows I get errors:

I think I understand what these types of errors are, in a general sense, but I'm not understanding how I'm generating them.

The code:

# https://pandas.pydata.org/docs/reference/io.html
df = pd.read_csv(stpath, header=0)
#df.to_sql(students, con)
headers = df.columns.values.tolist() # draw list of headers from df

# create table via prepared statement that scales with number of columns in table
def createTableStatement(tableName, columnList):
    return f"CREATE TABLE IF NOT EXISTS {tableName} (id INTEGER, {columnList[0]}" + (", {}"*(len(columnList)-1)).format(*(columnList[1:])) + ", PRIMARY KEY(id))"
db.execute(createTableStatement("students", headers))

# read data into table
def createRowStatement(columnList):
    return f'"INSERT INTO students ({columnList[0]}' + (', {}'*(len(columnList)-1)).format(*(columnList[1:])) + ') VALUES (?' + (', ?'*(len(columnList)-1)) + ')", row["Student"]' + (', row["{}"]'*(len(columnList)-1)).format(*(columnList[1:])) + ')'


with open(stpath, "r", encoding='utf-8-sig') as file:
    reader = csv.DictReader(file)
    for row in reader:
        x = createRowStatement(headers)
        print(x)
        db.execute(x)

An earlier version of the code substituted {columnList[0]} for row["Student"]' in the INSERT INTO statement. This resulted in the generated sql query:

"INSERT INTO students (Student, Team, Session, Pod, Image, Lab1_Grade, Lab1_attend, Lab1_prep, Lab1_part, Lab1_disect) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", {columnList[0]}, row["Team"], row["Session"], row["Pod"], row["Image"], row["Lab1_Grade"], row["Lab1_attend"], row["Lab1_prep"], row["Lab1_part"], row["Lab1_disect"])

Changing {columnList[0]} to row["Student"]' generated the appropriate list of headers:

"INSERT INTO students (Student, Team, Session, Pod, Image, Lab1_Grade, Lab1_attend, Lab1_prep, Lab1_part, Lab1_disect) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", row["Student"], row["Team"], row["Session"], row["Pod"], row["Image"], row["Lab1_Grade"], row["Lab1_attend"], row["Lab1_prep"], row["Lab1_part"], row["Lab1_disect"])

I suspect the error is somehow related to this behavior - am I missing a quotation mark? Did I put in too many? Is the line of code simply too long (how should I break it up?)?


Solution

  • F-strings and string.format() generally should not be used to generate SQL statements, as it potentially opens you to SQL injection attacks if there were some malformed or malicious column headers or data. While you trust the source of the input (for now), it's something that should be avoided. The combination of both makes these statements very difficult to parse and debug as well. However, it is okay to use f-strings or string concatenation to insert the correct number of ? placeholders.

    An additional problem I found while debugging your code is that the createRowStatement function had mismatched " and ' characters, which was probably causing your "string literal is unterminated" error.

    I also suspect there could have been problems if any of your data included spaces in the header or value, as the way you were constucting the statements did not wrap any of the inserted values in quotation marks.

    Here's a possible solution. Note that I added a placeholder for the tablename in the createRowStatement function so it can be more versatile. Also, by creating the statement with placeholders, it can now be called once before the loop, instead of having to generate a new statement for every row.

    # https://pandas.pydata.org/docs/reference/io.html
    df = pd.read_csv(stpath, header=0)
    #df.to_sql(students, con)
    headers = df.columns.values.tolist() # draw list of headers from df
    
    # create table via prepared statement that scales with number of columns in table
    def createTableStatement(tableName, columnList):
        column_placeholders = "? TEXT, " * (len(columnList))
        statement = f"CREATE TABLE IF NOT EXISTS ? (id INTEGER, {column_placeholders} PRIMARY KEY(id))"
        return statement, (tableName, *columnList)
    
    statement, values = createTableStatement("students", headers)   
    db.execute(statement, *values)   
    # Note for anyone not familiar with CS50, the SQL library being used here expects all of the values to be passed as *args, not as a tuple like some of the built-in libraries.
    
    # read data into table
    def createRowStatement(columnList)
        column_placeholders = "?" + ", ?" * (len(columnList)-1)
        statement = f"INSERT INTO ? ({column_placeholders}) VALUES ({column_placeholders})"
        return statement
    
    
    with open(stpath, "r", encoding='utf-8-sig') as file:
        reader = csv.DictReader(file)
        statement = createRowStatement(headers)
        for row in reader:
            db.execute(statement, "students", *headers, *row.values())
    

    Example outputs from these versions:

    tableName="Students"
    headers = ["First Name", "Last Name", "Grade", "Age"]
    
    statement, values = createTableStatement("students", headers)   
    # statement = "CREATE TABLE IF NOT EXISTS ? (id INTEGER, ? TEXT, ? TEXT, ? TEXT, ? TEXT, PRIMARY KEY(id))"
    # values = ('Students', 'First Name', 'Last Name', 'Grade', 'Age')
    
    statement = createRowStatement(headers)
    # statement = "INSERT INTO ? (?, ?, ?, ?) VALUES (?, ?, ?, ?)"
    

    We can actually drop pandas entirely if you're not using it for anything else, and get the headers from the DictReader.fieldnames property.

    import csv
    #import pandas as pd
    from cs50 import SQL
    
    
    db = SQL("sqlite:///so.db")
    stpath = "so.csv"
    TABLENAME = "students"
    
    # create table via prepared statement that scales with number of columns in table
    def createTableStatement(tableName, columnList):
        column_placeholders = "? TEXT, " * (len(columnList))
        statement = f"CREATE TABLE IF NOT EXISTS ? (id INTEGER, {column_placeholders} PRIMARY KEY(id))"
        return statement, (tableName, *columnList)
    
    # read data into table
    def createRowStatement(columnList):
        column_placeholders = "?" + ", ?" * (len(columnList)-1)
        statement = f"INSERT INTO ? ({column_placeholders}) VALUES ({column_placeholders})"
        return statement
    
    with open(stpath, "r", encoding='utf-8-sig') as file:
        reader = csv.DictReader(file)
        headers = reader.fieldnames
        create_statement, values = createTableStatement(TABLENAME, headers)
        db.execute(create_statement, *values)
    
        insert_statement = createRowStatement(headers)
        for row in reader:
            db.execute(insert_statement, TABLENAME, *headers, *row.values())
    
    data = db.execute("SELECT * FROM ?", TABLENAME)
    for row in data:
        print(row)