The dataset I have stored is just the coordinates of DNA sequences.
df:
chr start stop label
chr1 9000 9100 1
chr1 8803 8903 1
chr1 8903 9000 0
My goal is to expand the original dataset by creating a sliding window around each coordinate to capture context sequences.
new_df:
chr start stop label
chr1 9000-5000 9000+5000 1
chr1 9001-5000 9001+5000 1
chr1 9002-5000 9002+5000 1
...
chr1 9100-5000 9100+5000 1
...
using this function:
def expand_coordinates(element_locs, context=3):
    # Vectorized expansion of coordinates
    start = element_locs['Start'].astype(int)
    end = element_locs['End'].astype(int)
    expanded_data = []
    for idx, row in element_locs.iterrows():
        chr_name = row['Chromosome']
        chr_start = start[idx]
        chr_end = end[idx]
        for i in range(chr_start, chr_end + 1):
            expanded_data.append({
                'Chromosome': chr_name,
                'Start': max((i - 1) - context, 0),
                'End': min(i + context, max_sizes[chr_name])
            })
    expanded_df = pd.DataFrame(expanded_data)
    return expanded_df
def get_element_seqs(element_locs, context=3):
    expanded_df = expand_coordinates(element_locs, context=context)
    # Optimize genome fetching
    genome = pysam.Fastafile(ref_genome)
    def fetch_sequences(row):
        return genome.fetch(row['Chromosome'], row['Start'], row['End'])
    # Fetch sequences in a vectorized way
    expanded_df['sequence'] = expanded_df.apply(fetch_sequences, axis=1)
    return element_seqs
dataset = Dataset.from_pandas(element_final[['Chromosome', 'sequence', 'label']])
dataset = dataset.shuffle(seed=42)
tokenizer = AutoTokenizer.from_pretrained(f"InstaDeepAI/nucleotide-transformer-500m-human-ref")
def tokenize_function(examples):
    outputs = tokenizer.batch_encode_plus(examples["sequence"], return_tensors="pt", truncation=False, padding=False, max_length=80)
    return outputs
# Creating tokenized dataset
tokenized_dataset = dataset.map(
    tokenize_function,
    batched=True, batch_size=2000)
input_file = f"tokenized_elements/tokenized_{ELEMENT_LABEL}/{filename}.arrow"
# Load input data
d1 = Dataset.from_file(input_file)
def embed_function(examples):
    torch.cuda.empty_cache()
    gc.collect()
    inputs = torch.tensor(examples['input_ids'])  # Convert to tensor
    inputs = inputs.to(device)
    with torch.no_grad():
        outputs = model(input_ids=inputs, output_hidden_states=True)
    # Step 3: Extract the embeddings
    hidden_states = outputs.hidden_states  # List of hidden states from all layers
    embeddings = hidden_states[-1]  # Assuming you want embeddings from the last layer
    averaged_embeddings = torch.mean(embeddings, dim=1)  # Calculate mean along dimension 1 (the dimension with size 86)
    averaged_embeddings = averaged_embeddings.to(torch.float32)  # Ensure float32 data type
    return {'embeddings': averaged_embeddings}
# Map embeddings function to input data
embeddings = d1.map(embed_function, batched=True, batch_size=1550)
embeddings = embeddings.remove_columns(["input_ids", "attention_mask"])
# Save embeddings to disk
output_dir = f"embedded_elements/embeddings_{ELEMENT_LABEL}/{filename}"  # Assuming ELEMENT_LABEL is defined elsewhere
This ends up giving me huge datasets that make my code crash (e.g. I start with 700K rows and they get expanded to 1,000 million rows). I have been using pandas, so maybe that's the problem too? Another issue is that I don't think I'm using batching. Unfortunately, my code keeps crashing between steps 2 and 3. I think I need to implement batching, but I'm unsure how everything will work, since I will eventually need to feed the output to an LLM.
I'm rewriting the expand_coordinates function since your process fails between steps 2 and 3. In step 3, expanded_df['sequence'] = expanded_df.apply(fetch_sequences, axis=1) should be replaced with something like expanded_df.merge(fetch_sequences: pd.DataFrame, ...), since merge is vectorized. It is a misconception that putting a function inside an apply makes it a vectorized approach! With axis=1, apply still calls the function once per row.
import numpy as np
import pandas as pd

def expand_coordinates(element_locs: pd.DataFrame, context: int = 3):
    # create a column of ranges (memory efficient since ranges are lazy)
    element_locs['range'] = element_locs.apply(lambda row: range(row['start'], row['stop'] + 1), axis=1)
    # explode is a vectorized operation; it produces one row per position
    element_locs = element_locs.explode('range')
    # explode leaves an object column, so cast it before doing arithmetic
    pos = element_locs['range'].astype(int).to_numpy()
    # build the window around each position (same bounds as the loop in the question)
    element_locs['start'] = np.maximum(pos - 1 - context, 0)
    element_locs['stop'] = np.minimum(pos + context, 5000)  # <-- 5000 is an arbitrary maximum for demo
    return element_locs
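A possible variant, as a sketch only: the expansion can be done on plain NumPy arrays with np.repeat, which avoids both apply and the object-dtype range column. The function name expand_positions and the single max_size cap are assumptions made for illustration (the question uses a per-chromosome max_sizes lookup instead), and the window bounds copy the (i - 1) - context / i + context convention from the question.

```python
import numpy as np

def expand_positions(starts, stops, context=3, max_size=None):
    """Return (row_idx, win_start, win_stop), one entry per position in every interval."""
    starts = np.asarray(starts, dtype=np.int64)
    stops = np.asarray(stops, dtype=np.int64)
    lengths = stops - starts + 1  # positions per interval, stop inclusive
    # index of the source row for every expanded position
    row_idx = np.repeat(np.arange(len(starts)), lengths)
    # offset of each position inside its own interval: 0, 1, ..., length - 1
    first = np.cumsum(lengths) - lengths
    offsets = np.arange(lengths.sum()) - np.repeat(first, lengths)
    pos = starts[row_idx] + offsets
    win_start = np.maximum(pos - 1 - context, 0)
    win_stop = pos + context
    if max_size is not None:  # hypothetical single cap, for the demo only
        win_stop = np.minimum(win_stop, max_size)
    return row_idx, win_start, win_stop

# The three rows from the question's example df
row_idx, win_start, win_stop = expand_positions(
    [9000, 8803, 8903], [9100, 8903, 9000], context=5000
)
print(len(row_idx))  # 300
```

row_idx can be used to look up the chromosome and label of each expanded row. This still materializes every row: three int64 arrays at a billion rows come to about 24 GB, so at that scale it would have to be applied to one chunk of input rows at a time.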
I ran a few load tests and this fared well (although nowhere near the scale you're dealing with, since I'm testing this on my laptop, Ubuntu 16GB). Results are below. Please note that there is a significant amount of variability in the tests, since I'm generating the data with random generators. The critical piece here is the explosion factor (target rows / initial rows), which depends solely on the start and stop values (randomly generated in my case).
Initial rows: 100
Generation time: 0.0 sec
Target rows: 511,628
Explosion time: 0.1 sec
Initial rows: 1,000
Generation time: 0.0 sec
Target rows: 4,965,974
Explosion time: 0.77 sec
Initial rows: 10,000
Generation time: 0.0 sec
Target rows: 50,074,976
Explosion time: 7.32 sec
Initial rows: 11,000
Generation time: 0.0 sec
Target rows: 54,922,952
Explosion time: 8.1 sec
Initial rows: 12,000
Generation time: 0.0 sec
Target rows: 59,966,220
Explosion time: 8.98 sec
Initial rows: 15,000
Generation time: 0.0 sec
Target rows: 75,115,987
Explosion time: 11.51 sec
Initial rows: 20,000
Generation time: 0.0 sec
Target rows: 100,010,662
Process finished with exit code 137 (interrupted by signal 9:SIGKILL)
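Since the explosion factor drives everything, it can be estimated before running the expansion at all. A minimal sketch, assuming inclusive (start, stop) intervals as in the loop from the question; the helper name explosion_factor is made up for illustration:

```python
def explosion_factor(intervals):
    """Return (target_rows, factor) for a list of inclusive (start, stop) intervals."""
    target_rows = sum(stop - start + 1 for start, stop in intervals)
    return target_rows, target_rows / len(intervals)

# The three rows from the question's example df
rows = [(9000, 9100), (8803, 8903), (8903, 9000)]
target, factor = explosion_factor(rows)
print(target, factor)  # 300 100.0
```

Multiplying the target row count by a rough per-row size gives an early idea of whether the expanded table can fit in memory.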
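At the scale you're dealing with, pandas is definitely a bad idea: the 20,000-row run was killed with SIGKILL at about 100 million target rows, which most likely means the machine ran out of memory. Look into Spark if you have access to the relevant infrastructure.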
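If that kind of infrastructure is not an option, one lower-tech way to get batching is to never build the expanded table at all: generate the windows lazily and hand them downstream in fixed-size batches. The sketch below uses only the standard library. The names iter_windows and batched, the batch size, and the single max_size cap are assumptions for illustration; the sequence fetching, tokenizing and embedding would go where the comment is.

```python
from itertools import islice

def iter_windows(intervals, context=3, max_size=None):
    """Lazily yield (chrom, start, end, label) for every position in every interval."""
    for chrom, start, stop, label in intervals:
        for i in range(start, stop + 1):
            win_start = max((i - 1) - context, 0)
            win_end = i + context
            if max_size is not None:  # hypothetical single cap, not per chromosome
                win_end = min(win_end, max_size)
            yield chrom, win_start, win_end, label

def batched(iterable, batch_size):
    """Group any iterable into lists of at most batch_size items."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

# The three rows from the question's example df
rows = [("chr1", 9000, 9100, 1), ("chr1", 8803, 8903, 1), ("chr1", 8903, 9000, 0)]
for batch in batched(iter_windows(rows, context=5000), batch_size=128):
    # fetch sequences, tokenize and embed this batch here,
    # then write the embeddings to disk before moving on
    pass
```

Only one batch lives in memory at a time, so peak memory depends on batch_size rather than on the explosion factor.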