Level 1: Load and Save Operations
Programming Requirements
Open the code file window on the right and complete the code between the Begin and End markers. Read the local file file:///data/bigfiles/demo.json, sort the records in descending order by the age field, and print the result.
The contents of demo.json are as follows:
{"name": "zhangsan", "age": 20, "sex": "m"},
{"name": "lisi", "age": 21, "sex": "m"},
{"name": "tiantian", "age": 22, "sex": "f"},
{"name": "lihua", "age": 23, "sex": "f"},
{"name": "zhaoliu", "age": 24, "sex": "m"},
{"name": "liguanqing", "age": 25, "sex": "f"},
{"name": "zhangqi", "age": 26, "sex": "m"},
{"name": "zhaoai", "age": 27, "sex": "m"},
{"name": "wangjiu", "age": 28, "sex": "f"}
Before starting the task, make sure the Hadoop and Hive environments are running: start-all.sh, then nohup hive --service metastore &
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object First_Question {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("First_Question")
.master("local[*]")
.getOrCreate()
/******************* Begin *******************/
// Load the JSON file into a DataFrame
val df: DataFrame = spark.read.json("file:///data/bigfiles/demo.json")
// Sort by the age column in descending order and print the result
val sortedDf = df.orderBy(df("age").desc)
sortedDf.show()
/******************* End *******************/
spark.stop()
}
}
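Since this level covers both loading and saving, here is a minimal sketch of the save side, continuing from the solution above (sortedDf and spark in scope, SaveMode already imported). The output path is a placeholder of our own choosing, not something specified by the task:

// Write the sorted result back out as JSON
sortedDf.write
  .mode(SaveMode.Overwrite)                  // replace the target directory if it already exists
  .json("file:///data/bigfiles/demo_sorted") // hypothetical output path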
Level 2: Parquet Format Files
Programming Requirements
Open the code file window on the right and complete the code between the Begin and End markers according to the requirements below.
Read the local file file:///data/bigfiles/demo.json, write it in Parquet format as the partition student=1, and save it under the local path file:///result/.
Read the local file file:///data/bigfiles/demo2.json, write it in Parquet format as the partition student=2, and save it under the local path file:///result/.
The contents of demo.json are as follows:
{"name": "zhangsan", "age": 20, "sex": "m"},
{"name": "lisi", "age": 21, "sex": "m"},
{"name": "tiantian", "age": 22, "sex": "f"},
{"name": "lihua", "age": 23, "sex": "f"},
{"name": "zhaoliu", "age": 24, "sex": "m"},
{"name": "liguanqing", "age": 25, "sex": "f"},
{"name": "zhangqi", "age": 26, "sex": "m"},
{"name": "zhaoai", "age": 27, "sex": "m"},
{"name": "wangjiu", "age": 28, "sex": "f"}
The contents of demo2.json are as follows:
{"name": "hongkong", "age": 20, "sex": "m"},
{"name": "kulu", "age": 21, "sex": "m"},
{"name": "huxiaotian", "age": 22, "sex": "f"},
{"name": "yueming", "age": 23, "sex": "f"},
{"name": "wangsan", "age": 24, "sex": "m"},
{"name": "zhaojiu", "age": 25, "sex": "f"},
{"name": "wangqiqi", "age": 26, "sex": "m"},
{"name": "wangxiantian", "age": 27, "sex": "m"},
{"name": "zhaoba", "age": 28, "sex": "f"}
Before starting the task, make sure the Hadoop and Hive environments are running: start-all.sh, then nohup hive --service metastore &
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Second_Question {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("Second_Question")
.master("local[*]")
.getOrCreate()
/******************* Begin *******************/
// Write demo.json as Parquet under the partition directory student=1, and demo2.json under student=2
spark.read.json("file:///data/bigfiles/demo.json").write.parquet("file:///result/student=1")
spark.read.json("file:///data/bigfiles/demo2.json").write.parquet("file:///result/student=2")
/******************* End *******************/
spark.stop()
}
}
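Writing the two datasets into sibling directories named student=1 and student=2 lets Spark treat student as a partition column. As a rough check (not required by the task), reading the parent directory should surface that column through partition discovery:

// Continues from the solution above: spark is in scope.
// Reading the parent path picks up the student=1 and student=2 subdirectories
// and exposes "student" as an extra column on every row.
val merged = spark.read.parquet("file:///result/")
merged.printSchema()                      // expect: age, name, sex, student
merged.groupBy("student").count().show()  // 9 rows under each partition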
Level 3: ORC Format Files
Programming Requirements
Complete the program according to the requirements below.
Create an ORC-format Hive table named student with the fields id (int), name (string), age (int), and class (string).
Insert the following rows, in order:
1001,"王刚",19,"大数据一班"
1002,"李虹",18,"大数据一班"
1003,"张子萱",20,"大数据一班"
1004,"赵云",18,"大数据一班"
1005,"李晓玲",19,"大数据一班"
1006,"张惠",18,"大数据二班"
1007,"秦散",19,"大数据二班"
1008,"王丽",18,"大数据二班"
1009,"田忌",20,"大数据二班"
1010,"张花",18,"大数据二班"
Open the code file window on the right and complete the code between the Begin and End markers: write a Spark SQL program that reads the student table created above and outputs the rows ordered by the id field in ascending order.
Before starting the task, make sure the Hadoop and Hive environments are running: start-all.sh, then nohup hive --service metastore &
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Third_Question {
def main(args: Array[String]): Unit = {
// Create the SparkSession with Hive support enabled
val spark: SparkSession = SparkSession
.builder()
.appName("Third_Question")
.master("local[*]")
.enableHiveSupport() // Enable Hive support
.getOrCreate()
/******************* Begin *******************/
// Create the Hive table student
spark.sql(
"""
|CREATE TABLE IF NOT EXISTS student (
| id INT,
| name STRING,
| age INT,
| class STRING
|)
|STORED AS ORC
""".stripMargin)
// Insert the data into the student table
spark.sql(
"""
|INSERT INTO student VALUES
|(1001, '王刚', 19, '大数据一班'),
|(1002, '李虹', 18, '大数据一班'),
|(1003, '张子萱', 20, '大数据一班'),
|(1004, '赵云', 18, '大数据一班'),
|(1005, '李晓玲', 19, '大数据一班'),
|(1006, '张惠', 18, '大数据二班'),
|(1007, '秦散', 19, '大数据二班'),
|(1008, '王丽', 18, '大数据二班'),
|(1009, '田忌', 20, '大数据二班'),
|(1010, '张花', 18, '大数据二班')
""".stripMargin)
// Query the student table and output the rows ordered by id ascending
val studentDF: DataFrame = spark.sql("SELECT * FROM student ORDER BY id ASC")
studentDF.show()
/******************* End *******************/
spark.stop()
}
}
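To confirm that the table really is stored as ORC, an optional check (not graded) is to look at the table metadata or read the data files directly. The warehouse path used below is the common Hive default and is an assumption about this environment:

// Continues from the solution above: spark is in scope.
// The InputFormat / SerDe rows of the output should mention ORC.
spark.sql("DESCRIBE FORMATTED student").show(100, truncate = false)

// Alternatively, read the table's data files directly as ORC.
// /user/hive/warehouse/student is the default warehouse location and may differ in your setup.
spark.read.orc("/user/hive/warehouse/student").show()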
Level 4: JSON Format Files
Programming Requirements
Open the code file window on the right and complete the code between the Begin and End markers. Read the local file file:///data/bigfiles/test.json and output it without changing the original ordering of the data.
The contents of test.json are as follows:
{"id":1001,"name":"王刚","age":19,"class":"大数据一班"},
{"id":1002,"name":"李虹","age":18,"class":"大数据一班"},
{"id":1003,"name":"张子萱","age":20,"class":"大数据一班"},
{"id":1004,"name":"赵云","age":18,"class":"大数据一班"},
{"id":1005,"name":"李晓玲","age":19,"class":"大数据一班"},
{"id":1006,"name":"张惠","age":18,"class":"大数据二班"},
{"id":1007,"name":"秦散","age":19,"class":"大数据二班"},
{"id":1008,"name":"王丽","age":18,"class":"大数据二班"},
{"id":1009,"name":"田忌","age":20,"class":"大数据二班"},
{"id":1010,"name":"张花","age":18,"class":"大数据二班"}
Before starting the task, make sure the Hadoop and Hive environments are running: start-all.sh, then nohup hive --service metastore &
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Forth_Question {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("Forth_Question")
.master("local[*]")
.getOrCreate()
/******************* Begin *******************/
// Read the JSON file
val df = spark.read.json("file:///data/bigfiles/test.json")
// spark.read.json orders the inferred columns alphabetically, so select() restores the original column order
df.select("id", "name", "age", "class").show()
/******************* End *******************/
spark.stop()
}
}
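spark.read.json infers the schema and orders the columns alphabetically, which is why the solution uses select() to restore the original order. An alternative sketch, assuming the fields shown in test.json, is to supply an explicit schema so the columns come back in file order without a select():

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Explicit schema in the original column order (id, name, age, class)
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("age", LongType),
  StructField("class", StringType)
))

// With a user-provided schema, the columns keep the order given above
spark.read.schema(schema).json("file:///data/bigfiles/test.json").show()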
Level 5: JDBC Database Operations
Programming Requirements
Open the code file window on the right and complete the code between the Begin and End markers. Read the local CSV file file:///data/bigfiles/job58_data.csv (it has a header row) and save the loaded data, in overwrite mode, to the work.job_data table in the local MySQL database. The connection details are:
Username: root
Password: 123123
Port: 3306
Remember to set useSSL=false.
Before starting the task, make sure the Hadoop and Hive environments are running: start-all.sh, then nohup hive --service metastore &
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Fifth_Question {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("Fifth_Question")
.master("local[*]")
.getOrCreate()
/******************* Begin *******************/
// 1. Read the local CSV file
val df: DataFrame = spark.read
.option("header", "true")
.csv("file:///data/bigfiles/job58_data.csv")
// 2. MySQL connection settings
val jdbcUrl = "jdbc:mysql://localhost:3306/work?useSSL=false"
val dbProperties = new java.util.Properties()
dbProperties.setProperty("user", "root")
dbProperties.setProperty("password", "123123")
// 3. Write the data to the work.job_data table in MySQL, overwriting any existing data
df.write
.mode(SaveMode.Overwrite)
.jdbc(jdbcUrl, "job_data", dbProperties)
/******************* End *******************/
spark.stop()
}
}
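As an optional sanity check (not part of the task), the same connection settings can be reused to read the table back and confirm the overwrite:

// Continues from the solution above: spark, jdbcUrl and dbProperties are in scope.
// Read the job_data table back from MySQL and show a few rows.
val readBack = spark.read.jdbc(jdbcUrl, "job_data", dbProperties)
readBack.show(5)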
Level 6: Hive Table Operations
Programming Requirements
Open the code file window on the right and complete the code between the Begin and End markers according to the requirements below.
In Hive, create a table named employee with the fields eid (string), ename (string), age (int), and part (string).
Insert the following rows:
"A568952","王晓",25,"财务部"
"B256412","张天",28,"人事部"
"C125754","田笑笑",23,"销售部"
"D265412","赵云",24,"研发部"
"F256875","李姿姿",26,"后勤部"
Write a Spark SQL program that reads the employee table directly through Spark on Hive and outputs the rows ordered by the eid field in ascending order.
Before starting the task, make sure the Hadoop and Hive environments are running: start-all.sh, then nohup hive --service metastore &
import org.apache.spark.sql.{DataFrame, SparkSession}
object Sixth_Question {
def main(args: Array[String]): Unit = {
/******************* Begin *******************/
try {
// Create the SparkSession with Hive support enabled
val spark = SparkSession.builder()
.appName("ReadHiveTable")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
// Create the table if it does not exist yet
if (!spark.catalog.tableExists("employee")) {
sql("""
CREATE TABLE employee (
eid STRING,
ename STRING,
age INT,
part STRING
)
""")
// Prepare the data
val employeeData = Seq(
("A568952","王晓",25,"财务部"),
("B256412","张天",28,"人事部"),
("C125754","田笑笑",23,"销售部"),
("D265412","赵云",24,"研发部"),
("F256875","李姿姿",26,"后勤部")
)
// Convert to a DataFrame and write it into the Hive table
employeeData.toDF("eid", "ename", "age", "part")
.write.mode("append").saveAsTable("employee")
}
// Read the Hive table ordered by eid ascending
val employeeDF: DataFrame = sql("SELECT * FROM employee ORDER BY eid ASC")
// Display the result
println("")
employeeDF.show()
// Stop the SparkSession
spark.stop()
} catch {
case e: Exception =>
println(s"程序执行出错: ${e.getMessage}")
e.printStackTrace()
}
/******************* End *******************/
}
}
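If the program cannot reach the Hive metastore started with nohup hive --service metastore &, the metastore URI can be set explicitly when the session is built. The thrift://localhost:9083 address below is the usual default port and is an assumption about this environment:

// Sketch only: point Spark at the running Hive metastore explicitly.
// thrift://localhost:9083 may differ in your setup.
val spark = SparkSession.builder()
  .appName("Sixth_Question")
  .master("local[*]")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate()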