This tutorial introduces Apache Spark's structured APIs, specifically DataFrames and Datasets. These APIs provide a higher-level abstraction for working with structured and semi-structured data.
Spark's structured APIs let you work with data as a table of named, typed columns, much like a table in a relational database.
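As a minimal sketch of what that tabular view looks like (the application name and the two-row example data are arbitrary), the following Python snippet creates a SparkSession, builds a small DataFrame from local data, and displays it:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("structured-apis").getOrCreate()

    # A DataFrame is a distributed table with named, typed columns
    df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
    df.show()
    # +---+-----+
    # | id| name|
    # +---+-----+
    # |  1|Alice|
    # |  2|  Bob|
    # +---+-----+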
DataFrames can be created from a variety of sources, including CSV, JSON, and Parquet files, as well as from existing RDDs. Examples in Python, Java, and Scala follow; a sketch of the generic reader pattern behind the file-based shortcuts appears after them.
From a CSV file:

Python:

    df = spark.read.csv("data.csv", header=True, inferSchema=True)

Java:

    Dataset<Row> df = spark.read()
        .format("csv")
        .option("header", "true")
        .load("data.csv");

Scala:

    val df = spark.read
        .format("csv")
        .option("header", "true")
        .load("data.csv")
From a JSON file:

Python:

    df = spark.read.json("data.json")

Java:

    Dataset<Row> df = spark.read()
        .format("json")
        .load("data.json");

Scala:

    val df = spark.read
        .format("json")
        .load("data.json")
From a Parquet file:

Python:

    df = spark.read.parquet("data.parquet")

Java:

    Dataset<Row> df = spark.read()
        .format("parquet")
        .load("data.parquet");

Scala:

    val df = spark.read
        .format("parquet")
        .load("data.parquet")
From an existing RDD:

Python:

    rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob")])
    df = rdd.toDF(["id", "name"])

Java:

    // In Java, rows are built with RowFactory and need an explicit schema
    JavaRDD<Row> rdd = new JavaSparkContext(spark.sparkContext())
        .parallelize(Arrays.asList(
            RowFactory.create(1, "Alice"),
            RowFactory.create(2, "Bob")));
    StructType schema = new StructType()
        .add("id", DataTypes.IntegerType)
        .add("name", DataTypes.StringType);
    Dataset<Row> df = spark.createDataFrame(rdd, schema);

Scala:

    import spark.implicits._   // provides toDF on RDDs of tuples and case classes

    val rdd = spark.sparkContext.parallelize(Seq((1, "Alice"), (2, "Bob")))
    val df = rdd.toDF("id", "name")
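The csv(), json(), and parquet() shortcuts used above are thin wrappers around the same generic DataFrameReader pattern, which also accepts format-specific options. A Python sketch (the path and options are placeholders):

    df = (spark.read
          .format("csv")                  # or "json", "parquet", ...
          .option("header", "true")       # options are format-specific
          .option("inferSchema", "true")
          .load("data.csv"))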
DataFrames support a variety of operations and transformations; a chained example combining several of them follows this list.

select(): selects one or more columns.

    Python: df2 = df.select("name", "age")
    Java:   Dataset<Row> df2 = df.select("name", "age");
    Scala:  val df2 = df.select("name", "age")

filter(): filters rows based on a condition.

    Python: df2 = df.filter(df["age"] > 30)
    Java:   Dataset<Row> df2 = df.filter(df.col("age").gt(30));
    Scala:  val df2 = df.filter(col("age") > 30)   // requires import org.apache.spark.sql.functions.col

groupBy(): groups rows by one or more columns, typically followed by an aggregation.

    Python: df2 = df.groupBy("city").count()
    Java:   Dataset<Row> df2 = df.groupBy("city").count();
    Scala:  val df2 = df.groupBy("city").count()

orderBy(): sorts rows by one or more columns.

    Python: df2 = df.orderBy(df["age"].desc())
    Java:   Dataset<Row> df2 = df.orderBy(df.col("age").desc());
    Scala:  val df2 = df.orderBy(col("age").desc)

withColumn(): adds a new column or replaces an existing one.

    Python: df2 = df.withColumn("age_plus_one", df["age"] + 1)
    Java:   Dataset<Row> df2 = df.withColumn("age_plus_one", expr("age + 1"));   // requires import static org.apache.spark.sql.functions.expr
    Scala:  val df2 = df.withColumn("age_plus_one", expr("age + 1"))   // requires import org.apache.spark.sql.functions.expr
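Transformations return new DataFrames and can be chained; nothing executes until an action such as show() or count() is called. A Python sketch, assuming the DataFrame has name, age, and city columns:

    from pyspark.sql.functions import col

    result = (df
              .select("name", "age", "city")
              .filter(col("age") > 30)               # keep rows with age > 30
              .groupBy("city")                       # one row per city
              .count()
              .orderBy(col("count").desc()))         # largest groups first

    result.show()                                    # action: triggers execution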
Columns and expressions are the building blocks for manipulating DataFrame data.

Referencing a column:

    Python: df["name"]
    Java:   df.col("name")
    Scala:  df.col("name")   // or df("name")

Deriving an expression from a column:

    Python: df["age"] + 1
    Java:   df.col("age").plus(1)
    Scala:  df("age") + 1

The col() and expr() functions build column expressions without going through a specific DataFrame; expr() parses a SQL expression string.

Python:

    from pyspark.sql.functions import col, expr

    df.select(col("age") + 1)
    df.select(expr("age + 1"))

Java:

    import static org.apache.spark.sql.functions.*;

    df.select(col("age").plus(1));
    df.select(expr("age + 1"));

Scala:

    import org.apache.spark.sql.functions.{col, expr}

    df.select(col("age") + 1)
    df.select(expr("age + 1"))
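col() and expr() can be mixed freely inside select(), and alias() names the resulting columns. A short Python sketch, again assuming name and age columns:

    from pyspark.sql.functions import col, expr

    df.select(
        col("name"),
        (col("age") + 1).alias("age_plus_one"),
        expr("age > 30").alias("is_over_30")
    ).show()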
Every DataFrame has a schema that defines the name, data type, and nullability of each column. A schema can also be supplied explicitly when creating a DataFrame:

Python:

    from pyspark.sql.types import StructType, StructField, IntegerType, StringType

    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])
    df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], schema)

Java:

    import org.apache.spark.sql.types.*;

    StructType schema = new StructType(new StructField[]{
        new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
        new StructField("name", DataTypes.StringType, true, Metadata.empty())
    });
    Dataset<Row> df = spark.createDataFrame(
        Arrays.asList(RowFactory.create(1, "Alice"), RowFactory.create(2, "Bob")),
        schema);

Scala:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true)
    ))
    val rows = spark.sparkContext.parallelize(Seq(Row(1, "Alice"), Row(2, "Bob")))
    val df = spark.createDataFrame(rows, schema)
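Whether the schema was inferred or supplied explicitly, printSchema() displays it as a tree, which is a quick way to verify column names and types. In Python, for the DataFrame built above:

    df.printSchema()
    # root
    #  |-- id: integer (nullable = true)
    #  |-- name: string (nullable = true)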
Datasets are similar to DataFrames but add compile-time type safety: each record is an instance of a JVM type (a Java bean, a Scala case class, or another type with an encoder), so field names and types are checked by the compiler rather than discovered at runtime.
Java:

    import java.util.Arrays;
    import org.apache.spark.api.java.function.FilterFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;

    SparkSession spark = SparkSession.builder().getOrCreate();

    // Built-in encoders cover primitives and tuples; for domain objects, define a
    // JavaBean and use Encoders.bean(MyClass.class), or convert an existing DataFrame
    // with df.as(Encoders.bean(MyClass.class)) when its column names match the bean.
    Dataset<Tuple2<Integer, String>> ds = spark.createDataset(
        Arrays.asList(new Tuple2<>(1, "Alice"), new Tuple2<>(2, "Bob")),
        Encoders.tuple(Encoders.INT(), Encoders.STRING()));

    // The lambda is checked against Tuple2<Integer, String> at compile time
    ds.filter((FilterFunction<Tuple2<Integer, String>>) t -> t._1() > 1).show();

Scala:

    import org.apache.spark.sql.SparkSession

    case class Person(id: Int, name: String)

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name")
    val ds = df.as[Person]   // column names match the case class fields

    // Typed operations: field names and types are checked by the compiler
    ds.filter(_.id > 1).map(_.name).show()
Note: Datasets are only available in Scala and Java.