This tutorial covers Apache Spark Streaming, Spark's module for processing real-time data streams.
It introduces both of Spark's streaming APIs: DStreams, the original low-level abstraction built on RDDs, and Structured Streaming, the higher-level API built on the Spark SQL engine.
Structured Streaming can read from a variety of input sources through the readStream API:
Kafka:
```python
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mytopic") \
    .load()
```
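The Kafka source delivers the key and value columns as binary. A small follow-up step, sketched here rather than taken from the original snippet, casts them to strings before further processing:
```python
from pyspark.sql.functions import col

# Cast Kafka's binary key/value columns to strings for downstream processing
messages = df.select(col("key").cast("string").alias("key"),
                     col("value").cast("string").alias("value"))
```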
Kinesis (requires a separate Kinesis connector; it is not bundled with open-source Spark):
```python
df = spark.readStream.format("kinesis") \
    .option("streamName", "myStream") \
    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") \
    .load()
```
Socket:
```python
df = spark.readStream.format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
```
Files:
```python
df = spark.readStream.text("data/")
```
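For structured file formats such as JSON or CSV, the file source expects a schema up front. A minimal sketch, assuming a hypothetical data/ directory of JSON records with a single value field:
```python
from pyspark.sql.types import StructType, StructField, StringType

# Streaming file sources need an explicit schema (unless schema inference
# is enabled); this schema and path are placeholders.
schema = StructType([StructField("value", StringType(), True)])
df = spark.readStream.schema(schema).json("data/")
```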
Processed streams can be written to a variety of output sinks through the writeStream API:
Kafka:
df.writeStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "mytopic")
.start()
```
Console:
```python
df.writeStream.format("console").start()
```
Memory:
```python
df.writeStream.format("memory").queryName("my_table").start()
spark.sql("SELECT * FROM my_table").show()
```
Windowing operations allow you to perform calculations over a sliding window of data.
```python
from pyspark.sql.functions import window

# Assign each row to overlapping 10-minute windows that slide every 5 minutes
df = df.withColumn("window", window(df["timestamp"], "10 minutes", "5 minutes"))
```
DStreams (Discretized Streams), the basic abstraction in Spark Streaming, are a sequence of RDDs (Resilient Distributed Datasets) representing a continuous stream of data.
Spark Streaming provides transformations on DStreams similar to RDD transformations, including:
map(): Applies a function to each element of the stream.
filter(): Filters elements based on a condition.
count(): Returns the number of elements in each RDD.
reduce(): Reduces the elements using a function.
window(): Performs calculations over a sliding window of data.
Java:
```java
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.SparkConf;

public class JavaDStreamExample {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("JavaDStreamExample").setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        lines.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
```
Scala:
```scala
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf

object ScalaDStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ScalaDStreamExample").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```
Python:
```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[*]", "PythonDStreamExample")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999)
lines.pprint()
ssc.start()
ssc.awaitTermination()
```
As the three examples above show, setting up a Spark Streaming (DStream) application follows the same steps in every language:
1. Create a StreamingContext (JavaStreamingContext in Java) from a SparkConf, specifying the batch interval.
2. Define one or more input DStreams, for example from a socket.
3. Apply transformations and output operations such as print().
4. Call start() to begin processing and awaitTermination() to keep the application running.
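To tie these steps to the transformations listed earlier, here is a minimal word-count sketch in Python. It is not part of the original examples; the host, port, and window sizes are arbitrary placeholders.
```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[*]", "DStreamWordCount")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

lines = ssc.socketTextStream("localhost", 9999)

# flatMap/map produce (word, 1) pairs; reduceByKeyAndWindow sums them
# over a 10-second window that slides every 5 seconds.
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
counts = pairs.reduceByKeyAndWindow(lambda a, b: a + b, None, 10, 5)
counts.pprint()

ssc.start()
ssc.awaitTermination()
```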
Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL engine. It lets you process data streams in real time with the same DataFrame API you use for batch processing.
Input Data: Structured Streaming consumes data from sources such as Kafka, files, and sockets.
Processing: Data is processed with Spark SQL operations such as select, filter, and groupBy.
Output: The processed data is written to sinks such as Kafka, the console, and files.
```python
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("SocketStream").getOrCreate()

# Read stream from socket. The socket source always produces a single
# string column named "value", so no user-defined schema is needed.
df = spark.readStream.format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Write the raw stream to the console
query = df.writeStream.format("console").start()
query.awaitTermination()
```
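To try the socket examples locally, start a plain text server first, for example with netcat (nc -lk 9999), and type lines into it; each batch of input appears in the query's console output.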
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# Initialize SparkSession
spark = SparkSession.builder.appName("StructuredStreamingWordCount").getOrCreate()

# Create a streaming DataFrame from a socket source; it exposes each
# incoming line as a string column named "value"
df = spark.readStream.format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = df.select(explode(split(df.value, " ")).alias("word"))

# Group the words and count the occurrences
wordCounts = words.groupBy("word").count()

# Start the streaming query and print the results to the console
query = wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
```
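Here outputMode("complete") re-emits the entire word-count table on every trigger, which suits small aggregations; update writes only the rows that changed, and append (the default) is used for queries without aggregations.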
```python
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("KafkaStream").getOrCreate()

# Read stream from Kafka
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mytopic") \
    .load()
# Process data