I am working on integrating Kedro, a data pipeline framework, with Streamlit, a popular Python web app framework, to build a data processing application. The primary goal is to run a specific Kedro pipeline from within my Streamlit app, using a custom DataCatalog to manage and load DataFrames.
Problem Details:
Integration Background:
I have successfully integrated Kedro and Streamlit, and I can run Kedro pipelines from my Streamlit app. However, I want to pass custom data loaded in Streamlit as a DataCatalog to the Kedro pipeline for processing.
Data Loading in Streamlit:
In Streamlit, I use file uploaders to load data from various sources (e.g., CSV, Excel). I then create DataFrames from this data.
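For illustration, that loading step looks roughly like this (a minimal sketch; the widget label and variable names are placeholders, not taken from the actual app):
# Code sketch: loading uploads into DataFrames (illustrative names only)
import pandas as pd
import streamlit as st

uploaded_file = st.file_uploader("Upload input data", type=["csv", "xlsx"])
if uploaded_file is not None:
    if uploaded_file.name.endswith(".csv"):
        df1 = pd.read_csv(uploaded_file)
    else:
        df1 = pd.read_excel(uploaded_file)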
Running Kedro Pipeline:
After loading and processing the data in Streamlit, I want to execute a specific Kedro pipeline, passing the custom DataCatalog containing the loaded DataFrames. The pipeline processes this data and stores the results.
Error Encountered:
When I attempt to run the Kedro pipeline with the custom DataCatalog, I encounter the following error messages:
TypeError: KedroSession.run() got an unexpected keyword argument 'extra_params'
AttributeError: can't set attribute 'catalog'
AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'
How can I properly run a Kedro pipeline from within a Streamlit app, passing a dynamically created DataCatalog?
Kedro Version: Latest
Streamlit Version: Latest
# Code block 1: Loading data in Streamlit
import streamlit as st
from kedro.framework.session import KedroSession
from kedro.io import DataCatalog, MemoryDataSet

if st.button('Process input data'):
    # Run the Kedro pipeline
    with KedroSession.create(project_path=project_path) as session:
        catalog = DataCatalog({
            "reagentes_raw": MemoryDataSet(df1),
            "balanco_de_massas_raw": MemoryDataSet(df2),
            "laboratorio_raw": MemoryDataSet(df3),
            "laboratorio_raiox_raw": MemoryDataSet(df4),
            "carta_controle_pims_raw": MemoryDataSet(df5),
            "blend_raw": MemoryDataSet(df6)
        })
        # This call raises the TypeError: run() does not accept a catalog keyword
        session.run(data_catalog=catalog, pipeline_name="tag_web_app")
    st.success('Input data processed successfully!')
    # Read the Parquet file after the pipeline has run
    merged_data = catalog.load("merged_raw_data_process")
    # Assuming the Parquet file has a timestamp column
    last_update = merged_data['timestamp_column'].max()
    st.header('Database update result')
    st.write(f"Input data processed successfully: the most recent record is from {last_update.strftime('%d/%m/%Y %H:%M')}")
# Code block 2: Attempt to set the DataCatalog on the KedroContext
from kedro.runner import SequentialRunner

if st.button('Process input data'):
    with KedroSession.create(project_path=project_path) as session:
        context = session.load_context()
        context.catalog = catalog  # This line triggers the AttributeError: can't set attribute
        runner = SequentialRunner()
        session.run(pipeline_name="tag_web_app")
        # This line triggers the AttributeError: no pipeline_registry attribute
        runner.run(pipeline=context.pipeline_registry.get("tag_web_app"), catalog=catalog)
Error 2:
AttributeError: can't set attribute 'catalog'
Error 3:
AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'
AttributeError: can't set attribute 'catalog'
KedroContext.catalog has been read-only for a while; what you are trying to do cannot be done.
AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'
KedroContext never had a pipeline_registry property.
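Given those constraints, the usual workaround is to bypass session.run() and execute the registered pipeline directly with a runner and your in-memory catalog. A minimal sketch, assuming Kedro 0.18.x, that tag_web_app is registered in the project's pipeline_registry.py, and that project_path and df1..df6 are already defined as in the question (exact APIs vary between Kedro versions):
# Code sketch: run a registered pipeline against a dynamically built catalog
from kedro.framework.project import pipelines
from kedro.framework.startup import bootstrap_project
from kedro.io import DataCatalog, MemoryDataSet
from kedro.runner import SequentialRunner

bootstrap_project(project_path)  # loads settings and populates the pipeline registry

catalog = DataCatalog({
    "reagentes_raw": MemoryDataSet(df1),
    "balanco_de_massas_raw": MemoryDataSet(df2),
    "laboratorio_raw": MemoryDataSet(df3),
    "laboratorio_raiox_raw": MemoryDataSet(df4),
    "carta_controle_pims_raw": MemoryDataSet(df5),
    "blend_raw": MemoryDataSet(df6),
})

pipeline = pipelines["tag_web_app"]  # looked up from the project's pipeline registry
outputs = SequentialRunner().run(pipeline, catalog)
Free pipeline outputs that are not declared in the catalog come back in the outputs dictionary; alternatively, pre-register the datasets you want to inspect afterwards (e.g. merged_raw_data_process) as MemoryDataSet entries, so that catalog.load() works after the run.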