PySpark has many flexible syntaxes which are not so common in other languages. One of these syntaxes is the inline loop format, known in Python as a comprehension.
Loops in PySpark can be used to build objects or other values which are then assigned to variables. In other languages, this would involve a loop concatenating or appending to the variable.
In PySpark, this can easily be done with a loop in a single line or very few lines.
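For comparison, here is a generic Python sketch contrasting the traditional append loop with the inline form (the values are hypothetical, just to show the shape of the syntax):

# Traditional approach: start with an empty list and append inside a loop
squares = []
for n in range(5):
    squares.append(n * n)

# Inline loop (list comprehension): the same result in a single line
squares = [n * n for n in range(5)]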
Example 1 : Inline loop
You have a list of field names in a dictionary called fieldNames. The values of the dictionary are the actual field names. You need to select these fields from a dataframe.
from pyspark.sql.functions import col

df = df.select([col(new_name) for new_name in fieldNames.values()])
Let’s understand the pieces of this code:
- The .values() method extracts a collection with the values of a dictionary. Dictionaries also have a keys() method.
- The for loop sits in the middle of the syntax to build an array of columns for the select method of the dataframe.
- Before the for loop comes the syntax for a single element of the array, using the loop variable (new_name).
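Putting it together, here is a minimal runnable sketch; the fieldNames dictionary and the sample dataframe are hypothetical stand-ins:

from pyspark.sql.functions import col

# Hypothetical mapping: keys are logical names, values are the actual column names
fieldNames = {'customer_name': 'name', 'customer_age': 'age'}

df = spark.createDataFrame([{'name': 'John', 'age': 30, 'city': 'NY'}])

# One col(...) object is built per dictionary value
df = df.select([col(new_name) for new_name in fieldNames.values()])
df.show()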
Example 2 : Building Variables
You are joining two dataframes. However, you are building a reusable piece of code which may be executed for different tables.
The tables are in a dictionary where the key is the table name and the value is an array with the primary key names. How do you build the join condition?
df1 = spark.createDataFrame([
    {'id': 1, 'name': 'John', 'age': 30, 'operation': 'u'},
    {'id': 2, 'name': 'Jane', 'age': 25, 'operation': 'u'},
    {'id': 3, 'name': 'Doe', 'age': 40, 'operation': 'd'}
])

df2 = spark.createDataFrame([
    {'id': 1, 'name': 'John', 'age': 29},
    {'id': 2, 'name': 'Jane', 'age': 25},
    {'id': 3, 'name': 'Doe', 'age': 40}
])

# Key: table name; value: array with the primary key column names
primaryKey = {'tablename': ['id']}

# One equality condition is built per primary key column
join_conditions = [df1[key] == df2[key] for key in primaryKey['tablename']]

joined_df = df1.join(df2, join_conditions, how='left')
display(joined_df)
The for works in a similar way to the previous example, but this time it builds an array of conditions to be evaluated during the join of the dataframes.
There is no need for an explicit "and" or comma between the conditions: the for builds them as separate elements of the array, and PySpark combines the array of conditions with a logical AND during the join.
Although in this example there is only one key field ("id"), the for prepares the code for composite keys.
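For instance, with a hypothetical composite key the same comprehension produces one condition per column, with no other change to the code:

# Hypothetical composite key: two conditions are built and ANDed by the join
primaryKey = {'tablename': ['id', 'name']}

join_conditions = [df1[key] == df2[key] for key in primaryKey['tablename']]
joined_df = df1.join(df2, join_conditions, how='left')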
Example 3 : Object loop
Loop over a dictionary with multiple variables. The for loop can extract multiple variables from an object, a feature known in Python as unpacking. This is especially interesting for dictionaries.
fieldMappings = {'name': 'name', 'age': 'age'}

for k, v in fieldMappings.items():
    print(f"{k} : {v}")
We call the items() method on the dictionary so the for can unpack each key/value pair into the two loop variables.
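The same unpacking works inside an inline loop. As a sketch, assuming a dataframe whose columns should be renamed according to a hypothetical mapping:

from pyspark.sql.functions import col

# Hypothetical mapping: old column name -> new column name
fieldMappings = {'name': 'full_name', 'age': 'age_years'}

df = spark.createDataFrame([{'name': 'John', 'age': 30}])

# Each key/value pair is unpacked into (old, new) to build one aliased column
df = df.select([col(old).alias(new) for old, new in fieldMappings.items()])
df.show()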
Example 4 : JSON Loop
We can use a loop to build a JSON object in a very interesting way. The example below builds a DAG for the runMultiple statement, but that is another subject. The interesting part for us is the way the for is used to build the DAG.
DAG = {
    "activities": [
        {
            "name": row["taskName"],
            "path": "NotebookToRunTask",
            "timeoutPerCellInSeconds": 4000,
            "args": {
                "keyValue": row["taskName"],
                "function_url": urlprefix,
                "executionId": executionId,
                "foldername": foldername,
                "filenameprefix": filenameprefix
            },
        }
        for row in dfTasks.rdd.collect()
    ],
    "concurrency": 0
}
The basic concept is the same: one element of an array is built, and the for comes immediately after, "multiplying" the element.
- This example uses a dataframe.
- The variable of the for is a row of the dataframe and it’s used to build each array element.
- The JSON can have other elements besides the array built by the for and this doesn’t affect the for (“concurrency” element is not part of the for).
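As a minimal, self-contained version of the pattern (the task list here is a hypothetical stand-in for dfTasks):

import json

# Hypothetical task list; in the original example this comes from a real dataframe
dfTasks = spark.createDataFrame([{'taskName': 'load_customers'},
                                 {'taskName': 'load_orders'}])

DAG = {
    # The for "multiplies" the activity element, one per collected row
    "activities": [{"name": row["taskName"]} for row in dfTasks.rdd.collect()],
    # Fixed elements like "concurrency" sit outside the for, unaffected by it
    "concurrency": 0
}

print(json.dumps(DAG, indent=2))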
Summary
This is a very powerful language syntax which can make coding tasks much easier.