PySpark Secrets to use with Fabric

PySpark is a powerful tool for data manipulation, and it's full of tricks. Let's discover some of them.

Control the Type of a NULL column

If you are creating a PySpark dataframe, but one of the columns contains only null values (None), how can you control the type of the column?

There is an interesting expression you can build using lit().cast() to control the type of a column with null values.

The following statement will add a new column or fix the type of an existing column containing only Null values.

 

from pyspark.sql.functions import lit

df = df.withColumn('myNewColumn', lit(None).cast('timestamp'))
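
To confirm the result, you can check the dataframe schema after running the statement (a quick check, assuming df is the dataframe from the example above):

df.printSchema()
# The output should include a line similar to:
# |-- myNewColumn: timestamp (nullable = true)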

 

Updating a single row/column in a Dataframe

PySpark works very well with Big Data and blocks of data. Each operation is designed to apply over an entire Dataframe.

As a result, updating a single row/column requires a somewhat unusual syntax. This is an example:

 

from pyspark.sql.functions import lit, col, when

DF = DF.withColumn('fieldToChange', when(
     col("tableKey") == 'uniqueKey',
     lit('literalValue')
     ).otherwise(col("fieldToChange")))

 

Let’s analyse the statement step-by-step:

  • withColumn: Adds or replaces a column. In this case, it replaces the column.
  • when/otherwise: Creates a condition, similar to an if statement.
  • lit: Wraps a literal value in a column expression.

In this way, the when clause specifies a filter, selecting a single row to receive a new value in the column we are updating. All the remaining rows keep their original value.

This example updates a single row. However, the same syntax can work like a SQL UPDATE/WHERE, updating multiple rows in different ways.
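
The same pattern scales to multiple rows by adding more when branches. Here is a minimal sketch, assuming hypothetical status and amount columns that are not part of the example above:

from pyspark.sql.functions import lit, col, when

# Hypothetical example: status and amount are assumed columns
DF = DF.withColumn('status',
    when(col("amount") > 1000, lit('high'))
    .when(col("amount") > 100, lit('medium'))
    .otherwise(col("status")))

Each row takes the value from the first branch whose condition matches; rows matching none of the branches keep their original value through otherwise.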

Creating a Hash for Dataframe rows

When you need to check whether a row has changed, one good method is to create a hash for the row and compare the hashes.

Using just a few statements, you can create a hash for the rows of a dataframe. If the rows are kept immutable, the hash can also be used as a surrogate key.

The code below shows an example of how to create a hash for all the rows of a dataframe:

from pyspark.sql.functions import sha2, concat_ws

df = spark.createDataFrame(
   [(1,"2",5,1),(3,"4",7,8)],
   ("col1","col2","col3","col4")
   )

# Concatenate all columns with a separator and hash the result
df = df.withColumn("row_sha2",
   sha2(concat_ws("||", *df.columns), 256))

display(df)

One of the most interesting parts of the code is the unpacking operator, ‘*’. Used with the df.columns collection, it unpacks the collection into its individual elements. In this way, concat_ws receives all the columns as separate parameters.
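
As a sketch of the comparison scenario mentioned earlier (df_old and df_new are hypothetical dataframes, both built with a row_sha2 column as above and sharing the key col1), a join on the key reveals which rows changed:

from pyspark.sql.functions import col

# Rows whose hash differs between the two hypothetical dataframes
changed = (df_new.alias("n")
    .join(df_old.alias("o"), col("n.col1") == col("o.col1"), "inner")
    .where(col("n.row_sha2") != col("o.row_sha2"))
    .select("n.*"))

display(changed)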

You can also read more about the sha2 function in the PySpark documentation.