PySpark offers some unconventional syntax that adds power to the development process and makes it easier. We talked about loops before. Let’s look at a few more tricks that make our code simpler and more powerful.
The Power of the PySpark REDUCE function
Let’s discover more about this powerful function.
Definition: We can use this function to apply a given function cumulatively to a sequence of elements, reducing the sequence to a single value.
Syntax
functools.reduce(function, iterable[, initializer])
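As a quick illustration of how reduce folds a sequence into a single value, here is a minimal, self-contained sketch in plain Python (no Spark required):

from functools import reduce

numbers = [1, 2, 3, 4]

# The lambda is applied cumulatively, starting from the initializer 10:
# ((((10 + 1) + 2) + 3) + 4) = 20
total = reduce(lambda acc, n: acc + n, numbers, 10)
print(total)  # 20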
Scenario
Imagine you need to perform replacements in a dataframe’s string column: multiple replacements of different strings, one after another.
This is the kind of scenario that leads to a sequence of replace calls nested one inside another, with very ugly syntax.
It would be something like this:
# this is an approximate syntax
replace(replace(replace(replace(replace(column, 'xxx', ''), 'zz', ''), 'yy', ''), 'aa', ''), 'bb', '')
Reduce function
Let’s suppose the dataframe is named df and the column name is “Description”. This is how we can use the reduce function in this scenario:
from pyspark.sql.functions import col, regexp_replace
from functools import reduce

replacements = [
    ("USD Monthly", ""),
    ("USD-Monthly", ""),
    ("USD Quarterly", ""),
    ("USD", "")
]

new_column = reduce(
    lambda cl, replacement: regexp_replace(cl, replacement[0], replacement[1]),
    replacements,
    col("Description")
)

df = df.withColumn("Description", new_column)
Let’s analyze the code above:
The list “replacements” contains pairs of strings: the original value to be replaced and its replacement value.
Reduce function first parameter
This one is a lambda expression. In this example, the expression receives a column and one of the string pairs for replacement. It executes the regexp_replace function to replace the string value in the column.
Reduce function second parameter
This is the “replacements” list. The reduce function executes the lambda expression for each element in the list. Each element is a pair of strings: the original string value and its replacement.
Reduce function third parameter
This is an optional value used to initialize the accumulation. In this case, it is a column: col("Description"), which is passed to the lambda expression on the first iteration.
Final Result
The reduce call results in a column expression. This expression is then used with the withColumn method to replace the original “Description” column with the new, processed value.
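To make the behavior concrete, the reduce call builds up the same nested expression we would otherwise have to write by hand. Conceptually, the resulting column is equivalent to this sketch:

# What the reduce call effectively builds, written out by hand
new_column = regexp_replace(
    regexp_replace(
        regexp_replace(
            regexp_replace(col("Description"), "USD Monthly", ""),
            "USD-Monthly", ""
        ),
        "USD Quarterly", ""
    ),
    "USD", ""
)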
Clearing a dataframe – and similar tasks
This is an interesting coding trick. We can use it to clear a dataframe, to duplicate a dataframe’s schema, and you may find additional uses for it.
The logic is simple: You get the schema from one dataframe and use it as the definition to create a new, empty dataframe.
The code is as simple as this:
ingestion_df = spark.createDataFrame([], ingestion_df.schema)
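A quick sketch of how this behaves, assuming a hypothetical ingestion_df that already contains a couple of rows:

# Hypothetical dataframe with a couple of rows
ingestion_df = spark.createDataFrame(
    [("A", 1), ("B", 2)],
    "Description STRING, Amount INT"
)

# "Clear" it: keep the schema, drop all rows
ingestion_df = spark.createDataFrame([], ingestion_df.schema)

ingestion_df.printSchema()   # schema is preserved
print(ingestion_df.count())  # 0 rows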
Calling PySpark functions by their string name
In one way or another, most languages have this capability: We can have the name of a function in a string variable, “functionName” for example, and execute the function from this information.
This is a feature we can use when creating generic, reusable, and parameterized code.
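As a minimal illustration of the idea (using pyspark.sql.functions here only as an example module, and assuming an existing dataframe df with a “Description” column), getattr can resolve a function from its string name:

from pyspark.sql import functions as F

function_name = "upper"  # could come from a parameter or a config file

# Resolve the function object from its string name
func = getattr(F, function_name)

# Then call it like the real function
df = df.withColumn("Description", func(F.col("Description")))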
Scenario
My scenario was exactly this. I was building reusable code that we can call with different sets of parameters. However, each situation required slightly different data transformations.
The reusable code can contain the data transformations that are common to all sets of parameters. The specific transformations each set of parameters needs are handled by calling a custom function, a different function for each set of parameters.
Each set of parameters can then include the name of a custom function, and that function completes the specific transformations.
Implementation
Considering a dataframe called “df” and a string variable called function_name, this is how the code can be implemented:
import builtin.Lib.pycdc as pycdc

func = None
try:
    # Retrieve the function object from its string name
    func = getattr(pycdc, function_name)
except:
    print('no custom transformation available')

if func is not None:
    df = func(df)
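For context, a custom transformation function in the pycdc module is just a regular function that receives a dataframe and returns a transformed dataframe. The name and body below are hypothetical, only to show the expected shape:

# Hypothetical contents of the pycdc module
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, trim

def clean_sales_description(df: DataFrame) -> DataFrame:
    # Example custom transformation: trim the Description column
    return df.withColumn("Description", trim(col("Description")))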
Some details about the code:
- getattr is the function that retrieves the actual function object from the function name.
- The first parameter is the module where the function is located. “pycdc” is the name of the module, as used in the import statement.
- getattr is used together with a try/except block. This keeps the code safe in case the function doesn’t exist.
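As a side note, getattr also accepts a default value as its third argument, so the same safety can be achieved without try/except. A minimal sketch:

# Returns None instead of raising AttributeError when the function is missing
func = getattr(pycdc, function_name, None)

if func is not None:
    df = func(df)
else:
    print('no custom transformation available')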
Summary
These are only some of the available PySpark tricks. PySpark is a powerful language available in Microsoft Fabric, an equally powerful environment.