I have a number of functions that form a data processing pipeline: data extraction, transformation, and loading into the database.
In addition to separating them by functional meaning, I want to split the program logs by execution group. For example, I have the pipelines “store 1 sales”, “store 1 warehouses”, and “store 2 sales”. The logic is something like this:
def store1_extract_sales(): pass
def store1_convert_sales(): pass
def store2_load_sales(): pass
# same for the other pipelines
However, I would like the logs to be split into the files “store1_sales”, “store1_warehouses”, and “store2_sales”, each with its own rotation logic.
Moreover, the data processing scripts are spread across different directories and files: (extraction files) -> (transformation files) -> load file (database worker).
I use the loguru library, and right now I just write all the logs into one file:
logger.add(
    sink=logs_directory + '/logs.json',
    level='INFO',
    rotation='30 day',
    serialize=True,
    encoding='utf-8'
)
How can I do this?
I tried changing the logger configuration and setting a filter, but it didn't work.
From the loguru documentation, you can add multiple log handlers, each writing to its own file, and give each handler a filter that decides which records it accepts. To connect a log handler to a particular logger, you use the bind method.
# file 1
from loguru import logger

logger.add(
    "store1_sales.log",
    filter=lambda rec: rec["extra"]["task"] == "store1-sales",
    rotation="1 week"
)

logger_1 = logger.bind(task="store1-sales")

def store1_load_sales():
    logger_1.info('loading sales')
    ...

def store1_convert_sales():
    logger_1.info('converting sales')
    ...
Similarly for other files:
# file 2
from loguru import logger

logger.add(
    "store2_sales.log",
    filter=lambda rec: rec["extra"]["task"] == "store2-sales",
    rotation="1 week"
)

logger_2 = logger.bind(task="store2-sales")

def store2_load_sales():
    logger_2.info('loading sales')
    ...

def store2_convert_sales():
    logger_2.info('converting sales')
    ...
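A couple of caveats, sketched under assumptions rather than taken from the code above: records emitted without a bound task (for example, from other modules) have no "task" key in their extra dict, so the strict lookup rec["extra"]["task"] can raise a KeyError; rec["extra"].get("task") tolerates such records. The same pattern also covers the remaining “store1 warehouses” pipeline from the question, and each sink can carry its own rotation policy. The file name, task value, rotation interval, and function below are illustrative, not part of the original code.

# file 3 (hypothetical) - “store1 warehouses” pipeline with its own rotation
from loguru import logger

logger.add(
    "store1_warehouses.log",
    # .get() avoids a KeyError for records logged without bind(task=...)
    filter=lambda rec: rec["extra"].get("task") == "store1-warehouses",
    rotation="30 days",   # each sink may use a different rotation policy
    serialize=True,       # keep the JSON output used in the original setup
    encoding='utf-8'
)

logger_wh = logger.bind(task="store1-warehouses")

def store1_extract_warehouses():
    logger_wh.info('extracting warehouses')
    ...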