Spark Course Review

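Contents

  • Exercise 1: Guess the Number
  • Exercise 2: Prime Factorization
  • Exercise 3: Scala File I/O
  • Exercise 4: Spark WordCount (Classic)
  • Exercise 5: Longest Increasing Subsequence (LIS)
  • Exercise 6: Pascal's Triangle
  • Exercise 7: Recursive Fractal Pattern
  • Exercise 8: Basic DataFrame Operations
  • Exercise 9: Join Optimization with Broadcast Variables
  • Exercise 10: Movie Ratings
  • Exercise 11: Custom Partitioner
  • Exercise 12: Complete Text Classification Example (Reading CSV + JSON)
  • Exercise 13: Converting a Case Class to a DataFrame
  • Exercise 14: KMeans Algorithm
  • Exercise 15: Sorting After Reading
  • Exercise 16: RDD Caching
  • Exercise 18: RDD Partition Operations
  • Exercise 19: Creating an RDD from a File and Filtering
  • Exercise 20: Finding the Longest Word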

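Exercise 1: Guess the Number

Problem: The program generates a random integer from 1 to 100. The user keeps guessing until the guess is correct, and after each guess the program says whether it was too high or too low.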

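import scala.io.StdIn
import scala.util.Random

object GuessNum {
  def main(args: Array[String]): Unit = {
    // nextInt(100) returns 0..99, so add 1 to get 1..100
    val res = Random.nextInt(100) + 1
    var ans = -1
    var c = 0 // number of guesses so far
    println("Enter an integer between 1 and 100")
    while (res != ans) {
      ans = StdIn.readInt()
      c += 1
      if (ans > res) println("Too high, try a smaller number")
      else if (ans < res) println("Too low, try a larger number")
      else println(s"Correct! You got it in $c guesses!")
    }
  }
}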
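The program only ever answers "too high" or "too low", so the efficient strategy is to guess the middle of the remaining range every time. The sketch below simulates a player who does this. The object and method names are illustrative and not part of the exercise. Each guess halves the range, so 100 candidates need at most ⌊log2 100⌋ + 1 = 7 guesses.

```scala
object GuessStrategy {
  // Simulates a player who always guesses the midpoint of the remaining
  // range [lo, hi] and returns how many guesses it takes to hit `target`.
  // Assumes lo0 <= target <= hi0.
  def guessesNeeded(target: Int, lo0: Int = 1, hi0: Int = 100): Int = {
    var lo = lo0
    var hi = hi0
    var count = 0
    var found = false
    while (!found) {
      val mid = lo + (hi - lo) / 2
      count += 1
      if (mid > target) hi = mid - 1      // "too high": discard the upper half
      else if (mid < target) lo = mid + 1 // "too low": discard the lower half
      else found = true
    }
    count
  }
}
```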

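Exercise 2: Prime Factorization

Problem: Read an integer and print its prime factorization. For example, for input 60 (= 2 × 2 × 3 × 5) print 2 of 2, 1 of 3 and 1 of 5.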

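import scala.io.StdIn

object TestFactor {
  def main(args: Array[String]): Unit = {
    var x = StdIn.readInt()
    var t = 2
    // Trial division: only divisors up to sqrt(x) need to be tried
    while (t * t <= x) {
      if (x % t == 0) {
        var c = 0
        // Divide t out completely, counting its multiplicity
        while (x % t == 0) {
          x /= t
          c += 1
        }
        println(s"$c of $t")
      }
      t += 1
    }
    // Any remainder greater than 1 is itself a prime factor
    if (x > 1) println(s"1 of $x")
  }
}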
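If the factors need to be reused or tested, the same trial-division loop can return (prime, multiplicity) pairs instead of printing them. Here is a minimal sketch with a hypothetical `Factor.factorize` helper:

```scala
import scala.collection.mutable.ListBuffer

object Factor {
  // Returns (prime, multiplicity) pairs in increasing order of prime,
  // using the same trial-division idea as TestFactor.
  def factorize(n: Int): List[(Int, Int)] = {
    val buf = ListBuffer.empty[(Int, Int)]
    var x = n
    var t = 2
    while (t.toLong * t <= x) { // Long product avoids Int overflow
      var c = 0
      while (x % t == 0) { x /= t; c += 1 }
      if (c > 0) buf += ((t, c))
      t += 1
    }
    if (x > 1) buf += ((x, 1)) // leftover prime factor
    buf.toList
  }
}
```

`Factor.factorize(60)` gives `List((2,2), (3,1), (5,1))`, the same result as the "2 of 2, 1 of 3, 1 of 5" output in the exercise.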

Exercise 3: Scala File I/O

Problem: Write the numbers 1 to 5 to the file out.txt, then read the file back and print it, handling any I/O exceptions.

import java.io.{IOException, PrintWriter}
import scala.io.Source

object TestScalaFile {
  def main(args: Array[String]): Unit = {
    try {
      val pw = new PrintWriter("out.txt")
      try {
        for (i <- 1 to 5) pw.println(i)
      } finally {
        pw.close() // close the file even if a write fails
      }
      println("Finished writing file")

      val in = Source.fromFile("out.txt")
      try {
        for (line <- in.getLines()) println(line)
      } finally {
        in.close()
      }
      println("Finished reading file")
    } catch {
      case ex: IOException => println(s"File operation error: ${ex.getMessage}")
      case ex: Exception   => println(s"Unexpected exception: ${ex.getMessage}")
    }
  }
}
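On Scala 2.13 or later, `scala.util.Using` can replace the manual try/finally blocks. It closes each resource even if the body throws, and it rethrows the exception so that an outer catch still sees it. This is a sketch that assumes Scala 2.13+. The `FileRoundTrip` name is illustrative.

```scala
import java.io.PrintWriter
import scala.io.Source
import scala.util.Using

object FileRoundTrip {
  // Writes 1..5 to `path`, one number per line, then reads the lines back.
  // Using.resource closes each resource when its block finishes or throws.
  def writeThenRead(path: String): List[String] = {
    Using.resource(new PrintWriter(path)) { pw =>
      (1 to 5).foreach(i => pw.println(i))
    }
    Using.resource(Source.fromFile(path)) { src =>
      src.getLines().toList // force the lines before the source is closed
    }
  }
}
```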

Exercise 4: Spark WordCount (Classic)

Problem: Implement WordCount with Spark RDDs and write the result to a file.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rddlines = sc.textFile("input/word.txt", 1)
    // Split on runs of whitespace and drop empty tokens
    val rddwords = rddlines.flatMap(line => line.trim().split("\\s+")).filter(_.nonEmpty)
    val rddpair = rddwords.map(word => (word, 1))
    val rddwc = rddpair.reduceByKey((x, y) => x + y)
    rddwc.collect().foreach(println)

    // Save to a file. saveAsTextFile fails if the output directory
    // already exists, so delete any old output first
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val outpath = new Path("out")
    if (fs.exists(outpath)) fs.delete(outpath, true)
    rddwc.saveAsTextFile("out")
    sc.stop()
  }
}
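The RDD pipeline can be checked first on an ordinary Scala collection. In this sketch, `groupBy` followed by a sum stands in for `reduceByKey`, and everything runs in a single process with no shuffle. `LocalWordCount` is an illustrative name.

```scala
object LocalWordCount {
  // Same steps as the RDD version, on an in-memory Seq:
  // flatMap into words -> map to (word, 1) -> sum the counts per word.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.trim.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
}
```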

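Exercise 5: Longest Increasing Subsequence (LIS)

Problem: Given the array [-3,-2,1,6,2,-7,5,8,-9,-8,-2,7,6,11,-2,3,-4], find the length of its longest strictly increasing subsequence.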

object LongAS {
  def main(args: Array[String]): Unit = {
    // Use an Array for O(1) indexed access (indexing a List is O(n))
    val a = Array(-3, -2, 1, 6, 2, -7, 5, 8, -9, -8, -2, 7, 6, 11, -2, 3, -4)
    val n = a.length
    // f(i) = length of the longest increasing subsequence ending at a(i)
    val f = Array.fill(n)(1)
    var ans = 0
    for (i <- 0 until n) {
      for (j <- 0 until i) {
        if (a(i) > a(j)) {
          f(i) = math.max(f(i), f(j) + 1)
        }
      }
      ans = math.max(ans, f(i))
    }
    println(ans) // 7 for this array
  }
}
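The double loop above is the O(n²) dynamic program. A common O(n log n) alternative keeps, for each length, the smallest tail value of any increasing subsequence of that length, and updates it with binary search. This sketch is an illustration and not part of the original exercise. It returns only the length, not the subsequence itself.

```scala
import scala.collection.mutable.ArrayBuffer

object LisFast {
  // tails(k) = smallest tail value of a strictly increasing
  // subsequence of length k + 1 seen so far.
  def lisLength(a: Seq[Int]): Int = {
    val tails = ArrayBuffer.empty[Int]
    for (x <- a) {
      // Binary search for the first tail >= x
      var lo = 0
      var hi = tails.length
      while (lo < hi) {
        val mid = (lo + hi) / 2
        if (tails(mid) < x) lo = mid + 1 else hi = mid
      }
      if (lo == tails.length) tails += x // x extends the longest subsequence
      else tails(lo) = x                 // x gives a smaller tail for length lo + 1
    }
    tails.length
  }
}
```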

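Exercise 6: Pascal's Triangle

Problem: Read an integer n and print n rows of Pascal's triangle, using a 2D array and the recurrence a(i)(j) = a(i-1)(j-1) + a(i-1)(j).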

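import scala.io.StdIn

object YangTriangle {
  def main(args: Array[String]): Unit = {
    val n = StdIn.readInt()
    val a = Array.ofDim[Int](n, n)
    for (i <- 0 until n) {
      a(i)(0) = 1 // every row starts with 1
      // Each inner entry is the sum of the two entries above it.
      // a(i - 1)(i) is still 0, which makes the last entry of each row 1
      for (j <- 1 to i) {
        a(i)(j) = a(i - 1)(j - 1) + a(i - 1)(j)
      }
    }
    // Print rows 0..n-1, so the output starts with the single "1" at the apex
    for (i <- 0 until n) {
      for (j <- 0 to i) {
        print(s"${a(i)(j)}\t")
      }
      println()
    }
  }
}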
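The same recurrence also works without a 2D array. Pad the previous row with a 0 on each side, then add the neighboring pairs. Here is a sketch with an illustrative `PascalRows` object:

```scala
object PascalRows {
  // The next row is the element-wise sum of (0 +: row) and (row :+ 0),
  // e.g. List(1, 2, 1) -> List(1, 3, 3, 1).
  def rows(n: Int): List[List[Int]] =
    Iterator
      .iterate(List(1))(row => (0 +: row).zip(row :+ 0).map { case (x, y) => x + y })
      .take(n)
      .toList
}
```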

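Exercise 7: Recursive Fractal Pattern

Problem: Read an integer n and print a recursive fractal made of x characters (similar to a Sierpinski carpet). The level-n figure is a 3×3 grid of blocks: the four corner blocks and the center block are copies of the level-(n-1) figure, and the other four blocks are blank.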

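import scala.io.StdIn

object RecurX {
  def main(args: Array[String]): Unit = {
    val n = StdIn.readInt() // level, n >= 1
    // The level-n figure is 3^(n-1) characters on a side
    val size = math.pow(3, n - 1).toInt
    // Pre-fill with spaces; a default Char array holds the NUL character,
    // which does not print as a blank
    val a = Array.fill(size, size)(' ')
    a(0)(0) = 'x'
    var len = 1 // side length of the figure built so far
    for (_ <- 2 to n) {
      // Copy the current len x len figure from the top-left block into 4 more blocks
      for (j <- 0 until len; k <- 0 until len) {
        a(j)(k + 2 * len) = a(j)(k)               // top-right
        a(j + 2 * len)(k) = a(j)(k)               // bottom-left
        a(j + len)(k + len) = a(j)(k)             // center
        a(j + 2 * len)(k + 2 * len) = a(j)(k)     // bottom-right
      }
      len *= 3
    }
    for (row <- a) println(row.mkString(" "))
  }
}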
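The program above builds the figure iteratively by copying blocks. Each cell can also be decided by true recursion, which matches the "recursive" in the title. Split the figure into a 3×3 grid of blocks, reject cells that fall in one of the four edge-middle blocks, and recurse into the block that contains the cell. This is an illustrative alternative and the names are hypothetical.

```scala
object RecurXRecursive {
  // True if cell (r, c) holds an 'x' in the level-n figure (side 3^(n-1)).
  def filled(n: Int, r: Int, c: Int): Boolean =
    if (n == 1) true
    else {
      val block = math.pow(3, n - 2).toInt // side of each of the 3x3 sub-blocks
      val (br, bc) = (r / block, c / block)
      // Keep the four corner blocks and the center block
      val kept = (br == 1 && bc == 1) || (br != 1 && bc != 1)
      kept && filled(n - 1, r % block, c % block)
    }

  // Renders the level-n figure, using a space for empty cells.
  def render(n: Int): Seq[String] = {
    val size = math.pow(3, n - 1).toInt
    (0 until size).map(r => (0 until size).map(c => if (filled(n, r, c)) 'x' else ' ').mkString)
  }
}
```

`render(2)` produces the three lines `"x x"`, `" x "`, `"x x"`. Level n contains 5^(n-1) x characters.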

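Exercise 8: Basic DataFrame Operations

Problem: Use SparkSession to read a CSV file into a DataFrame. Sort it in ascending order, in descending order, and by multiple fields (composite sort). Then select the name, salary and education fields, give them the Chinese aliases 名字, 工资 and 教育 (name, salary, education), and show the result.

Data format: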

name,age,home,occupation,salary,education
张三,25,北京,工程师,12000,本科
李四,30,上海,设计师,15000,硕士

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DataFrameOperations {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("DataFrameOperations")
      .getOrCreate()

    // header: use the first line as column names (otherwise the columns are _c0, _c1, ...)
    // inferSchema: read salary as a number so that it sorts numerically
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data.csv")

    println("Original data:")
    df.show()

    println("Sorted by salary, ascending:")
    df.orderBy("salary").show()

    println("Sorted by salary, descending:")
    df.orderBy(col("salary").desc).show()

    println("Composite sort (salary descending, education ascending):")
    df.orderBy(col("salary").desc, col("education").asc).show()

    println("Query result with Chinese aliases:")
    df.select(
      col("name").as("名字"),
      col("salary").as("工资"),
      col("education").as("教育")
    ).show()

    spark.stop()
  }
}
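The inferSchema option affects sorting. Without it, salary is read as a string, and strings compare character by character. This small sketch on plain Scala strings shows the difference. The 9000 salary is a hypothetical value added to make the effect visible.

```scala
object SalarySort {
  // Strings sort lexicographically: "9000" > "15000" because '9' > '1'.
  def asStrings(salaries: Seq[String]): Seq[String] = salaries.sorted
  // Converting to numbers first gives the intended numeric order.
  def asNumbers(salaries: Seq[String]): Seq[String] = salaries.sortBy(_.toInt)
}
```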

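Exercise 9: Join Optimization with Broadcast Variables

Problem: Given a small table smallTable (company information) and a large table bigTable (order information), use a broadcast variable to implement an efficient join. Broadcasting the small table sends one copy to each executor, so every task can look up keys locally (a map-side join) and the large table never needs to be shuffled.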

import org.apache.spark.{SparkConf, SparkContext}

object CsvJoinExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CsvJoin").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 1. Read the small table (price mapping), collect it to the driver as a Map, and broadcast it
    val smallTableRaw = sc.textFile("prices.csv")
      .filter(!_.startsWith("clusterName")) // skip the header
      .map { line =>
        val parts = line.split(",")
        (parts(0), parts(1).toInt) // (clusterName, price)
      }
      .collectAsMap()
    val smallTableBroadcast = sc.broadcast(smallTableRaw)

    // 2. Read the large table (order data)
    val bigTable = sc.textFile("orders.csv")
      .filter(!_.startsWith("orderId")) // skip the header
      .map { line =>
        val parts = line.split(",")
        (parts(0), parts(1), parts(2)) // (orderId, clusterName, year)
      }

    // 3. Map-side join: each task looks up the broadcast Map locally
    val result = bigTable.map { case (orderId, clusterName, year) =>
      // Keep one type for the price column: the price as text, or "unknown" if missing
      val price = smallTableBroadcast.value.get(clusterName).map(_.toString).getOrElse("unknown")
      (orderId, clusterName, price, year)
    }

    // 4. Print the result
    result.collect().foreach(println)
    sc.stop()
  }
}
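Plain collections can show what each task does in the broadcast join. The small table is a Map, and each row of the big table is matched by a local lookup instead of a shuffle. `LocalMapSideJoin` is an illustrative name, and the keys and prices in the test are hypothetical.

```scala
object LocalMapSideJoin {
  // small: the Map every executor receives through the broadcast.
  // big: (orderId, clusterName, year) rows, streamed through once.
  def join(
      small: Map[String, Int],
      big: Seq[(String, String, String)]
  ): Seq[(String, String, String, String)] =
    big.map { case (orderId, clusterName, year) =>
      val price = small.get(clusterName).map(_.toString).getOrElse("unknown")
      (orderId, clusterName, price, year)
    }
}
```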

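Exercise 10: Movie Ratings

Problem: Read a ratings file and a movies file, compute each movie's average rating, keep the movies whose average rating is at least 4.0, and output the movie title and its rating.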

movies.csv

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy

ratings.csv

userId,movieId,rating,timestamp
14,2,4.4,957934979
15,4,1.2,884032602
15,11,4.5,902412488
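
import org.apache.spark.{SparkConf, SparkContext}

object MovieRatingFixed {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MovieRating").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Average rating per movie: reduceByKey on (sum, count) combines values
    // map-side instead of collecting every rating with groupByKey
    val ratingres = sc.textFile(args(0)) // ratings.csv
      .map(line => line.split(","))
      .filter(a => a(1) != "movieId") // skip the header
      .map(a => (a(1).toInt, (a(2).toDouble, 1)))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .mapValues { case (sum, cnt) => sum / cnt }
      .filter(kv => kv._2 >= 4.0)

    // Note: a plain split(",") assumes titles contain no commas;
    // quoted titles that contain commas need a real CSV parser
    val rddmovies = sc.textFile(args(1)) // movies.csv
      .map(line => line.split(","))
      .filter(a => a(0) != "movieId") // skip the header
      .map(a => (a(0).toInt, a(1)))   // (movieId, title)
      .join(ratingres)                // (movieId, (title, avgRating))
      .map(f => f._2)                 // (title, avgRating)

    rddmovies.saveAsTextFile(args(2))
    sc.stop()
  }
}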
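The averaging and join logic can be checked locally on the sample rows above. `LocalMovieAvg` is an illustrative name. Movie 2 is the only one with an average of at least 4.0 that also has a title in the sample movies.csv. Movie 11 passes the threshold but has no title in the sample, so the inner join drops it.

```scala
object LocalMovieAvg {
  // ratings: (movieId, rating) pairs; movies: movieId -> title.
  // Averages per movie, keeps averages >= minAvg, and inner-joins the titles.
  def highlyRated(
      ratings: Seq[(Int, Double)],
      movies: Map[Int, String],
      minAvg: Double = 4.0
  ): Map[String, Double] = {
    val avg = ratings
      .groupBy(_._1)
      .map { case (id, rs) => (id, rs.map(_._2).sum / rs.size) }
    avg.collect { case (id, a) if a >= minAvg && movies.contains(id) => (movies(id), a) }
  }
}
```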

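Exercise 11: Custom Partitioner

Problem: Implement a custom Partitioner that assigns each key to a partition by taking the key value modulo the number of partitions. Generate 1,000,000 random numbers and partition them with the custom partitioner.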

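// UseridPartitioner.scala
import org.apache.spark.Partitioner

class UseridPartitioner(nump: Int) extends Partitioner {
  override def numPartitions: Int = nump

  override def getPartition(key: Any): Int = {
    val k = key.toString.toInt
    // Non-negative modulo: a plain % returns a negative partition for negative keys
    ((k % nump) + nump) % nump
  }
}

// TestUserPartition.scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random

object TestUserPartition {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // 1,000,000 random keys; the value range 0 until 1000000 is an assumption
    val a = Array.fill(1000000)(Random.nextInt(1000000))
    val rdd = sc.parallelize(a)
    val rdd2 = rdd.map((_, 1)).partitionBy(new UseridPartitioner(6))
    // Print the record count per partition instead of collecting all pairs
    rdd2.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
      .collect()
      .foreach(println)
    sc.stop()
  }
}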
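The partition rule is plain arithmetic, so it can be checked without Spark. This sketch repeats the non-negative modulo from UseridPartitioner and counts how many keys fall into each partition. The `ModPartitioning` helper is illustrative. Spark expects getPartition to return a value in [0, numPartitions), which is why a plain `%` on negative keys would be a problem.

```scala
object ModPartitioning {
  // Same rule as UseridPartitioner.getPartition.
  def partitionOf(key: Int, numPartitions: Int): Int =
    ((key % numPartitions) + numPartitions) % numPartitions

  // Number of keys that land in each partition index.
  def histogram(keys: Seq[Int], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(partitionOf(_, numPartitions)).map { case (p, ks) => (p, ks.size) }
}
```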

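Exercise 12: Complete Text Classification Example (Reading CSV + JSON)

Problem: Read user_data.csv (user text features) and label_data.json (labels), join them, and train a logistic regression text classification model.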

user_data.csv

userId,user_intro
user_1,"医生,专注于心脏病学研究"
user_2,"技术爱好者,专注于人工智能和机器学习"

label_data.json

[
  {
    "userId": "user_1",
    "label": "medical"
  },
  {
    "userId": "user_2",
    "label": "finance"
  }
]

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, RegexTokenizer, StringIndexer}
import org.apache.spark.sql.SparkSession

object SparkTextClassifier {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SparkTextClassifier")
      .getOrCreate()

    // Read the training data. label_data.json is a JSON array spanning several
    // lines, so multiLine is required (by default Spark expects JSON Lines)
    val trainUserDF = spark.read.option("header", "true").csv("user_data.csv")
    val trainLabelDF = spark.read.option("multiLine", "true").json("label_data.json")

    // Join on userId and select columns in one step. The label is a string
    // such as "medical", so it is kept as text here and indexed below
    val allDF = trainUserDF.join(trainLabelDF, "userId")
      .selectExpr("user_intro as text", "label as category")

    // Split BEFORE fitting so test rows are never seen in training
    // (with only a few sample rows the test split may be empty)
    val Array(trainingData, testData) = allDF.randomSplit(Array(0.8, 0.2), seed = 42L)

    // Build the Pipeline
    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("label")     // string label -> numeric index
      .setHandleInvalid("keep")  // labels unseen in training do not fail the transform
    // Chinese text has no spaces, so split into single characters
    // (a simplification, not real word segmentation)
    val tokenizer = new RegexTokenizer()
      .setInputCol("text")
      .setOutputCol("words")
      .setPattern("")
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01)

    val pipeline = new Pipeline()
      .setStages(Array(indexer, tokenizer, hashingTF, lr))

    val model = pipeline.fit(trainingData)
    val predictions = model.transform(testData)

    predictions.select("text", "label", "probability", "prediction").show(false)

    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")

    println(f"Test Error = ${1.0 - evaluator.evaluate(predictions)}%2.2f")
    spark.stop()
  }
}

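Exercise 13: Converting a Case Class to a DataFrame

Problem: Define a case class Person, read comma-separated data and convert it to a DataFrame, then group by hometown and compute the average age.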

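import org.apache.spark.sql.SparkSession

// Defined at top level (outside main) so that Spark can derive an Encoder for it
case class Person(name: String, age: Int, home: String)

object TestCaseClass {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("TestCaseClass").getOrCreate()
    import spark.implicits._ // enables .toDF() on an RDD of case classes

    // Assumed line format: name,age,home (no header row)
    val rdd = spark.sparkContext.textFile("input/data.txt")
    val df = rdd.map(line => line.split(","))
      .map(a => Person(a(0).trim, a(1).trim.toInt, a(2).trim))
      .toDF()

    df.show()
    df.groupBy(df("home")).avg("age").show()
    spark.stop()
  }
}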

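Exercise 14: KMeans Algorithm

Problem: Use Spark's machine learning library to perform unsupervised KMeans clustering on the LIBSVM-format data file input/sample_kmeans_data.txt, and evaluate the clustering metrics.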

sample_kmeans_data.txt

0 1:2.496714 2:1.861736
0 1:2.647689 2:3.523030
0 1:1.765847 2:1.765863
0 1:3.579213 2:2.767435
0 1:1.530526 2:2.542560
0 1:1.536582 2:1.534270
0 1:2.241962 2:0.086720
0 1:0.275082 2:1.437712
0 1:0.987169 2:2.314247

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.SparkSession

object KMeansExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KMeansExample")
      .master("local[*]")
      .getOrCreate()

    // 1. Read the LIBSVM-format data (columns: label, features); KMeans ignores the label
    val dataset = spark.read.format("libsvm").load("input/sample_kmeans_data.txt")

    println("Raw data preview:")
    dataset.show(false)

    // 2. Train a KMeans model (k = 2 here; adjust for your data)
    val kmeans = new KMeans()
      .setK(2)
      .setSeed(1L)
      .setFeaturesCol("features")

    val model = kmeans.fit(dataset)

    // 3. Cluster centers
    println("Cluster centers:")
    model.clusterCenters.foreach(println)

    // 4. Assign each point to a cluster
    val predictions = model.transform(dataset)

    // 5.1 Silhouette score (ranges from -1 to 1; closer to 1 is better)
    val evaluator = new ClusteringEvaluator()
      .setFeaturesCol("features")
      .setPredictionCol("prediction")
      .setDistanceMeasure("squaredEuclidean")
    val silhouetteScore = evaluator.evaluate(predictions)
    println(s"Silhouette score = $silhouetteScore")

    // 5.2 Within-cluster sum of squares (WCSS / Within Set Sum of Squared Errors)
    val wcss = predictions.select("features", "prediction").rdd.map { row =>
      val features = row.getAs[Vector](0)
      val clusterIdx = row.getInt(1)
      val center = model.clusterCenters(clusterIdx)
      Vectors.sqdist(features, center)
    }.sum()
    println(s"Within-cluster sum of squares (WCSS) = $wcss")

    println("Prediction preview:")
    predictions.select("features", "prediction").show(false)

    spark.stop()
  }
}
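To build intuition for what `KMeans.fit` computes, here is a minimal local sketch of Lloyd's algorithm on 2D points. It assigns each point to its nearest center, moves each center to the mean of its points, and repeats. This is not Spark's implementation: Spark ML initializes with k-means|| by default and works on distributed vectors. In the sketch, the initial centers are passed in explicitly, and the names are illustrative.

```scala
object LocalKMeans {
  type Point = (Double, Double)

  private def sqdist(a: Point, b: Point): Double = {
    val dx = a._1 - b._1
    val dy = a._2 - b._2
    dx * dx + dy * dy
  }

  // Index of the center nearest to p.
  def assign(p: Point, centers: Seq[Point]): Int =
    centers.indices.minBy(i => sqdist(p, centers(i)))

  // Lloyd's iterations starting from the given initial centers.
  def fit(points: Seq[Point], init: Seq[Point], iters: Int = 10): Seq[Point] =
    (1 to iters).foldLeft(init) { (centers, _) =>
      val groups = points.groupBy(assign(_, centers))
      centers.indices.map { i =>
        groups.get(i) match {
          case Some(ps) => (ps.map(_._1).sum / ps.size, ps.map(_._2).sum / ps.size)
          case None     => centers(i) // an empty cluster keeps its center
        }
      }
    }

  // Within-cluster sum of squares for the given centers.
  def wcss(points: Seq[Point], centers: Seq[Point]): Double =
    points.map(p => sqdist(p, centers(assign(p, centers)))).sum
}
```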

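Exercise 15: Sorting After Reading

Problem: Write Spark code that reads the file 15.csv from a given directory. Split each line on commas and filter out dirty rows with fewer than 4 fields. Convert the third column to a floating-point number and use it as the sort key, sorting in descending order. When the third column ties, sort by the fourth column. Output all columns.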

15.csv

col1,col2,col3,col4
A,1,3.5,X
B,2,2.1,Y
C,3,3.5,A
D,4,1.0,Z

import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

object CsvSortExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CsvSort").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("15.csv")

    val sortedResult = lines
      .map(line => line.split(",", -1))     // -1 keeps trailing empty fields
      .filter(fields => fields.length >= 4) // drop rows with fewer than 4 columns
      // Drop the header and any row whose 3rd column is not a number
      // instead of failing with NumberFormatException
      .flatMap(fields => Try(fields(2).trim.toDouble).toOption.map(k3 => (k3, fields)))
      // Negating k3 sorts column 3 descending; ties are broken by column 4 ascending
      .sortBy({ case (k3, fields) => (-k3, fields(3)) })
      .map(_._2.mkString(","))

    sortedResult.collect().foreach(println)
    sc.stop()
  }
}
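The filtering and ordering rules can be checked on the sample 15.csv without a cluster. This sketch uses the illustrative name `LocalCsvSort`. It drops the header and a hypothetical malformed line "bad,row". Rows A and C both have 3.5 in the third column, so they come out in ascending order of the fourth column.

```scala
import scala.util.Try

object LocalCsvSort {
  // Same rules on in-memory lines: at least 4 fields, numeric 3rd column,
  // 3rd column descending, then 4th column ascending.
  def sortRows(lines: Seq[String]): Seq[String] =
    lines
      .map(_.split(",", -1))
      .filter(_.length >= 4)
      .flatMap(f => Try(f(2).trim.toDouble).toOption.map(k => (k, f)))
      .sortBy { case (k3, f) => (-k3, f(3)) }
      .map(_._2.mkString(","))
}
```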

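Exercise 16: RDD Caching

Problem: Cache the filtered RDD and observe how many times filter runs across two actions. With cache(), the message inside filter should appear once per element (7 times), during the first action only. Without cache(), the second action recomputes the filter and prints the message 7 more times.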

import org.apache.spark.{SparkConf, SparkContext}

object TestCache {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("testcache").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val a = Array("Hadoop", "Spark", "Java", "Python", "JSQ", "C", "C++")
    val rdd = sc.parallelize(a)
    val rddqry = rdd.filter(word => {
      println("filter executed") // printed once per element each time filter actually runs
      !word.contains("C")
    })

    rddqry.cache() // mark the RDD for caching (lazy: nothing is stored yet)
    println(rddqry.count()) // first action: runs filter and fills the cache
    println(rddqry.collect().mkString(",")) // second action: reads from the cache, filter does not run again
    sc.stop()
  }
}
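Plain Scala offers a rough analogy (an illustration only, not how Spark caches). A lazy view re-runs its filter on every traversal, like an uncached RDD. A strict collection is computed once, like a cached one. With the seven words above, two traversals run the predicate 14 times on the view and 7 times on the strict list. `CacheAnalogy` is an illustrative name.

```scala
object CacheAnalogy {
  // Counts predicate calls when the filtered collection is traversed twice,
  // mimicking two Spark actions on the same filtered RDD.
  def predicateCalls(cached: Boolean): Int = {
    var calls = 0
    val words = List("Hadoop", "Spark", "Java", "Python", "JSQ", "C", "C++")
    val keep = (w: String) => { calls += 1; !w.contains("C") }
    // view: lazy, re-filters on each traversal; List: filtered once, up front
    val filtered: Iterable[String] =
      if (cached) words.filter(keep) else words.view.filter(keep)
    filtered.iterator.length // first "action"
    filtered.iterator.toList // second "action"
    calls
  }
}
```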

Exercise 18: RDD Partition Operations

Problem: Create an RDD with 2 partitions, change it to 3 partitions with repartition, and check the number of partitions.

import org.apache.spark.{SparkConf, SparkContext}

object TestPartition {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("testpartition").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val a = Array(1, 2, 3, 4, 5, 6, 7, 8)
    val rdd = sc.parallelize(a, 2) // explicitly request 2 partitions
    println(rdd.getNumPartitions) // 2
    // repartition always performs a full shuffle; to only reduce the partition count, coalesce can avoid it
    val rdd2 = rdd.repartition(3)
    println(rdd2.getNumPartitions) // 3
    sc.stop()
  }
}
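Which elements land in the 2 initial partitions? To our understanding, for an indexed collection `parallelize` (through ParallelCollectionRDD) splits the data by position: slice i covers [i·len/n, (i+1)·len/n). The sketch below reproduces only that arithmetic, under that assumption, and `SliceSketch` is a hypothetical name. It does not describe the layout after `repartition(3)`, which redistributes the elements through a shuffle.

```scala
object SliceSketch {
  // Splits data into numSlices contiguous chunks by position.
  def slices[T](data: IndexedSeq[T], numSlices: Int): Seq[IndexedSeq[T]] = {
    val len = data.length.toLong
    (0 until numSlices).map { i =>
      val start = ((i * len) / numSlices).toInt
      val end = (((i + 1) * len) / numSlices).toInt
      data.slice(start, end)
    }
  }
}
```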

Exercise 19: Creating an RDD from a File and Filtering

Problem: Read a file, keep the lines that contain "spark" (case-insensitive), and count them.

import org.apache.spark.{SparkConf, SparkContext}

object TestFilter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("testfilter").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("input/word.txt")
    // Lower-case each line so "Spark", "SPARK" and "spark" all match
    val cnt = rdd.filter(line => line.toLowerCase().contains("spark")).count()
    println(cnt)
    sc.stop()
  }
}

Exercise 20: Finding the Longest Word

Problem: Read a file and find the greatest length among all of its words.

import org.apache.spark.{SparkConf, SparkContext}

object TestMaxWord {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("testmaxnum").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rddlines = sc.textFile("input/word.txt")
    // Split on runs of whitespace and drop the empty tokens that leading spaces produce
    val rddwords = rddlines.flatMap(line => line.split("\\s+")).filter(_.nonEmpty)
    val rddlen = rddwords.map(word => word.length)
    // fold with 0 instead of reduce, so an empty file gives 0 instead of throwing
    val maxlen = rddlen.fold(0)((x, y) => math.max(x, y))
    println(maxlen)
    sc.stop()
  }
}
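The Spark job returns only the maximum length. To get the word as well, keep the words and compare them by length. Here is a local sketch with a hypothetical `LongestWord` helper. When several words tie for the longest, the first one is kept.

```scala
object LongestWord {
  // Returns the longest word and its length, or None if there are no words.
  def longest(lines: Seq[String]): Option[(String, Int)] = {
    val words = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
    if (words.isEmpty) None
    else {
      val w = words.maxBy(_.length) // first maximum wins on ties
      Some((w, w.length))
    }
  }
}
```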