I am trying to speed up the following dlt (dlthub) pipeline via parallelisation as shown in the documentation here: https://dlthub.com/docs/reference/performance#parallelism
Here is the original (non-parallelised) code, which works:
    source = rest_api_source({
        "client": {
            "base_url": "https://www.filmweb.no/",
        },
        "resources": [
            {
                "name": f"movie_{MOVIE_ID}",
                "table_name": "movies",
                "endpoint": {
                    "path": "_next/data/{build_id}/film/{movie_id}.json",
                    "params": {
                        "movie_id": MOVIE_ID,
                        "build_id": BUILD_ID,
                        "edi": MOVIE_ID,
                    },
                    "data_selector": "pageProps.cmsDocument",
                },
                "write_disposition": "replace",
            }
            for MOVIE_ID in MOVIE_IDS
        ],
    })
    pipeline = dlt.pipeline(pipeline_name="movies", destination="filesystem")
    load_info = pipeline.run(source)
I cannot work out how to actually parallelise this. The following code block...
    @dlt.resource(parallelized=True)
    def movies(movie_ids):
        for MOVIE_ID in movie_ids:
            yield {
                "name": f"movie_{MOVIE_ID}",
                "table_name": "movies",
                "endpoint": {
                    "path": "_next/data/{build_id}/film/{movie_id}.json",
                    "params": {
                        "movie_id": MOVIE_ID,
                        "build_id": BUILD_ID,
                        "edi": MOVIE_ID,
                    },
                    "data_selector": "pageProps.cmsDocument",
                },
                "write_disposition": "replace",
            }

    @dlt.source
    def movies_source(movie_ids):
        return [rest_api_source({
            "client": {
                "base_url": "https://www.filmweb.no/",
            },
            "resources": [movies(movie_ids)]
        })]

    pipeline = dlt.pipeline(pipeline_name="movies", destination="filesystem")
    pipeline.run(movies_source(MOVIE_IDS))
...gives me the error:
ResourceNameMissing: Resource name is missing. If you create a resource directly from data ie. from a list you must pass the name explicitly in `name` argument.
Please note that for resources created from functions or generators, the name is the function name by default.
I have tried a lot of things e.g. taking the rest_api_source out of a list within the movies_source function; explicitly specifying the name within the @dlt.resource wrapper; taking the name entry out of the yielded dict within the movies resource. None of this is fixing anything.
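In the dlt REST API declarative config, each entry under "resources" accepts arguments that you would otherwise pass to a regular dlt resource. In your case that is parallelized=True, which you set as the key "parallelized": True in each resource dict, so there is no need to wrap anything in @dlt.resource yourself.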
So, your config object would be:
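    # Import path in recent dlt releases, where the REST API source ships with dlt.
    from dlt.sources.rest_api import RESTAPIConfig, rest_api_source

    # RESTAPIConfig describes the config dict, not the source that rest_api_source returns.
    config: RESTAPIConfig = {
        "client": {
            "base_url": "https://www.filmweb.no/",
        },
        "resources": [
            {
                "name": f"movie_{MOVIE_ID}",
                "table_name": "movies",
                # Same flag as @dlt.resource(parallelized=True), set per resource.
                "parallelized": True,
                "endpoint": {
                    "path": "_next/data/{build_id}/film/{movie_id}.json",
                    "params": {
                        "movie_id": MOVIE_ID,
                        "build_id": BUILD_ID,
                        "edi": MOVIE_ID,
                    },
                    "data_selector": "pageProps.cmsDocument",
                },
                "write_disposition": "replace",
            }
            for MOVIE_ID in MOVIE_IDS
        ],
    }
    source = rest_api_source(config)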
I suggest annotating the config with the RESTAPIConfig type hint so that your IDE can suggest the available arguments for the REST API config object.
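To make the per-resource flag easier to see, here is a minimal sketch that builds the resource dicts with a small helper and then passes them to rest_api_source. The helper names movie_resource, build_config and run_pipeline are hypothetical and not part of dlt, and the dlt import path assumes a recent dlt release where the REST API source lives in dlt.sources.rest_api.

```python
from typing import Any, Dict, List


def movie_resource(movie_id: str, build_id: str) -> Dict[str, Any]:
    """Build one REST API resource entry for a single movie (hypothetical helper)."""
    return {
        "name": f"movie_{movie_id}",
        "table_name": "movies",
        # Same flag as @dlt.resource(parallelized=True), set per resource.
        "parallelized": True,
        "endpoint": {
            # Plain string on purpose: the placeholders are filled from "params".
            "path": "_next/data/{build_id}/film/{movie_id}.json",
            "params": {
                "movie_id": movie_id,
                "build_id": build_id,
                "edi": movie_id,
            },
            "data_selector": "pageProps.cmsDocument",
        },
        "write_disposition": "replace",
    }


def build_config(movie_ids: List[str], build_id: str) -> Dict[str, Any]:
    """Assemble the full config dict with one parallelized resource per movie."""
    return {
        "client": {"base_url": "https://www.filmweb.no/"},
        "resources": [movie_resource(m, build_id) for m in movie_ids],
    }


def run_pipeline(movie_ids: List[str], build_id: str):
    """Run the pipeline from the question with the parallelized config."""
    # Imported lazily so the config helpers above work without dlt installed.
    import dlt
    from dlt.sources.rest_api import rest_api_source

    source = rest_api_source(build_config(movie_ids, build_id))
    pipeline = dlt.pipeline(pipeline_name="movies", destination="filesystem")
    return pipeline.run(source)
```

Calling run_pipeline(MOVIE_IDS, BUILD_ID) should then behave like the original working code, with each movie resource marked as parallelized. How many of them are evaluated at the same time depends on the extract worker settings described on the performance page linked in the question.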