Tags: python, sql, r, csv, bigdata

Create a kmer database from a huge CSV file


I have a huge CSV file (7.5 GB). It has three columns (no header): the first is a 7-character string (e.g. SSSSDKI), the second is the count (e.g. 100), and the third is the length of the sequence in which the kmers were counted (a kmer is a bioinformatics term for a string of length k).

MSLLGTP,4,356265492
SLLGTPL,9,356265492
LLGTPLS,8,356265492
LGTPLSS,7,356265492
GTPLSSS,10,356265492
TPLSSSS,13,356265492
PLSSSSD,4,356265492
LSSSSDK,7,356265492

I have SQLite and MySQL installed, so I need to transform this CSV file into a kmer database. I have some experience with Python and R, but none with SQL.

The CSV file can contain repeated kmers, because they were counted in different sequences and then merged into this huge file. So, for any kmer that appears more than once, I need to aggregate both the counts and the sequence lengths in which it was found.

For example:

SSDKIML,4,356265492
SSDKIML,3,396290492 

The final values for this kmer would be:

SSDKIML,7,752555984  

After this process I would have a final CSV file with all kmers and their aggregated counts and sequence lengths.
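
To make the rule explicit, here is a toy sketch (plain Python, made-up rows) of the aggregation I am after: sum the counts and sum the sequence lengths per kmer.

from collections import defaultdict

# Toy data: the same kmer counted in two different sequences
rows = [
    ("SSDKIML", 4, 356265492),
    ("SSDKIML", 3, 396290492),
]

totals = defaultdict(lambda: [0, 0])  # kmer -> [total_count, total_seq_len]
for kmer, count, seq_len in rows:
    totals[kmer][0] += count
    totals[kmer][1] += seq_len

for kmer, (count, seq_len) in totals.items():
    print(f"{kmer},{count},{seq_len}")  # SSDKIML,7,752555984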

But I need to make sure that all the lines, that is, all the data from the original file, end up in my database.

If anyone has the time and patience, any help would be very much appreciated.

Thank you for your time and kindness.

PS - I tried with pandas and dask, but it kills my kernel every time.

I used this with dask, and similar code with pandas:

import os
import glob
import dask.dataframe as dd


def process_file(input_pattern, output_filename):
    # Read all chunks matching the pattern
    dfs = [dd.read_csv(f, 
                       header=None,
                       names=['kmer',
                              'count',
                              'len']) for f in glob.glob(input_pattern)]
    
    # Concatenate all chunks into a single Dask DataFrame
    df = dd.concat(dfs)
    
    # Group by 'kmer' and sum both the 'count' and 'len' columns
    grouped_df = df.groupby('kmer')[['count', 'len']].sum().reset_index()
    
    # Write the processed DataFrame to a new CSV file
    grouped_df.to_csv(output_filename,
                      index=False,
                      header=False,
                      single_file=True)
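
For completeness, I call it like this (the glob pattern and the output name are just placeholders):

process_file('kmer_chunks/*.csv', 'merged_kmers_dask.csv')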

And this:

import os
import csv
from collections import defaultdict

def merge_kmer_files(folder_path, output_file):
    kmer_counts = defaultdict(lambda: [0, 0])  # kmer -> [total_count, total_seq_len]

    # Iterate over all files in the folder
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)
        if os.path.isfile(file_path) and file_name.endswith('.csv'):
            with open(file_path, mode='r') as csvfile:
                reader = csv.reader(csvfile)
                # The files have no header, so every row is data
                for row in reader:
                    kmer, count, seq_len = row[0], int(row[1]), int(row[2])
                    kmer_counts[kmer][0] += count    # Aggregate counts
                    kmer_counts[kmer][1] += seq_len  # Aggregate sequence lengths

    # Write the aggregated results to the output file
    with open(output_file, mode='w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['kmer', 'count', 'seq_len'])  # Write header
        for kmer, (count, seq_len) in kmer_counts.items():
            writer.writerow([kmer, count, seq_len])

def process_directory(root_dir):
    for dirpath, dirnames, filenames in os.walk(root_dir):
        csv_files = [f for f in filenames if f.endswith('.csv')]
        if len(csv_files) > 1:
            output_file = os.path.join(dirpath, 'merged_kmers.csv')
            print(f"Merging files in {dirpath} into {output_file}")
            merge_kmer_files(dirpath, output_file)

Paulo


Solution

  • It seems that this worked out:

    #!/usr/bin/env python
    
    import sys
    import csv
    import sqlite3
    
    
    # Database connection
    conn = sqlite3.connect('kmers.db')
    cursor = conn.cursor()
    
    # Create table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS kmers (
        kmer TEXT,
        counts INTEGER,
        seq_len INTEGER
    )
    ''')
    
    # Function to read and insert data in chunks
    def read_and_insert(csv_file):
        line_counter = 0
        with open(csv_file, 'r') as f:
            reader = csv.reader(f)
            chunk_size = 100000
            chunk = []
            for row in reader:
                chunk.append((row[0], int(row[1]), int(row[2])))
                line_counter += 1
                if len(chunk) >= chunk_size:
                    cursor.executemany('INSERT INTO kmers (kmer, counts, seq_len) VALUES (?, ?, ?)', chunk)
                    conn.commit()
                    chunk = []  # Clear the chunk after insertion
                    print(f"{line_counter} lines processed and inserted.")
            # Insert any remaining rows
            if chunk:
                cursor.executemany('INSERT INTO kmers (kmer, counts, seq_len) VALUES (?, ?, ?)', chunk)
                conn.commit()
                print(f"{line_counter} lines processed and inserted.")
    
        print(f"Total lines read and inserted: {line_counter}")
    
    # Path to your CSV file
    csv_file_path = sys.argv[1]
    read_and_insert(csv_file_path)
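
    # (Optional, my addition) An index on kmer may speed up the GROUP BY used
    # for the aggregation below; it is not required for correctness.
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_kmer ON kmers (kmer)')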
    
    # Create the aggregated table: one row per kmer, with counts and sequence
    # lengths summed over every occurrence. No HAVING clause, so kmers that
    # appear only once are kept as well and no data is dropped.
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS aggregated_kmers AS
    SELECT kmer, SUM(counts) AS total_counts, SUM(seq_len) AS total_seq_len
    FROM kmers
    GROUP BY kmer
    ''')
    
    # Export the aggregated data to a new CSV file
    with open('aggregated_kmers.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['kmer', 'total_counts', 'total_seq_len'])
        for row in cursor.execute('SELECT * FROM aggregated_kmers'):
            writer.writerow(row)
    
    # Close the database connection
    conn.close()
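
  • To double-check that all the lines from the original file made it into the database, I compare the line count of the original CSV with the row count of the kmers table. A small sketch, reusing kmers.db from above; the CSV path is a placeholder:

    import sqlite3

    csv_file_path = 'kmers.csv'  # placeholder: path to the original CSV

    # Count lines in the original file without loading it into memory
    with open(csv_file_path, 'r') as f:
        n_lines = sum(1 for _ in f)

    # Count rows stored in the kmers table
    conn = sqlite3.connect('kmers.db')
    n_rows = conn.execute('SELECT COUNT(*) FROM kmers').fetchone()[0]
    conn.close()

    print(f"CSV lines: {n_lines}, rows in kmers table: {n_rows}")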