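Spark Course Review
Contents
- Exercise 1: Number-guessing game
- Exercise 2: Prime factorization
- Exercise 3: Scala file I/O
- Exercise 4: Spark WordCount (classic)
- Exercise 5: Longest increasing subsequence (LIS)
- Exercise 6: Yang Hui's (Pascal's) triangle
- Exercise 7: Recursive fractal pattern
- Exercise 8: Basic DataFrame operations
- Exercise 9: Join optimization with a broadcast variable
- Exercise 10: Movie ratings
- Exercise 11: Custom partitioner
- Exercise 12: Complete text-classification example (reading CSV + JSON)
- Exercise 13: Case class to DataFrame
- Exercise 14: KMeans algorithm
- Exercise 15: Read, then sort
- Exercise 16: RDD caching
- Exercise 18: RDD partition operations
- Exercise 19: Create an RDD from a file and filter it
- Exercise 20: Longest word length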
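Exercise 1: Number-guessing game
Problem: the program randomly generates an integer from 1 to 100. The user keeps guessing until the guess is correct, and after each guess the program says whether it was too high or too low.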
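import scala.io.StdIn.readInt
import scala.util.Random

object GuessNum {
  def main(args: Array[String]): Unit = {
    // nextInt(100) yields 0..99, so add 1 to cover 1..100
    val res = Random.nextInt(100) + 1
    var ans = -1
    var c = 0 // number of guesses so far
    println("Enter an integer between 1 and 100")
    while (res != ans) {
      ans = readInt()
      c += 1
      if (ans > res) println("Too high, try a smaller number")
      else if (ans < res) println("Too low, try a larger number")
      else println(s"Correct! You needed $c guesses")
    }
  }
}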
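Exercise 2: Prime factorization
Problem: read an integer and print its prime factorization. For example, for input 60 the output is 2 x 2, 1 x 3, 1 x 5 (two 2s, one 3, one 5).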
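import scala.io.StdIn.readInt

object TestFactor {
  def main(args: Array[String]): Unit = {
    var x = readInt()
    var t = 2 // current trial divisor
    // Trial division: only divisors up to sqrt(x) need to be tested
    while (t * t <= x) {
      if (x % t == 0) {
        var c = 0 // multiplicity of the prime factor t
        while (x % t == 0) {
          x /= t
          c += 1
        }
        println(s"$c x $t")
      }
      t += 1
    }
    // Whatever remains above 1 is a single prime factor
    if (x > 1) println(s"1 x $x")
  }
}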
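To check the trial-division loop without typing input, the same logic can be wrapped in a function that returns (prime, exponent) pairs. This is only a sketch: the names FactorSketch and factorize are hypothetical and not part of the exercise, and the n >= 2 precondition is an assumption.

```scala
import scala.collection.mutable.ListBuffer

object FactorSketch {
  // Returns (prime, exponent) pairs in increasing order of prime.
  def factorize(n: Int): List[(Int, Int)] = {
    require(n >= 2, "n must be at least 2")
    var x = n
    var t = 2
    val out = ListBuffer.empty[(Int, Int)]
    // Compare in Long so that t * t cannot overflow for large inputs
    while (t.toLong * t <= x) {
      var c = 0
      while (x % t == 0) { x /= t; c += 1 }
      if (c > 0) out += ((t, c))
      t += 1
    }
    // Any remainder above 1 is a single prime factor
    if (x > 1) out += ((x, 1))
    out.toList
  }

  def main(args: Array[String]): Unit =
    println(factorize(60)) // List((2,2), (3,1), (5,1))
}
```

Returning the pairs rather than printing them makes the result easy to compare against a hand calculation such as 60 = 2 x 2 x 3 x 5.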
Exercise 3: Scala file I/O
Problem: write the numbers 1 to 5 to the file out.txt, then read the file back and print it, handling possible I/O exceptions.
import java.io.{IOException, PrintWriter}
import scala.io.Source

object TestScalaFile {
  def main(args: Array[String]): Unit = {
    try {
      // Write 1..5, one number per line
      val pw = new PrintWriter("out.txt")
      for (i <- 1 to 5) {
        pw.println(i)
      }
      pw.close()
      println("Finished writing the file")
      // Read the file back and print each line
      val in = Source.fromFile("out.txt")
      for (line <- in.getLines()) {
        println(line)
      }
      in.close()
      println("Finished reading the file")
    } catch {
      case _: IOException => println("File operation failed")
      case _: Exception => println("An unexpected error occurred")
    }
  }
}
Exercise 4: Spark WordCount (classic)
Problem: implement WordCount with Spark RDDs and write the result to a file.
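import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rddlines = sc.textFile("input/word.txt", 1)
    // Split each line into words; drop the empty strings produced by blank lines
    val rddwords = rddlines.flatMap(line => line.trim().split(" ")).filter(_.nonEmpty)
    val rddpair = rddwords.map(word => (word, 1))
    val rddwc = rddpair.reduceByKey((x, y) => x + y)
    rddwc.collect().foreach(println)
    // Save to a file: delete any old output first, since saveAsTextFile
    // fails when the output directory already exists
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val outpath = new Path("out")
    if (fs.exists(outpath)) fs.delete(outpath, true)
    rddwc.saveAsTextFile("out")
    sc.stop()
  }
}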
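The flatMap, map and reduceByKey chain has a close analogue on ordinary Scala collections, which can be handy for checking expected counts on a small sample before running the Spark job. The sketch below is an illustration only; WordCountSketch and wordCount are hypothetical names, and it splits on any whitespace rather than a single space.

```scala
object WordCountSketch {
  // Counts words in a small in-memory sample, mirroring the RDD pipeline.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.trim.split("\\s+")) // like rdd.flatMap
      .filter(_.nonEmpty)            // blank lines yield one empty token
      .groupBy(identity)             // group equal words together
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit =
    wordCount(Seq("spark hadoop spark", "", "scala")).foreach(println)
}
```

On an RDD, reduceByKey is generally preferred over grouping because it combines counts within each partition before shuffling; the groupBy here is only acceptable because the data is local and small.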
Exercise 5: Longest increasing subsequence (LIS)
Problem: given the array [-3,-2,1,6,2,-7,5,8,-9,-8,-2,7,6,11,-2,3,-4], find the length of its longest increasing subsequence.
object LongAS {
  def main(args: Array[String]): Unit = {
    val a = List(-3, -2, 1, 6, 2, -7, 5, 8, -9, -8, -2, 7, 6, 11, -2, 3, -4)
    val n = a.length
    // f(i) = length of the longest increasing subsequence ending at index i
    val f = Array.ofDim[Int](n)
    var ans = 0
    for (i <- 0 until n) {
      f(i) = 1
      for (j <- 0 until i) {
        if (a(i) > a(j)) {
          f(i) = math.max(f(i), f(j) + 1)
        }
      }
      ans = math.max(ans, f(i))
    }
    println(ans)
  }
}
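The solution above is the quadratic dynamic program. For comparison, a different and commonly used technique (not the one in the solution) keeps, for every subsequence length, the smallest tail value seen so far and binary-searches that array, which runs in O(n log n). The sketch below assumes strictly increasing subsequences, as in the solution; LisSketch and lisLength are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

object LisSketch {
  // tails(k) holds the smallest possible last element of a strictly
  // increasing subsequence of length k + 1 found so far.
  def lisLength(xs: Seq[Int]): Int = {
    val tails = ArrayBuffer.empty[Int]
    for (x <- xs) {
      // Binary search for the first index whose tail is >= x
      var lo = 0
      var hi = tails.length
      while (lo < hi) {
        val mid = (lo + hi) / 2
        if (tails(mid) < x) lo = mid + 1 else hi = mid
      }
      if (lo == tails.length) tails += x // x extends the longest subsequence
      else tails(lo) = x                 // x is a smaller tail for length lo + 1
    }
    tails.length
  }

  def main(args: Array[String]): Unit = {
    val a = Seq(-3, -2, 1, 6, 2, -7, 5, 8, -9, -8, -2, 7, 6, 11, -2, 3, -4)
    println(lisLength(a)) // 7, for example -3, -2, 1, 2, 5, 8, 11
  }
}
```

Both approaches give the same length; the tails array itself is not necessarily a valid subsequence, only its length is meaningful.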
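Exercise 6: Yang Hui's (Pascal's) triangle
Problem: read an integer n and print Yang Hui's triangle using a two-dimensional array and the recurrence formula.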
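import scala.io.StdIn.readInt

object YangTriangle {
  def main(args: Array[String]): Unit = {
    val n = readInt()
    val a = Array.ofDim[Int](n + 1, n + 1)
    a(0)(0) = 1
    // Recurrence: a(i)(j) = a(i-1)(j-1) + a(i-1)(j); cells outside the triangle stay 0
    for (i <- 1 to n) {
      a(i)(0) = 1
      for (j <- 1 to i) {
        a(i)(j) = a(i - 1)(j - 1) + a(i - 1)(j)
      }
    }
    // Prints rows 1..n; row 0 (the single 1 at the apex) is computed but not printed
    for (i <- 1 to n) {
      for (j <- 0 to i) {
        print(s"${a(i)(j)}\t")
      }
      println()
    }
  }
}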
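Exercise 7: Recursive fractal pattern
Problem: read an integer n and print the recursive fractal pattern built from the character x (similar to the Sierpinski carpet).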
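import scala.io.StdIn.readInt

object RecurX {
  def main(args: Array[String]): Unit = {
    val n = readInt()
    // The level-n pattern is 3^(n-1) cells wide; size the grid to fit it
    val size = math.pow(3, math.max(n - 1, 0)).toInt
    // Start from blanks so unfilled cells print as spaces, not '\u0000'
    val a = Array.fill(size, size)(' ')
    var len = 1 // side length of the pattern built so far
    a(0)(0) = 'x'
    for (i <- 2 to n) {
      // Copy the current len x len pattern into four more positions
      for (j <- 0 until len) {
        for (k <- 0 until len) {
          a(j)(k + 2 * len) = a(j)(k)           // top right
          a(j + 2 * len)(k) = a(j)(k)           // bottom left
          a(j + len)(k + len) = a(j)(k)         // centre
          a(j + 2 * len)(k + 2 * len) = a(j)(k) // bottom right
        }
      }
      len *= 3
    }
    for (j <- 0 until len) {
      for (k <- 0 until len) {
        print(s"${a(j)(k)} ")
      }
      println()
    }
  }
}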
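Exercise 8: Basic DataFrame operations
Problem: read a CSV file into a DataFrame through SparkSession, then sort it in ascending order, in descending order, and by a compound multi-column key. Finally, select the name, salary and education columns, give them the Chinese aliases "名字" (name), "工资" (salary) and "教育" (education), and show the result.
Data format: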
name,age,home,occupation,salary,education
张三,25,北京,工程师,12000,本科
李四,30,上海,设计师,15000,硕士
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DataFrameOperations {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("DataFrameOperations")
      .getOrCreate()
    // header: use the first line as column names (otherwise they are _c0, _c1, ...)
    // inferSchema: read salary as a number so it sorts numerically, not as text
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data.csv")
    println("Original data:")
    df.show()
    println("Sorted by salary, ascending:")
    df.orderBy("salary").show()
    println("Sorted by salary, descending:")
    df.orderBy(col("salary").desc).show()
    println("Compound sort (salary descending, education ascending):")
    df.orderBy(col("salary").desc, col("education").asc).show()
    println("Query result with Chinese aliases:")
    df.select(
      col("name").as("名字"),
      col("salary").as("工资"),
      col("education").as("教育")
    ).show()
    spark.stop()
  }
}
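Exercise 9: Join optimization with a broadcast variable
Problem: given a small table smallTable (company information) and a large table bigTable (order information), use a broadcast variable to implement an efficient join.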
import org.apache.spark.{SparkConf, SparkContext}

object CsvJoinExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CsvJoin").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // 1. Read the small table (price mapping), collect it as a Map and broadcast it
    val smallTableRaw = sc.textFile("prices.csv")
      .filter(!_.startsWith("clusterName")) // skip the header
      .map { line =>
        val parts = line.split(",")
        (parts(0), parts(1).toInt) // (clusterName, price)
      }
      .collectAsMap()
    val smallTableBroadcast = sc.broadcast(smallTableRaw)
    // 2. Read the large table (order data)
    val bigTable = sc.textFile("orders.csv")
      .filter(!_.startsWith("orderId")) // skip the header
      .map { line =>
        val parts = line.split(",")
        (parts(0), parts(1), parts(2)) // (orderId, clusterName, year)
      }
    // 3. Map-side join: look each order up in the broadcast Map, so no shuffle is needed
    val result = bigTable.map { case (orderId, clusterName, year) =>
      // Keep price as a String so matched and unmatched rows share one type
      val price = smallTableBroadcast.value.get(clusterName).map(_.toString).getOrElse("unknown")
      (orderId, clusterName, price, year)
    }
    // 4. Print the result
    result.collect().foreach(println)
    sc.stop()
  }
}
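The core of the map-side join is a per-record lookup in an in-memory Map, which can be tried on plain collections without a cluster. The sketch below is an assumption-laden illustration: Order, MapSideJoinSketch and joinWithPrices are hypothetical names, and a missing price is represented as None instead of a placeholder string.

```scala
object MapSideJoinSketch {
  // Hypothetical record type for one row of the large table
  final case class Order(orderId: String, clusterName: String, year: String)

  // Looks up each order's cluster in the small table; no grouping or
  // shuffling of the orders is required.
  def joinWithPrices(
      orders: Seq[Order],
      prices: Map[String, Int]
  ): Seq[(String, String, Option[Int], String)] =
    orders.map(o => (o.orderId, o.clusterName, prices.get(o.clusterName), o.year))

  def main(args: Array[String]): Unit = {
    val prices = Map("c1" -> 10)
    val orders = Seq(Order("o1", "c1", "2020"), Order("o2", "cX", "2021"))
    joinWithPrices(orders, prices).foreach(println)
  }
}
```

In Spark the Map would be wrapped with sc.broadcast so that each executor receives one copy; this only pays off when the small table comfortably fits in memory.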
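Exercise 10: Movie ratings
Problem: read the ratings file and the movies file, compute the average rating of each movie, keep the movies whose average is at least 4.0, and output the movie title and its rating.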
movies.csv:
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
ratings.csv:
userId,movieId,rating,timestamp
14,2,4.4,957934979
15,4,1.2,884032602
15,11,4.5,902412488
import org.apache.spark.{SparkConf, SparkContext}

object MovieRatingFixed {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MovieRating").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // args(0) = ratings.csv: compute (movieId, average rating), keep averages >= 4.0
    val rddrating = sc.textFile(args(0))
    val ratingres = rddrating.map(line => line.split(","))
      .filter(a => a(1) != "movieId") // skip the header
      .map(a => (a(1).toInt, a(2).toDouble))
      .groupByKey()
      .mapValues(a => a.sum / a.size)
      .filter(kv => kv._2 >= 4.0)
    // args(1) = movies.csv: (movieId, title)
    // Note: a plain split assumes that titles contain no commas
    val rddmovies = sc.textFile(args(1))
      .map(line => line.split(","))
      .filter(a => a(0) != "movieId") // skip the header
      .map(a => (a(0).toInt, a(1)))
      .join(ratingres)
      .map(f => f._2) // (title, average rating)
    // args(2) = output directory
    rddmovies.saveAsTextFile(args(2))
    sc.stop()
  }
}
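Exercise 11: Custom partitioner
Problem: implement a custom Partitioner that assigns partitions by key modulo the partition count; generate one million random numbers and partition them with it.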
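// UseridPartitioner.scala
import org.apache.spark.Partitioner

class UseridPartitioner(nump: Int) extends Partitioner {
  override def numPartitions: Int = nump
  // floorMod keeps the result in 0 until nump even for negative keys,
  // where % would return a negative (invalid) partition index
  override def getPartition(key: Any): Int =
    Math.floorMod(key.toString.toInt, nump)
}
// TestUserPartition.scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random

object TestUserPartition {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // One million random integers (they may be negative)
    val a = Array.fill(1000000)(Random.nextInt())
    val rdd = sc.parallelize(a)
    val rdd2 = rdd.map((_, 1)).partitionBy(new UseridPartitioner(6))
    // Print how many records landed in each partition
    rdd2.glom().map(_.length).collect().foreach(println)
    sc.stop()
  }
}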
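One detail worth checking by hand is how a modulo-based partitioner treats negative keys: on the JVM the % operator keeps the sign of the dividend, so a negative key gives a negative index, which is not a valid partition number. A minimal sketch of the difference, using a hypothetical helper named partitionOf:

```scala
object PartitionIndexSketch {
  // Hypothetical helper mirroring a modulo-based getPartition.
  // Math.floorMod always returns a value in 0 until numPartitions
  // when numPartitions is positive.
  def partitionOf(key: Int, numPartitions: Int): Int =
    Math.floorMod(key, numPartitions)

  def main(args: Array[String]): Unit = {
    println(-7 % 6)             // -1: not usable as a partition index
    println(partitionOf(-7, 6)) // 5
    println(partitionOf(13, 6)) // 1
  }
}
```

If keys are known to be non-negative the two operators agree, so the distinction only matters for inputs such as unrestricted random integers.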
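Exercise 12: Complete text-classification example (reading CSV + JSON)
Problem: read user_data.csv (user text features) and label_data.json (labels), join them, and train a logistic-regression text-classification model.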
user_data.csv:
userId,user_intro
user_1,"医生,专注于心脏病学研究"
user_2,"技术爱好者,专注于人工智能和机器学习"
label_data.json:
[
{
"userId": "user_1",
"label": "medical"
},
{
"userId": "user_2",
"label": "finance"
}
]
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, StringIndexer, Tokenizer}
import org.apache.spark.sql.SparkSession

object SparkTextClassifier {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SparkTextClassifier")
      .getOrCreate()
    // Read the input data. label_data.json is a multi-line JSON array,
    // so the multiLine option is required
    val trainUserDF = spark.read.option("header", "true").csv("user_data.csv")
    val trainLabelDF = spark.read.option("multiLine", "true").json("label_data.json")
    // Join on userId and keep the text together with the string category
    val trainDF = trainUserDF.join(trainLabelDF, "userId")
      .selectExpr(
        "user_intro as text",
        "label as category"
      )
    // Hold out 20% as the test set before fitting, so the model never sees it
    val Array(trainingData, testData) = trainDF.randomSplit(Array(0.8, 0.2), seed = 42L)
    // Build the Pipeline. Labels such as "medical" are strings, so they are
    // indexed to doubles; casting them to double would produce null
    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("label")
      .setHandleInvalid("skip")
    // Tokenizer splits on whitespace; Chinese text would need word segmentation first
    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01)
    val pipeline = new Pipeline()
      .setStages(Array(indexer, tokenizer, hashingTF, lr))
    val model = pipeline.fit(trainingData)
    // Evaluate on the held-out test set
    val predictions = model.transform(testData)
    predictions.select("text", "label", "probability", "prediction").show(false)
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    println(f"Test Error = ${1.0 - evaluator.evaluate(predictions)}%2.2f")
    spark.stop()
  }
}
Exercise 13: Case class to DataFrame
Problem: define a case class Person, read CSV data and convert it to a DataFrame, then group by hometown and compute the average age.
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int, home: String)

object TestCaseClass {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    // Needed for the RDD-to-DataFrame conversion (.toDF)
    import spark.implicits._
    val rdd = spark.sparkContext.textFile("input/data.txt")
    // Each line is expected to have the form: name,age,home
    val df = rdd.map(line => line.split(","))
      .map(a => Person(a(0), a(1).toInt, a(2)))
      .toDF()
    df.show()
    // Average age per hometown
    df.groupBy(df("home")).avg("age").show()
    spark.stop()
  }
}
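Exercise 14: KMeans algorithm
Problem: use Spark's machine-learning library to run an unsupervised KMeans clustering task on the LIBSVM-format data file input/sample_kmeans_data.txt, and evaluate clustering metrics.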
sample_kmeans_data.txt:
0 1:2.496714 2:1.861736
0 1:2.647689 2:3.523030
0 1:1.765847 2:1.765863
0 1:3.579213 2:2.767435
0 1:1.530526 2:2.542560
0 1:1.536582 2:1.534270
0 1:2.241962 2:0.086720
0 1:0.275082 2:1.437712
0 1:0.987169 2:2.314247
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object KMeansExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KMeansExample")
      .master("local[*]")
      .getOrCreate()
    // 1. Read the LIBSVM-format data
    val dataset = spark.read.format("libsvm").load("input/sample_kmeans_data.txt")
    println("Preview of the raw data:")
    dataset.show(false)
    // 2. Train the KMeans model (k = 2 here; adjust to the data)
    val kmeans = new KMeans()
      .setK(2)
      .setSeed(1L)
      .setFeaturesCol("features")
    val model = kmeans.fit(dataset)
    // 3. Print the cluster centres
    println("Cluster centres:")
    model.clusterCenters.foreach(println)
    // 4. Assign each point to a cluster
    val predictions = model.transform(dataset)
    // 5.1 Silhouette score (the closer to 1, the better)
    val evaluator = new ClusteringEvaluator()
      .setFeaturesCol("features")
      .setPredictionCol("prediction")
      .setDistanceMeasure("squaredEuclidean")
    val silhouetteScore = evaluator.evaluate(predictions)
    println(s"Silhouette score = $silhouetteScore")
    // 5.2 Within-cluster sum of squares (WCSS): the squared distance
    // from every point to the centre of its assigned cluster, summed
    val wcss = predictions.select("features", "prediction").rdd.map { row =>
      val features = row.getAs[org.apache.spark.ml.linalg.Vector](0)
      val clusterIdx = row.getInt(1)
      val center = model.clusterCenters(clusterIdx)
      Vectors.sqdist(features, center)
    }.sum()
    println(s"Within-cluster sum of squares (WCSS) = $wcss")
    println("Preview of the predictions:")
    predictions.select("features", "prediction").show(false)
    spark.stop()
  }
}
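Exercise 15: Read, then sort
Problem: write Spark code that reads the file 15.csv in the given directory. Split each line on commas and filter out dirty rows that have fewer than 4 fields. Convert the third column to a floating-point number, use it as the sort key, and sort in descending order; when the third column is equal, sort by the fourth column. Output all columns.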
15.csv:
col1,col2,col3,col4
A,1,3.5,X
B,2,2.1,Y
C,3,3.5,A
D,4,1.0,Z
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

object CsvSortExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CsvSort").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("15.csv")
    val sortedResult = lines
      .map(line => line.split(",", -1))
      // Drop dirty rows: fewer than 4 fields, or a non-numeric third column
      // (the latter also removes the header line)
      .filter(fields => fields.length >= 4 && Try(fields(2).toDouble).isSuccess)
      .map { fields =>
        val key3 = fields(2).toDouble
        val key4 = fields(3)
        (key3, key4, fields)
      }
      // Tuple ordering with ascending = false: column 3 descending,
      // ties broken by column 4, also descending
      .sortBy({ case (k3, k4, _) => (k3, k4) }, ascending = false)
      .map(_._3.mkString(","))
    sortedResult.take(20).foreach(println)
    sc.stop()
  }
}
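The problem statement fixes the direction for the third column (descending) but not for the tie-break on the fourth column. If an ascending tie-break is wanted instead, the two directions have to be expressed separately. The sketch below shows one way to do that on local collections with an explicit comparison; MixedOrderSortSketch and sortRows are hypothetical names, and the rows are the sample data from 15.csv.

```scala
object MixedOrderSortSketch {
  // Each row is (column 3 as Double, column 4, original line).
  // Orders by column 3 descending, then by column 4 ascending.
  def sortRows(rows: Seq[(Double, String, String)]): Seq[String] =
    rows
      .sortWith { (a, b) =>
        if (a._1 != b._1) a._1 > b._1 // larger column 3 first
        else a._2 < b._2              // smaller column 4 first on ties
      }
      .map(_._3)

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      (3.5, "X", "A,1,3.5,X"),
      (2.1, "Y", "B,2,2.1,Y"),
      (3.5, "A", "C,3,3.5,A"),
      (1.0, "Z", "D,4,1.0,Z")
    )
    sortRows(rows).foreach(println)
  }
}
```

With an RDD, a comparable effect can be obtained by sorting on a key such as (-k3, k4) in ascending order, since negating the numeric column reverses only that component.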
Exercise 16: RDD caching
Problem: cache a filtered RDD with cache and observe how many times the filter runs across two actions.
import org.apache.spark.{SparkConf, SparkContext}

object TestCache {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("testcache").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val a = Array("Hadoop", "Spark", "Java", "Python", "JSQ", "C", "C++")
    val rdd = sc.parallelize(a)
    val rddqry = rdd.filter(word => {
      println("filter invoked") // printed once per element each time the filter runs
      !word.contains("C")
    })
    rddqry.cache() // mark the RDD for caching
    println(rddqry.count()) // first action: triggers the computation and fills the cache
    println(rddqry.collect().mkString(",")) // second action: served from the cache
    sc.stop()
  }
}
Exercise 18: RDD partition operations
Problem: create an RDD with 2 partitions, use repartition to change it to 3 partitions, and check the partition counts.
import org.apache.spark.{SparkConf, SparkContext}

object TestPartition {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("testpartition").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val a = Array(1, 2, 3, 4, 5, 6, 7, 8)
    val rdd = sc.parallelize(a, 2)
    println(rdd.getNumPartitions) // 2
    val rdd2 = rdd.repartition(3)
    println(rdd2.getNumPartitions) // 3
    sc.stop()
  }
}
Exercise 19: Create an RDD from a file and filter it
Problem: read a file, keep the lines that contain spark (case-insensitive), and count them.
import org.apache.spark.{SparkConf, SparkContext}

object TestFilter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("testfilter").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("input/word.txt")
    // Lower-case each line first so the match is case-insensitive
    val cnt = rdd.filter(line => line.toLowerCase().contains("spark")).count()
    println(cnt)
    sc.stop()
  }
}
Exercise 20: Longest word length
Problem: read a file and find the maximum length among all words.
import org.apache.spark.{SparkConf, SparkContext}

object TestMaxWord {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("testmaxnum").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rddlines = sc.textFile("input/word.txt")
    val rddwords = rddlines.flatMap(line => line.split(" "))
    val rddlen = rddwords.map(word => word.length)
    // Reduce pairwise to the largest word length
    val maxlen = rddlen.reduce((x, y) => Math.max(x, y))
    println(maxlen)
    sc.stop()
  }
}