Before diving into decorators, let’s first take a look at an existing data processing pipeline in Pandas and quickly go over what it does.
import pandas as pd
from pandas import DataFrame

FILE_PATH = "Updated_sales.csv"
CHUNK_SIZE = 1000

def read_raw_data(file_path: str, chunk_size: int = 1000) -> DataFrame:
    csv_reader = pd.read_csv(file_path, chunksize=chunk_size)
    processed_chunks = []
    for chunk in csv_reader:
        # Filter out repeated header rows and drop rows with missing values
        chunk = chunk.loc[chunk["Order ID"] != "Order ID"].dropna()
        processed_chunks.append(chunk)
    return pd.concat(processed_chunks, axis=0)
def split_purchase_address(df_to_process: DataFrame) -> DataFrame:
    df_address_split = df_to_process["Purchase Address"].str.split(",", n=3, expand=True)
    df_address_split.columns = ["Street Name", "City", "State and Postal Code"]

    df_state_postal_split = (
        df_address_split["State and Postal Code"]
        .str.strip()
        .str.split(" ", n=2, expand=True)
    )
    df_state_postal_split.columns = ["State Code", "Postal Code"]

    return pd.concat([df_to_process, df_address_split, df_state_postal_split], axis=1)

def extract_product_pack_information(df_to_process: DataFrame) -> DataFrame:
    df_to_process["Pack Information"] = df_to_process["Product"].str.extract(r'.*\((.*)\).*').fillna("Not Pack")
    return df_to_process

def one_hot_encode_product_column(df_to_process: DataFrame) -> DataFrame:
    return pd.get_dummies(df_to_process, columns=["Product"])

def process_raw_data():
    df = read_raw_data(file_path=FILE_PATH, chunk_size=CHUNK_SIZE)
    return (
        df
        .pipe(split_purchase_address)
        .pipe(extract_product_pack_information)
        .pipe(one_hot_encode_product_column)
    )

final_df = process_raw_data()
In this pipeline, there's essentially a "main" function that calls a few other reading and processing functions to output a DataFrame that's ready to be used to train a machine learning model. This is a simple example with just three pre-processing steps, but you can see how it works mainly by looking at the process_raw_data function, which works as follows:
- Read in the raw data in chunks using the custom read_raw_data function, which is defined to take in the file at a specified chunk size;
- Call the Pandas pipe method on the resulting DataFrame object to employ method chaining, which involves passing our custom data pre-processing functions to the pipe method in sequence (see the short sketch after this list).
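For reference, chaining with pipe is just a more readable way of nesting the same function calls yourself. The chain above is roughly equivalent to the following (the processed_df name here is only for illustration):
# Each .pipe(func) call is equivalent to func(df), so the chained version
# above is the same as nesting the function calls from the inside out.
processed_df = one_hot_encode_product_column(
    extract_product_pack_information(
        split_purchase_address(df)
    )
)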
In this case, we have three main things we want to do with our data, which are encapsulated in the three functions split_purchase_address, extract_product_pack_information, and one_hot_encode_product_column. All of these functions transform the initial DataFrame in different ways and allow the raw data to really be usable when moving forward with machine learning.
However, it's a bit difficult like this to see what's going on at each successive step in the pipeline. Because I wrote the code, I know exactly what's happening to the DataFrame each time the pipe method is called. But if the underlying data changes, or if someone else wants to know what's happening, it would be useful to have some data returned to us in the form of logs!
There are a lot of different ways to implement logs, but let's take a look at how decorators in Python can help us understand what's happening with our data.
So our goal is to understand what’s going on with our DataFrame as it gets processed in multiple stages of our pipeline. The solution is to put a wrapper layer around the functions we defined that can add extra functionality to return information about our DataFrame and more. Doing this means we need to define decorator functions.
This is a more efficient alternative to adding the same kind of log lines to each function. Especially at scale, imagine having to copy and paste the same three log messages into a dozen or more data processing functions scattered over a few different files.
It's much more efficient to engage in "metaprogramming", where we instead define decorator functions whose goal is to manipulate existing code. Ideally, all a decorator function should do is take a function as input, have that function run and return what it normally would, and add some additional functionality on top.
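In skeleton form, that pattern looks like this (the my_decorator and wrapper names here are just placeholders):
def my_decorator(function):
    def wrapper(*args, **kwargs):
        # ...do something extra before the function runs...
        result = function(*args, **kwargs)  # run the original function
        # ...do something extra after the function runs...
        return result  # return what the function would normally return
    return wrapper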
Logging execution times of your functions
For an initial example of a decorator, let’s start by simply logging the start time of each function and also calculating the time it takes a function to run.
import functools
import logging
import time
from datetime import datetime

def log_execution_and_time(function):
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        logging.info(f"{datetime.fromtimestamp(start_time)} - Start execution of: {function.__name__}")
        result = function(*args, **kwargs)
        end_time = time.time()
        logging.info(f"{datetime.fromtimestamp(end_time)} - Ended execution of: {function.__name__}")
        logging.info(f"Duration of {function.__name__}: {round(end_time - start_time, 2)} seconds")
        return result
    return wrapper
There's a lot to unpack, so let's take a look at what's going on here. We start by defining our decorator function log_execution_and_time. Since a decorator should accept a function as input, we also define a function parameter.
If we were to use the function without the functools.wraps line, we wouldn't be able to preserve the metadata around the existing function and use it in our logging. Always remember to include this in your decorators, otherwise they won't behave as you would like. The original function name, for example, wouldn't be preserved in the wrapper (e.g. the decorator wouldn't be able to tell that the read_raw_data function's name was "read_raw_data"). You can see a simple example in the Python docs.
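A quick way to see this for yourself is to check the __name__ attribute of a decorated function (the dummy_function here is just a throwaway example):
@log_execution_and_time
def dummy_function():
    return None

# With @functools.wraps(function) in place, the original name is preserved:
print(dummy_function.__name__)  # prints "dummy_function"
# Without the functools.wraps line, this would print "wrapper" instead.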
Next, we define the wrapper to actually implement the added functionality we want. In this case, we want to track how long it takes the function to run. Doing so is simple: all we need to do is use the time library to assign the current time to the start_time and end_time variables, then take the difference of the two to get the function's duration. You'll see that in the middle of defining those variables, we run the function. The line that does so is:
result = function(*args, **kwargs)
This is just like the syntax we used before when reading the raw data:
df = read_raw_data(file_path=FILE_PATH, chunk_size=CHUNK_SIZE)
The reason we define the args and kwargs parameters both in the wrapper and in the function call is to allow the functions we wrap with the decorators to accept any number of arguments and keyword arguments, as necessary.
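Put differently, the wrapper doesn't need to know anything about the wrapped function's signature; whatever arguments are passed in are simply forwarded along. Once read_raw_data is decorated (which we'll do shortly), for example, both of these calls still work exactly as before:
# Positional and keyword arguments both pass straight through the
# wrapper to the original read_raw_data via *args and **kwargs.
df = read_raw_data(FILE_PATH, CHUNK_SIZE)
df = read_raw_data(file_path=FILE_PATH, chunk_size=CHUNK_SIZE)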
In this function, we also write some log lines with the logging.info syntax. We also use function.__name__ to pull the actual name of the function into our logging messages.
Finally, inside the wrapper function we return the result defined earlier, which is the result of the original function's execution. In the context of our data processing pipelines, this will always be a Pandas DataFrame. In our decorator function, we return the wrapper function, which wraps our added functionality around the existing function.
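One assumption in the code above is that logging has already been configured somewhere in the script. If it hasn't, the logging.info calls won't display anything, because the root logger only shows warnings and above by default. A minimal setup might look something like this, placed once near the top of the script:
# Show INFO-level messages; without some configuration like this,
# the logging.info output from the decorators is suppressed.
logging.basicConfig(level=logging.INFO)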
Logging DataFrame metadata
Now that we know how decorators work, let’s take a look at returning information in our logs about metadata on the DataFrame.
def log_dataframe_metadata(function):
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        result = function(*args, **kwargs)
        logging.info(f"{function.__name__} - result DataFrame number of rows: {len(result)}")
        logging.info(f"{function.__name__} - result DataFrame number of columns: {len(result.columns)}")
        # memory_usage returns a Series indexed by column name, so iterate over its items
        for column, memory in result.memory_usage(deep=True).items():
            logging.info(f"{function.__name__} - {column} memory usage: {memory}")
        return result
    return wrapper
The decorator syntax is the same as before. This time, though, we get the result of the function first and then write logging messages afterwards based on the resulting DataFrame. Here, you can see that you can use a decorator to perform operations on the result of your functions.
In this case, we want to look at three things in the DataFrame:
- Number of rows: len(result)
- Number of columns: len(result.columns)
- Memory usage for each column: result.memory_usage(deep=True)
Here, remember that result in this case will always be a Pandas DataFrame, which means we can call Pandas methods on it. Each of these can be useful to know, depending on what the function you define to transform your DataFrame does.
For example, if you're adding a new column to a DataFrame, you can verify it was added by checking the number of columns. Also, if you're just adding an extra column, no rows should be added to the DataFrame, which you can sanity check as well.
Finally, the memory usage of each column can be useful to monitor over time, to make sure that you're operating within some pre-defined limits. For convenience, our logging decorator works with the output of result.memory_usage, which is a Series with information about each column, so we can loop through this Series and return the memory used by each column in an easy-to-read manner.
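If you wanted to go beyond logging, the same idea could be turned into an explicit check. Here's a rough sketch of a decorator that warns when a result goes over a memory limit; the check_memory_usage name and the 500 MB threshold are just illustrative, not part of the original pipeline:
def check_memory_usage(function):
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        result = function(*args, **kwargs)
        # Total memory used by the resulting DataFrame, in bytes
        total_memory = result.memory_usage(deep=True).sum()
        if total_memory > 500_000_000:  # illustrative 500 MB limit
            logging.warning(
                f"{function.__name__} - result uses {total_memory} bytes, over the pre-defined limit"
            )
        return result
    return wrapper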
Now that we have our decorators defined, implementing them in our pipeline is really simple. Our initial code from earlier can be modified like this to add the functionality we’ve defined in our decorator functions.
@log_execution_and_time
@log_dataframe_metadata
def read_raw_data(file_path: str, chunk_size: int = 1000) -> DataFrame:
    csv_reader = pd.read_csv(file_path, chunksize=chunk_size)
    processed_chunks = []
    for chunk in csv_reader:
        chunk = chunk.loc[chunk["Order ID"] != "Order ID"].dropna()
        processed_chunks.append(chunk)
    return pd.concat(processed_chunks, axis=0)

@log_execution_and_time
@log_dataframe_metadata
def split_purchase_address(df_to_process: DataFrame) -> DataFrame:
    df_address_split = df_to_process["Purchase Address"].str.split(",", n=3, expand=True)
    df_address_split.columns = ["Street Name", "City", "State and Postal Code"]

    df_state_postal_split = (
        df_address_split["State and Postal Code"]
        .str.strip()
        .str.split(" ", n=2, expand=True)
    )
    df_state_postal_split.columns = ["State Code", "Postal Code"]

    return pd.concat([df_to_process, df_address_split, df_state_postal_split], axis=1)

@log_execution_and_time
@log_dataframe_metadata
def extract_product_pack_information(df_to_process: DataFrame) -> DataFrame:
    df_to_process["Pack Information"] = df_to_process["Product"].str.extract(r'.*\((.*)\).*').fillna("Not Pack")
    return df_to_process

@log_execution_and_time
@log_dataframe_metadata
def one_hot_encode_product_column(df_to_process: DataFrame) -> DataFrame:
    return pd.get_dummies(df_to_process, columns=["Product"])

@log_execution_and_time
@log_dataframe_metadata
def process_raw_data():
    df = read_raw_data(file_path=FILE_PATH, chunk_size=CHUNK_SIZE)
    return (
        df
        .pipe(split_purchase_address)
        .pipe(extract_product_pack_information)
        .pipe(one_hot_encode_product_column)
    )

final_df = process_raw_data()
All we had to do was add @log_execution_and_time and @log_dataframe_metadata to the top of our function definitions to implement the decorator functionality. Then, when we run the code just the same as before, we additionally get the logging output for each function as it runs.
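Stacking the decorators with the @ syntax is just shorthand for wrapping the functions manually. The two decorators on read_raw_data, for example, are equivalent to writing:
# Decorators apply from the bottom up: log_dataframe_metadata wraps the
# original function first, then log_execution_and_time wraps that result.
read_raw_data = log_execution_and_time(log_dataframe_metadata(read_raw_data))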
Now, users are able to see what’s going on with the DataFrame as it moves along the processing pipeline. The resulting DataFrame is just the same as before.
And that's all! This is just a start to the many things you can do with decorators. Other things you can do to improve your data processing pipeline include more verbose logging, adding optional parameters to your decorators to make them behave differently for different functions, and even having them edit the result of each DataFrame (like dropping nulls) as needed.