If you are coming from a SQL background, you can use where() instead of filter() to select rows from a DataFrame based on a given condition or SQL expression; on DataFrames the two functions behave exactly the same.
In this PySpark article, you will learn how to filter DataFrame rows on columns of string, array, and struct types using single and multiple conditions, as well as how to filter with isin(), using PySpark (Python Spark) examples.
Wish to make a career in the world of PySpark? Start with HKR'S PySpark online training!
The syntax of the filter() function is shown below, where condition is the expression you want to filter on.
filter(condition)
Let's start with a DataFrame before moving on to examples. I'm using a DataFrame with StructType and ArrayType columns here because I'll be covering examples with both struct and array types.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, ArrayType
# Create a SparkSession, which is required before building the DataFrame below
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [
(("James","","Smith"),["Java","Scala","C++"],"OH","M"),
(("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
(("Julia","","Williams"),["CSharp","VB"],"OH","F"),
(("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
(("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
(("Mike","Mary","Williams"),["Python","VB"],"OH","M")
]
schema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])),
StructField('languages', ArrayType(StringType()), True),
StructField('state', StringType(), True),
StructField('gender', StringType(), True)
])
df = spark.createDataFrame(data = data, schema = schema)
df.printSchema()
df.show(truncate=False)
The above script produces the following schema and DataFrame output.
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- languages: array (nullable = true)
| |-- element: string (containsNull = true)
|-- state: string (nullable = true)
|-- gender: string (nullable = true)
+----------------------+------------------+-----+------+
|name |languages |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith] |[Java, Scala, C++]|OH |M |
|[Anna, Rose, ] |[Spark, Java, C++]|NY |F |
|[Julia, , Williams] |[CSharp, VB] |OH |F |
|[Maria, Anne, Jones] |[CSharp, VB] |NY |M |
|[Jen, Mary, Brown] |[CSharp, VB] |NY |M |
|[Mike, Mary, Williams]|[Python, VB] |OH |M |
+----------------------+------------------+-----+------+
To filter rows from a DataFrame, use a Column expression with a condition. You can express complex conditions by referring to column names as dfObject.colname.
# Using equals condition
df.filter(df.state == "OH").show(truncate=False)
+----------------------+------------------+-----+------+
|name |languages |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith] |[Java, Scala, C++]|OH |M |
|[Julia, , Williams] |[CSharp, VB] |OH |F |
|[Mike, Mary, Williams]|[Python, VB] |OH |M |
+----------------------+------------------+-----+------+
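Since where() is an alias for filter(), the same rows can be selected with where(). The line below is a quick sketch of that, reusing the df defined above; it is not part of the original example.
# where() behaves exactly like filter()
df.where(df.state == "OH").show(truncate=False)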
# not equals condition
df.filter(df.state != "OH") \
.show(truncate=False)
df.filter(~(df.state == "OH")) \
.show(truncate=False)
The same example can also be written as follows. You must first import col from pyspark.sql.functions before you can use it.
#Using SQL col() function
from pyspark.sql.functions import col
df.filter(col("state") == "OH") \
.show(truncate=False)
If you have experience with SQL, you can use it in PySpark to filter DataFrame rows using SQL expressions.
Using SQL Expression
df.filter("gender == 'M'").show()
#For not equal
df.filter("gender != 'M'").show()
df.filter("gender <> 'M'").show()
To filter() rows of a DataFrame on multiple conditions in PySpark, you can use either Column expressions or a SQL expression. The following is a simple example that uses the AND (&) condition; you can extend it with OR (|) and NOT (~) expressions as needed.
# Filter on multiple conditions
df.filter( (df.state == "OH") & (df.gender == "M") ) \
.show(truncate=False)
The DataFrame results are shown below.
+----------------------+------------------+-----+------+
|name |languages |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith] |[Java, Scala, C++]|OH |M |
|[Mike, Mary, Williams]|[Python, VB] |OH |M |
+----------------------+------------------+-----+------+
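You can combine conditions with OR (|) and NOT (~) in the same way; the lines below are a small sketch of that, reusing the same df, and are not part of the original example.
# OR (|) condition: state is OH or gender is F
df.filter( (df.state == "OH") | (df.gender == "F") ).show(truncate=False)
# NOT (~) condition: state is not OH
df.filter( ~(df.state == "OH") ).show(truncate=False)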
If you have a list of values and want to keep only the rows whose column value is (or is not) in that list, use the isin() function of the Column class. There is no isnotin() function, but you can achieve the same result by negating isin() with the NOT operator (~).
#Filter IS IN List values
li=["OH","CA","DE"]
df.filter(df.state.isin(li)).show()
+--------------------+------------------+-----+------+
| name| languages|state|gender|
+--------------------+------------------+-----+------+
| [James, , Smith]|[Java, Scala, C++]| OH| M|
| [Julia, , Williams]| [CSharp, VB]| OH| F|
|[Mike, Mary, Will...| [Python, VB]| OH| M|
+--------------------+------------------+-----+------+
# Filter NOT IN list values
# Both show all records with state NY, since NY is not in the list
df.filter(~df.state.isin(li)).show()
df.filter(df.state.isin(li)==False).show()
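Note that isin() also accepts the values directly as arguments instead of a list; a quick sketch:
# Same as df.state.isin(li)
df.filter(df.state.isin("OH", "CA", "DE")).show()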
You could also use the Column class's startswith(), endswith(), and contains() methods to filter DataFrame rows. More examples of Column class can be found in PySpark Column Functions.
# Using startswith
df.filter(df.state.startswith("N")).show()
+--------------------+------------------+-----+------+
| name| languages|state|gender|
+--------------------+------------------+-----+------+
| [Anna, Rose, ]|[Spark, Java, C++]| NY| F|
|[Maria, Anne, Jones]| [CSharp, VB]| NY| M|
| [Jen, Mary, Brown]| [CSharp, VB]| NY| M|
+--------------------+------------------+-----+------+
#using endswith
df.filter(df.state.endswith("H")).show()
#contains
df.filter(df.state.contains("H")).show()
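Related Column methods such as like() and rlike() can be used for pattern-based filtering as well; the lines below are a small sketch, reusing the same df, and are not part of the original example.
# SQL LIKE pattern: states starting with N
df.filter(df.state.like("N%")).show()
# Regular expression match: states starting with N
df.filter(df.state.rlike("^N")).show()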
You can also filter rows based on a value in an array-type column. The following example uses array_contains() from PySpark SQL functions, which returns true if the array contains the given value and false otherwise.
from pyspark.sql.functions import array_contains
df.filter(array_contains(df.languages,"Java")) \
.show(truncate=False)
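Filtering on a struct-type column works the same way by referring to the nested field; the line below is a short sketch (the same filter also appears in the complete listing further down).
# Filter on a nested struct field: rows where the last name is Williams
df.filter(df.name.lastname == "Williams").show(truncate=False)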
The complete example, combining all of the snippets above, is shown below.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import col,array_contains
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
arrayStructureData = [
(("James","","Smith"),["Java","Scala","C++"],"OH","M"),
(("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
(("Julia","","Williams"),["CSharp","VB"],"OH","F"),
(("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
(("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
(("Mike","Mary","Williams"),["Python","VB"],"OH","M")
]
arrayStructureSchema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])),
StructField('languages', ArrayType(StringType()), True),
StructField('state', StringType(), True),
StructField('gender', StringType(), True)
])
df = spark.createDataFrame(data = arrayStructureData, schema = arrayStructureSchema)
df.printSchema()
df.show(truncate=False)
df.filter(df.state == "OH") \
.show(truncate=False)
df.filter(col("state") == "OH") \
.show(truncate=False)
df.filter("gender == 'M'") \
.show(truncate=False)
df.filter( (df.state == "OH") & (df.gender == "M") ) \
.show(truncate=False)
df.filter(array_contains(df.languages,"Java")) \
.show(truncate=False)
df.filter(df.name.lastname == "Williams") \
.show(truncate=False)
Conclusion:
In this tutorial, I demonstrated how to filter rows from a PySpark DataFrame using single or multiple conditions and SQL expressions, as well as how to filter rows by applying conditions on array and struct columns, using Python examples. You can also use the where() function to filter rows of a PySpark DataFrame.
PySpark filter() is a useful function for filtering rows of a DataFrame or RDD based on a given condition.
On a DataFrame, filter() selects the matching rows and returns a new DataFrame.
On a PySpark RDD, filter() is a transformation that keeps only the elements satisfying the given condition and returns a new RDD.
There are several ways to filter on multiple columns in a PySpark DataFrame, such as combining Column conditions with & (AND), | (OR), and ~ (NOT), or passing a SQL expression string to filter().
In PySpark, a DataFrame is a distributed collection of data organized into rows and columns, similar to a relational table in Spark SQL. DataFrames are widely used for many ML tasks and are straightforward to analyze and manipulate.