DataFrames and Datasets

This tutorial introduces Apache Spark's structured APIs, specifically DataFrames and Datasets. These APIs provide a higher-level abstraction for working with structured and semi-structured data.

Introduction to Structured APIs

Spark's structured APIs provide a way to work with data as a table of named columns, similar to a table in a relational database. All of the examples in this tutorial assume an active SparkSession, conventionally named spark.
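As a minimal Scala sketch (the application name and local master are placeholder values, not requirements of this tutorial), a SparkSession can be created like this:

import org.apache.spark.sql.SparkSession

// Entry point for the structured APIs; getOrCreate reuses an existing session if one is active.
val spark = SparkSession.builder()
  .appName("structured-api-examples")  // placeholder application name
  .master("local[*]")                  // run locally using all available cores
  .getOrCreate()

// Enables toDF/toDS conversions and the encoders used in later examples.
import spark.implicits._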

Creating DataFrames

DataFrames can be created from various data sources, including the following. Each example is shown in Python, Java, and Scala, and a quick way to inspect the result follows the list.

  • CSV files:
Python:
df = spark.read \
          .csv("data.csv", header=True, inferSchema=True)
Java:
Dataset<Row> df = spark.read()
                       .format("csv")
                       .option("header", "true")
                       .load("data.csv");
Scala:
val df = spark.read
              .format("csv")
              .option("header", "true")
              .load("data.csv")
  • JSON files:
Python:
df = spark.read.json("data.json")
Java:
Dataset<Row> df = spark.read()
                       .format("json")
                       .load("data.json");
Scala:
val df = spark.read
              .format("json")
              .load("data.json")
  • Parquet files:
Python:
df = spark.read.parquet("data.parquet")
Java:
Dataset<Row> df = spark.read()
                       .format("parquet")
                       .load("data.parquet");
Scala:
val df = spark.read
              .format("parquet")
              .load("data.parquet")
  • RDDs:
Python:
rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob")])
df = rdd.toDF(["id", "name"])
Java:
JavaRDD<Row> rdd = new JavaSparkContext(spark.sparkContext())
                       .parallelize(Arrays.asList(RowFactory.create(1, "Alice"),
                                                  RowFactory.create(2, "Bob")));
StructType schema = new StructType()
                        .add("id", DataTypes.IntegerType)
                        .add("name", DataTypes.StringType);
Dataset<Row> df = spark.createDataFrame(rdd, schema);
Scala:
val rdd = spark.sparkContext
               .parallelize(Seq((1, "Alice"), (2, "Bob")))
val df = rdd.toDF("id", "name")
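Whichever source the DataFrame comes from, it can be inspected right away; a short Scala sketch:

// Print the column names and types, then a sample of rows.
df.printSchema()
df.show(5)           // renders up to 5 rows as a text table
println(df.count())  // total row count (this triggers a Spark job)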

DataFrame Operations and Transformations

DataFrames support various operations and transformations, shown below in Python, Java, and Scala; a chained example follows the list:

  • select(): Selects columns.
  Python:
  df2 = df.select("name", "age")
  Java:
  Dataset<Row> df2 = df.select("name", "age");
  Scala:
  val df2 = df.select("name", "age")
  • filter(): Filters rows based on a condition.
  Python:
  df2 = df.filter(df["age"] > 30)
  Java:
  Dataset<Row> df2 = df.filter(df.col("age").gt(30));
  Scala:
  val df2 = df.filter(df("age") > 30)
  • groupBy(): Groups rows based on one or more columns.
  Python:
  df2 = df.groupBy("city").count()
  Java:
  Dataset<Row> df2 = df.groupBy("city").count();
  Scala:
  val df2 = df.groupBy("city").count()
  • orderBy(): Orders rows based on one or more columns.
  Python:
  df2 = df.orderBy(df["age"].desc())
  Java:
  Dataset<Row> df2 = df.orderBy(df.col("age").desc());
  Scala:
  val df2 = df.orderBy(df("age").desc)
  • withColumn(): Adds a new column or replaces an existing one.
  Python:
  df2 = df.withColumn("age_plus_one", df["age"] + 1)
  Java:
  Dataset<Row> df2 = df.withColumn("age_plus_one", df.col("age").plus(1));
  Scala:
  val df2 = df.withColumn("age_plus_one", df("age") + 1)
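Transformations return new DataFrames, so they can be chained; nothing executes until an action such as show() is called. A minimal Scala sketch, assuming df has the age and city columns used above:

import org.apache.spark.sql.functions.col

// Filter, aggregate, and sort in one lazy chain, then trigger execution with show().
val cityCounts = df
  .filter(col("age") > 30)     // keep only rows where age > 30
  .groupBy("city")             // group the remaining rows by city
  .count()                     // one row per city with a "count" column
  .orderBy(col("count").desc)  // largest groups first
cityCounts.show()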

Working with Columns and Expressions

Columns and expressions are the basic building blocks for manipulating DataFrame data; a sketch combining them follows the list.

  • Column access:
  Python:
  df["name"]
  Java:
  df.col("name")
  Scala:
  df("name")
  • Column operations:
  Python:
  df["age"] + 1
  Java:
  df.col("age").plus(1)
  Scala:
  df("age") + 1
  • Expressions:
  Python:
  from pyspark.sql.functions import col, expr
  df.select(col("age") + 1)
  df.select(expr("age + 1"))
  Java:
  import static org.apache.spark.sql.functions.*;
  df.select(col("age").plus(1));
  df.select(expr("age + 1"));
  Scala:
  import org.apache.spark.sql.functions.{col, expr}
  df.select(col("age") + 1)
  df.select(expr("age + 1"))
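Column objects and SQL expression strings can be mixed in a single select; a small Scala sketch (the alias names are just illustrative):

import org.apache.spark.sql.functions.{col, expr}

// Both derived columns compute age + 1; one uses the Column API, the other a SQL string.
df.select(
  col("name"),
  (col("age") + 1).alias("next_age"),
  expr("age + 1 AS next_age_expr")
).show()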

Schema Definition and Management

DataFrames have a schema that defines the data types of each column.

  • Inferring schema: Spark can infer the schema from the data.
  • Defining schema: You can define the schema explicitly, as shown below; a way to verify the result follows the examples.
  Python:
  from pyspark.sql.types import StructType, StructField, IntegerType, StringType
  schema = StructType([
      StructField("id", IntegerType(), True),
      StructField("name", StringType(), True)
  ])
  df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], schema)
  Java:
  import org.apache.spark.sql.types.*;
  StructType schema = new StructType(new StructField[]{
      new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
      new StructField("name", DataTypes.StringType, true, Metadata.empty())
  });
  Dataset<Row> df = spark.createDataFrame(
      Arrays.asList(RowFactory.create(1, "Alice"), RowFactory.create(2, "Bob")), schema);
  Scala:
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}
  val schema = StructType(Seq(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true)
  ))
  val rdd = spark.sparkContext.parallelize(Seq(Row(1, "Alice"), Row(2, "Bob")))
  val df = spark.createDataFrame(rdd, schema)
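Whether the schema was inferred or declared, printSchema() shows what Spark will use; for the explicit schema above it prints the following (Scala shown, but the output is the same in every language):

df.printSchema()
// root
//  |-- id: integer (nullable = true)
//  |-- name: string (nullable = true)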

Datasets: Typed Distributed Collections

Datasets are similar to DataFrames but provide compile-time type safety; a typed Scala example follows the note below.

  • Creating Datasets:
  Java:
  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Encoders;
  import java.util.Arrays;

  // Person is a JavaBean with id and name fields plus getters and setters.
  Dataset<Person> ds = spark.createDataset(
      Arrays.asList(new Person(1, "Alice"), new Person(2, "Bob")),
      Encoders.bean(Person.class));
  Scala:
  case class Person(id: Int, name: String)
  val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name")
  val ds = df.as[Person]  // requires import spark.implicits._
  • Dataset Operations:
  Java:
  ds.filter(ds.col("id").gt(1)).select(ds.col("name"));
  Scala:
  ds.filter(ds("id") > 1).select(ds("name"))

Note: Datasets are only available in Scala and Java.
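The benefit of the typed API is that operations can take ordinary functions over the element type, so a reference to a nonexistent field fails at compile time rather than at runtime; a minimal Scala sketch using the Person case class from above:

import org.apache.spark.sql.Dataset

// filter and map receive Person => ... functions; field access is checked by the compiler.
val filtered: Dataset[Person] = ds.filter(_.id > 1)
val names: Dataset[String]    = ds.map(_.name)  // encoder comes from spark.implicits._
names.show()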

Converting Between RDDs, DataFrames, and Datasets

  • RDD to DataFrame:
  Python:
  rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob")])
  df = rdd.toDF(["id", "name"])
  Java:
  JavaRDD<Row> rdd = new JavaSparkContext(spark.sparkContext())
      .parallelize(Arrays.asList(RowFactory.create(1, "Alice"), RowFactory.create(2, "Bob")));
  Dataset<Row> df = spark.createDataFrame(rdd, schema);  // schema: the StructType defined earlier
  Scala:
  val rdd = spark.sparkContext.parallelize(Seq((1, "Alice"), (2, "Bob")))
  val df = rdd.toDF("id", "name")
  • DataFrame to Dataset:
  Java:
  // Person is the JavaBean used in the Datasets section above.
  Dataset<Person> ds = df.as(Encoders.bean(Person.class));
  Scala:
  val df = spark.createDataFrame(Seq((1, "Alice"), (2, "Bob"))).toDF("id", "name")
  val ds = df.as[Person]  // Person is the case class from the Datasets section
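The conversions also work in the other direction; a short Scala sketch:

// Typed Dataset back to an untyped DataFrame.
val dfAgain = ds.toDF()

// DataFrame down to an RDD of Row objects.
val rowRdd = df.rdd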
