python database sqlite

How do I join two SQL tables in Python based on their (non-unique) ID and the closest year?


I am currently working on my thesis, in which I try to automate the creation of family trees. Right now I am trying to link birth certificates to the parents' marriage certificates by matching names: if the names of both parents on the birth certificate are exactly the same as the names on a marriage certificate, I want to link them. However, multiple marriage certificates can contain exactly the same names. To reduce errors, I try to pick the marriage certificate whose date is closest to that of the birth certificate (I am working with 12 million birth and 5 million marriage certificates).

What I want to get out of this query is the difference in years between the birth of the child and the date of marriage of their parents, for all results.

I started by creating a PARENTS_ID that consists of the full name of the mother appended to the full name of the father, as well as a MARRIAGE_ID that appends the full name of the bride to that of the groom. I created a chunk generator to reduce memory usage and a difference counter to count the number of times a specific difference is found.

def generate_chunks(df, chunk_size):
    for i in range(0, len(df), chunk_size):
        yield df.iloc[i:i+chunk_size]

def freq(dic, arr):
    for i in arr:
        if i not in dic:
            dic[i] = 1
        else:
            dic[i] += 1
    return dic

marriages.sort_values(by=['EVENT_YEAR'])
marriages.sort_values(by=['MARRIAGE_ID'])
births.sort_values(by=['EVENT_YEAR'])

# Create an in-memory database
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA synchronous = OFF;')
conn.execute('PRAGMA journal_mode = MEMORY;')
conn.execute('PRAGMA temp_store = MEMORY;')

marriages.to_sql('sqlmarriages', conn, index=False, if_exists='append')
conn.execute("CREATE INDEX idx_marriage_id ON sqlmarriages (MARRIAGE_ID);")

chunk_size = 2000

dic = {}
with open('age_diffs.txt', 'w') as f:
    for i, chunk in enumerate(generate_chunks(births, chunk_size)):
        print(f"Processing chunk {i} at {time.time() - start:.2f} seconds")
        chunk.to_sql('sqlchunk', conn, index=False, if_exists='append')
        conn.execute("CREATE INDEX idx_parents_id ON sqlchunk (PARENTS_ID);")

        query = """
        SELECT b.*, m.EVENT_YEAR - b.PR_BIR_YEAR AS AGE_DIFF, FIRST_VALUE(AGE_DIFF) OVER (PARTITION BY b.PARENTS_ID ORDER BY ABS(b.PR_BIR_YEAR - m.EVENT_YEAR) ASC) AS AGE_DIFF
        FROM sqlchunk b
        INNER JOIN sqlmarriages m
        ON b.PARENTS_ID = m.MARRIAGE_ID
        """
        matches = pd.read_sql_query(query, conn)

        dic = freq(dic, matches.loc[:, 'AGE_DIFF'].tolist())

print(dic)
conn.close()
temp_conn.close()

However, when I run this, I get an error saying that AGE_DIFF does not exist as a column. Does anyone know how to resolve this issue?


Solution

  • import sqlite3
    import time
    
    import pandas as pd
    
    def generate_chunks(df, chunk_size):
        for i in range(0, len(df), chunk_size):
            yield df.iloc[i:i+chunk_size]
    
    def freq(dic, arr):
        for i in arr:
            dic[i] = dic.get(i, 0) + 1
        return dic
    
    # Assuming births and marriages are DataFrames
    # and you have their EVENT_YEAR and PARENTS_ID / MARRIAGE_ID columns prepared.
    
    # Create an in-memory database and load the marriages table
    conn = sqlite3.connect(':memory:')
    conn.execute('PRAGMA synchronous = OFF;')
    conn.execute('PRAGMA journal_mode = MEMORY;')
    conn.execute('PRAGMA temp_store = MEMORY;')
    
    marriages.to_sql('sqlmarriages', conn, index=False, if_exists='append')
    conn.execute("CREATE INDEX idx_marriage_id ON sqlmarriages (MARRIAGE_ID);")
    
    chunk_size = 2000
    dic = {}
    start = time.time()  # reference point for the progress messages below
    
    with open('age_diffs.txt', 'w') as f:
        for i, chunk in enumerate(generate_chunks(births, chunk_size)):
            print(f"Processing chunk {i} at {time.time() - start:.2f} seconds")
            
            # Write chunk to a temporary table
            chunk.to_sql('sqlchunk', conn, index=False, if_exists='replace')
            
            # Index the chunk to speed up the join
            conn.execute("CREATE INDEX idx_parents_id ON sqlchunk (PARENTS_ID);")
    
            query = """
            WITH matched AS (
              SELECT 
                b.*,
                m.EVENT_YEAR,
                m.EVENT_YEAR - b.PR_BIR_YEAR AS AGE_DIFF,
                ROW_NUMBER() OVER (
                  PARTITION BY b.PARENTS_ID 
                  ORDER BY ABS(b.PR_BIR_YEAR - m.EVENT_YEAR) ASC
                ) AS rn
              FROM sqlchunk b
              INNER JOIN sqlmarriages m
              ON b.PARENTS_ID = m.MARRIAGE_ID
            )
            SELECT * FROM matched WHERE rn = 1;
            """
    
            matches = pd.read_sql_query(query, conn)
            
            # Write or print differences if needed
            # f.write(...) or just process them
            age_differences = matches['AGE_DIFF'].tolist()
            dic = freq(dic, age_differences)
    
    print(dic)
    conn.close()
    

    What This Accomplishes

    Efficient Chunks: You process the large birth dataset in manageable chunks of 2,000 rows to keep memory usage reasonable.

    Indexed Joins: Creating indexes on PARENTS_ID and MARRIAGE_ID helps speed up the join operation.

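    Minimal Absolute Differences: ROW_NUMBER() with ORDER BY ABS(b.PR_BIR_YEAR - m.EVENT_YEAR) ranks the candidate marriages by their distance in years from the birth, and the outer WHERE rn = 1 keeps only the closest one. Computing AGE_DIFF inside the CTE also removes the original error: SQLite does not let one expression in a SELECT list refer to an alias defined in that same list, which is why AGE_DIFF was reported as a missing column. Note that PARTITION BY b.PARENTS_ID returns one row per set of parents within a chunk, so siblings that land in the same chunk collapse into a single row; to count every birth, partition by a key that is unique per birth instead.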

    Frequency Dictionary: After extracting only the closest matches, you update the frequency dictionary to keep track of how many times a particular year difference occurs.

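    The final result dic will contain the distribution of these year differences, computed as marriage year minus birth year (so a negative value means the marriage preceded the birth), allowing you to understand the typical gap between the marriage and the birth of the child.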
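
The closest-match step can also be tried in isolation on a handful of rows. The following is a minimal sketch under stated assumptions, not the definitive query: the table and column names are taken from the code above, the sample rows are made up, and it partitions by SQLite's implicit rowid instead of PARENTS_ID so that each birth keeps its own closest marriage. That change, the extra tie-breaker on m.EVENT_YEAR, and the use of collections.Counter in place of freq are illustrative choices. Window functions need SQLite 3.25 or newer.

```python
import sqlite3
from collections import Counter

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sqlmarriages (MARRIAGE_ID TEXT, EVENT_YEAR INTEGER);
    CREATE TABLE sqlchunk (PARENTS_ID TEXT, PR_BIR_YEAR INTEGER);
    CREATE INDEX idx_marriage_id ON sqlmarriages (MARRIAGE_ID);
""")

# Made-up sample data: two couples share the same names but married 50 years apart.
conn.executemany(
    "INSERT INTO sqlmarriages VALUES (?, ?)",
    [("anna|jan", 1850), ("anna|jan", 1900), ("eva|piet", 1870)],
)
# Two siblings near the 1850 marriage, one child near the 1900 marriage,
# one child of another couple, and one birth without any matching marriage.
conn.executemany(
    "INSERT INTO sqlchunk VALUES (?, ?)",
    [("anna|jan", 1852), ("anna|jan", 1855), ("anna|jan", 1903),
     ("eva|piet", 1872), ("no|match", 1880)],
)

query = """
WITH matched AS (
    SELECT
        b.rowid AS BIRTH_ROWID,
        m.EVENT_YEAR - b.PR_BIR_YEAR AS AGE_DIFF,
        ROW_NUMBER() OVER (
            PARTITION BY b.rowid                 -- one winner per birth row
            ORDER BY ABS(b.PR_BIR_YEAR - m.EVENT_YEAR), m.EVENT_YEAR
        ) AS rn
    FROM sqlchunk b
    INNER JOIN sqlmarriages m ON b.PARENTS_ID = m.MARRIAGE_ID
)
SELECT BIRTH_ROWID, AGE_DIFF FROM matched WHERE rn = 1 ORDER BY BIRTH_ROWID;
"""

rows = conn.execute(query).fetchall()

# Counter does the same bookkeeping as freq(): one count per distinct difference.
age_diff_counts = Counter(diff for _, diff in rows)
conn.close()

print(rows)
print(age_diff_counts)
```

Both siblings born in 1852 and 1855 are kept and matched to the 1850 marriage, the 1903 birth is matched to the 1900 marriage, and the birth without a matching marriage is dropped by the inner join. When accumulating across chunks, a single Counter can be updated with each chunk's differences via its update method.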