pythonpysparkpalantir-foundryfoundry-code-repositories

Importing RIDs from a dataset with column RIDs with Palantir Foundry Code Repository


I am a first-time poster and user in a Code Repository, so please excuse the verbiage. I am attempting to read RIDs from a dataset full of RIDs. For example, the column 'backing_dataset_rid' contains the RIDs which are saved inside the Foundry application. I am working on loading the RID to read the columns from that dataset and save the results back into the original dataset from which the RID was extracted. I believe this link is helpful, but I'm looking for a solution in PySpark, if possible: (How to union multiple dynamic inputs in Palantir Foundry?)

As of now it returns a list of "Transform(myproject.datasets.examples:extract_cols)</Foundry/OUTPUTFOLDER>" (hiding sensitive info)

from pyspark.sql import functions as F
from transforms.api import transform, transform_df, Input, Output
 
def RID_extract(RID):
    @transform_df(
        Output('/folder_path/OutputDataset'),
        data=Input(RID)
    )
    def extract_cols(data):
        column_names = data.dataframe().columns
        return column_names
    return extract_cols
@transform_df(
    Output("/folder_path/OutputDataset"),
    source_df=Input("/folder_path/InputDataset")
)
def compute(source_df):
    df = source_df
    output_path = "/folder_path/OutputDataset"
    rows=df.collect()
    df2=[]
    print(source_df.columns)
    for row in rows:
        if row['backing_dataset_rid'] == (None):
            continue
        RID = row['backing_dataset_rid']
        print(RID)
        RID_transform = RID_extract(RID)
        df2.append(RID_transform)
        #df.rdd.map(RID_transform).collect()
        print(RID_transform)
    return df2

Solution

  • A transform in Foundry can only have a static set of inputs and outputs, defined at the time of the checks running (for security reasons, notably).

    In other words: You can't add new inputs/outputs depending on your data (or any logic within your transform that can't be inferred at commit/checks-time).

    So, the exact behavior you are looking for is likely not doable.

    You would need to do something like:

    from pyspark.sql import functions as F
    from transforms.api import transform, transform_df, Input, Output
     
    def RID_extract(RID):
        [...]
        return extract_cols
    
    @transform_df(
        Output("/folder_path/OutputDataset"),
        source_df=Input("/folder_path/InputDataset")
    
        source_df1=Input("RID1"),
        source_df2=Input("RID2"),
        source_df3=Input("RID3"),
    ...
        source_dfn=Input("RIDn")
    
    )
    def compute(source_df, source_df1):
        [...]
    
    

    I think this would defeat the purpose of what you are trying to achieve. You could use a transforms generator like here, but still your inputs set would need to be static.

    As alternatives:

    Note: any API call reverse-engineered and not documented might break without prior warning, notice or replacement. Do this at your own risks, especially for production workflows.