I have a huge CSV file (7.5 GB). It has three columns (no header): the first is a 7-character string (e.g. SSSSDKI), the second is a count (e.g. 100), and the third is the length of the sequence in which the kmers were counted (kmer is a term from bioinformatics for a substring of length k).
MSLLGTP,4,356265492
SLLGTPL,9,356265492
LLGTPLS,8,356265492
LGTPLSS,7,356265492
GTPLSSS,10,356265492
TPLSSSS,13,356265492
PLSSSSD,4,356265492
LSSSSDK,7,356265492
I have SQLite and MySQL installed, so I want to transform this CSV file into a kmers database. I have some experience with Python and R, but zero with SQL.
The CSV file can contain repeated kmers, because they were counted in different sequences and those per-sequence results were merged into this huge file. So for any kmer that appears more than once I need to aggregate its counts and also the lengths of the sequences in which it was found (see the sketch after the example below).
For example:
SSDKIML,4,356265492
SSDKIML,3,396290492
the final values for this kmer would be:
SSDKIML,7,752555984
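If I understood the basics correctly, once the data is in an SQLite table the aggregation itself should be a single GROUP BY query. A minimal sketch of what I mean (assuming a table named kmers with columns kmer, counts and seq_len, as in the script further down):
import sqlite3

conn = sqlite3.connect('kmers.db')  # hypothetical database file
# Sum the counts and the sequence lengths for each distinct kmer
query = 'SELECT kmer, SUM(counts), SUM(seq_len) FROM kmers GROUP BY kmer'
for kmer, total_counts, total_seq_len in conn.execute(query):
    print(kmer, total_counts, total_seq_len)
conn.close()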
After this process I would have a final CSV file with all kmers and their aggregated counts and sequence lengths.
But I also need to make sure that all the lines, i.e. all the data from the original file, really end up in my database.
If anyone has the time and patience, any help would be very much appreciated.
Thank you for your time and kindness.
PS - I tried with pandas and dask, but every time it kills my kernel.
I used this with dask, and similar code with pandas:
import glob
import dask.dataframe as dd

def process_file(input_pattern, output_filename):
    # Read all chunk files matching the pattern
    dfs = [dd.read_csv(f,
                       header=None,
                       names=['kmer', 'count', 'len'])
           for f in glob.glob(input_pattern)]
    # Concatenate all chunks into a single Dask DataFrame
    df = dd.concat(dfs)
    # Group by the 'kmer' column and sum the 'count' column
    grouped_df = df.groupby('kmer')['count'].sum().reset_index()
    # Write the processed DataFrame to a single CSV file
    grouped_df.to_csv(output_filename,
                      index=False,
                      header=False,
                      single_file=True)
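The function would then be called with something like this (the file pattern and output name are just placeholders):
# Hypothetical call: read every chunk CSV and write one aggregated file
process_file('kmer_chunks/*.csv', 'aggregated_kmers_dask.csv')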
And this:
import os
import csv
from collections import defaultdict

def merge_kmer_files(folder_path, output_file):
    kmer_counts = defaultdict(int)  # Dictionary to store k-mer counts
    # Iterate over all files in the folder
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)
        if os.path.isfile(file_path) and file_name.endswith('.csv'):
            with open(file_path, mode='r') as csvfile:
                reader = csv.reader(csvfile)
                next(reader)  # Skip header if there is one
                for row in reader:
                    kmer, count = row[0], int(row[1])
                    kmer_counts[kmer] += count  # Aggregate counts
    # Write the aggregated results to the output file
    with open(output_file, mode='w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['kmer', 'count'])  # Write header
        for kmer, count in kmer_counts.items():
            writer.writerow([kmer, count])

def process_directory(root_dir):
    for dirpath, dirnames, filenames in os.walk(root_dir):
        csv_files = [f for f in filenames if f.endswith('.csv')]
        if len(csv_files) > 1:
            output_file = os.path.join(dirpath, 'merged_kmers.csv')
            print(f"Merging files in {dirpath} into {output_file}")
            merge_kmer_files(dirpath, output_file)
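And process_directory would be called on the folder holding the chunk files, something like this (the path is a placeholder):
# Hypothetical call: walk the folder tree and merge the CSVs found in each subfolder
process_directory('/path/to/kmer_chunks')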
Paulo
It seems that this worked out:
#!/usr/bin/env python
import sys
import csv
import sqlite3
# Database connection
conn = sqlite3.connect('kmers.db')
cursor = conn.cursor()
# Create table
cursor.execute('''
    CREATE TABLE IF NOT EXISTS kmers (
        kmer TEXT,
        counts INTEGER,
        seq_len INTEGER
    )
''')
# Function to read and insert data in chunks
def read_and_insert(csv_file):
    line_counter = 0
    with open(csv_file, 'r') as f:
        reader = csv.reader(f)
        chunk_size = 100000
        chunk = []
        for row in reader:
            chunk.append((row[0], int(row[1]), int(row[2])))
            line_counter += 1
            if len(chunk) >= chunk_size:
                cursor.executemany('INSERT INTO kmers (kmer, counts, seq_len) VALUES (?, ?, ?)', chunk)
                conn.commit()
                chunk = []  # Clear the chunk after insertion
                print(f"{line_counter} lines processed and inserted.")
        # Insert any remaining rows
        if chunk:
            cursor.executemany('INSERT INTO kmers (kmer, counts, seq_len) VALUES (?, ?, ?)', chunk)
            conn.commit()
            print(f"{line_counter} lines processed and inserted.")
    print(f"Total lines read and inserted: {line_counter}")
# Path to your CSV file
csv_file_path = sys.argv[1]
read_and_insert(csv_file_path)
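# Sanity check (sketch): compare the number of lines in the CSV with the number
# of rows now in the kmers table, to make sure everything from the original file
# ended up in the database. Assumes the kmers table was empty before this run;
# it also re-reads the whole file just to count lines, which is slow for 7.5 GB.
with open(csv_file_path, 'r') as f:
    file_lines = sum(1 for _ in f)
db_rows = cursor.execute('SELECT COUNT(*) FROM kmers').fetchone()[0]
print(f"Lines in file: {file_lines} / rows in kmers table: {db_rows}")
if file_lines != db_rows:
    print("WARNING: counts differ - some lines may be missing from the database.")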
# Create aggregated table with all kmers (no HAVING filter, so kmers that
# appear only once are kept as well; their sums are simply their single values)
cursor.execute('''
    CREATE TABLE IF NOT EXISTS aggregated_kmers AS
    SELECT kmer, SUM(counts) AS total_counts, SUM(seq_len) AS total_seq_len
    FROM kmers
    GROUP BY kmer
''')
conn.commit()
# Export the aggregated data to a new CSV file
with open('aggregated_kmers.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['kmer', 'total_counts', 'total_seq_len'])
    for row in cursor.execute('SELECT * FROM aggregated_kmers'):
        writer.writerow(row)
# Close the database connection
conn.close()
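For the record, the script takes the CSV path as its only argument, so it is run with something like python kmers_to_db.py big_kmers_file.csv (the script and file names here are just examples).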