python, pandas, dataframe, csv, snakemake

How to pass a dynamic list of CSV files from Snakemake input to a Pandas DataFrame concat operation?


I'm working on a Snakemake workflow where I need to combine multiple CSV files into a single Pandas DataFrame. The list of CSV files is dynamic—it depends on upstream rules and wildcard patterns.

Here's a simplified version of what I have in my Snakefile:

rule combine_tables:
    input:
        expand("results/{sample}/data.csv", sample=SAMPLES)
    output:
        "results/combined/all_data.csv"
    run:
        import pandas as pd
        dfs = [pd.read_csv(f) for f in input]
        combined = pd.concat(dfs)
        combined.to_csv(output[0], index=False)

This works when the files exist, but I’d like to know:

What's the best practice for handling missing or corrupt files in this context?

Is there a more "Snakemake-idiomatic" way to dynamically list and read input files for Pandas operations?

How do I ensure consistent file ordering, attach metadata such as sample names, and cope with CSVs that aren't all structured identically?


Solution

rule combine_tables:
    input:
        # Static sample list (use checkpoints if dynamically generated)
        expand("results/{sample}/data.csv", sample=SAMPLES)
    output:
        "results/combined/all_data.csv"
    run:
        import os
        import pandas as pd

        dfs = []
        missing_files = []
        corrupt_files = []

        # Process files in a consistent order (sorted by sample name)
        for file_path in sorted(input, key=lambda x: x.split("/")[1]):
            # Handle missing files (shouldn't occur if the Snakemake workflow is correct)
            if not os.path.exists(file_path):
                missing_files.append(file_path)
                continue

            # Handle corrupt/unreadable files
            try:
                df = pd.read_csv(file_path)
                # Add sample metadata as the first column, taken from the path
                sample_id = file_path.split("/")[1]
                df.insert(0, "sample", sample_id)
                dfs.append(df)
            except Exception as e:
                corrupt_files.append((file_path, str(e)))

        # Validation reporting
        if missing_files:
            raise FileNotFoundError(f"Missing {len(missing_files)} files: {missing_files}")
        if corrupt_files:
            raise ValueError("Corrupt files detected:\n" + "\n".join(
                f"{path}: {err}" for path, err in corrupt_files))
        if not dfs:
            raise ValueError("No valid dataframes to concatenate")

        # Concatenate and save
        combined = pd.concat(dfs, ignore_index=True)
        combined.to_csv(output[0], index=False)
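
On your third question: pd.concat aligns frames on column names, so CSVs with differing columns will concatenate without error but leave NaN in the gaps. If you want to enforce a single schema instead, one option is to reindex every frame before concatenating. A minimal sketch; EXPECTED_COLS is a hypothetical, workflow-specific column list you would substitute with your own:

# pd.concat unions column names and fills missing columns with NaN.
# To enforce one schema, reindex each frame to an expected column list.
# EXPECTED_COLS is hypothetical; substitute your real columns.
EXPECTED_COLS = ["sample", "gene", "count"]
dfs = [df.reindex(columns=EXPECTED_COLS) for df in dfs]
combined = pd.concat(dfs, ignore_index=True)

reindex drops unexpected columns and inserts NaN for missing ones; raise instead if you'd rather fail loudly on a schema mismatch.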
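
As for a more Snakemake-idiomatic structure: the usual recommendation is to move non-trivial Python out of run: into an external script via the script: directive. Inside the script, Snakemake injects a snakemake object whose .input and .output attributes replace the bare input/output names. A minimal sketch (the path scripts/combine_tables.py is an assumption):

rule combine_tables:
    input:
        expand("results/{sample}/data.csv", sample=SAMPLES)
    output:
        "results/combined/all_data.csv"
    script:
        "scripts/combine_tables.py"

# scripts/combine_tables.py
# Snakemake injects a `snakemake` object here; no snakemake import is needed.
import pandas as pd

dfs = []
for path in sorted(snakemake.input, key=lambda x: x.split("/")[1]):
    df = pd.read_csv(path)
    df.insert(0, "sample", path.split("/")[1])  # sample name from the path
    dfs.append(df)

pd.concat(dfs, ignore_index=True).to_csv(snakemake.output[0], index=False)

This keeps the Snakefile declarative and makes the combining logic testable outside Snakemake, e.g. by passing a mock object with input/output attributes.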
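
Finally, if SAMPLES itself is only known after an upstream rule runs, checkpoints are the supported mechanism for dynamic input lists: declare the rule that determines the sample set as a checkpoint, and resolve the file list in an input function. A sketch under the assumption of a hypothetical checkpoint list_samples that writes one sample name per line (the shell command is also hypothetical):

checkpoint list_samples:
    output:
        "results/samples.txt"            # hypothetical: one sample per line
    shell:
        "discover_samples > {output}"    # hypothetical command

def combined_inputs(wildcards):
    # .get() raises until the checkpoint has completed, forcing Snakemake to
    # re-evaluate the DAG once the real sample list exists on disk.
    samples_file = checkpoints.list_samples.get(**wildcards).output[0]
    with open(samples_file) as fh:
        samples = [line.strip() for line in fh if line.strip()]
    return expand("results/{sample}/data.csv", sample=samples)

rule combine_tables:
    input:
        combined_inputs
    output:
        "results/combined/all_data.csv"
    # run/script body unchanged from above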