原文:
annas-archive.org/md5/39ee***62e023387ee8c22ca10d1a221a译者:飞龙
协议:*** BY-NC-SA 4.0
第十章:一切皆相连 - GraphX
“技术使得大规模人口成为可能;而大规模人口现在使得技术变得不可或缺。”
- Joseph Wood Krutch
在这一章中,我们将学习如何使用图形模型(并解决)许多现实世界的问题。我们看到 Apache Spark 有自己的图形库,你在学习 RDD 时学到的内容在这里也能派上用场(这次作为顶点和边的 RDD)。
简而言之,本章将涵盖以下主题:
-
图论简要介绍
-
GraphX
-
VertexRDD 和 EdgeRDD
-
图形操作符
-
Pregel API
-
PageRank
图论简要介绍
为了更好地理解图形,让我们来看一下 Facebook 以及你通常如何使用 Facebook。每天你使用智能手机在朋友的墙上发布消息或更新你的状态。你的朋友们也都在发布自己的消息、照片和视频。
你有朋友,你的朋友有朋友,他们有朋友,依此类推。Facebook 有设置让你可以交新朋友或从朋友列表中删除朋友。Facebook 也有权限设置,可以精细控制谁能看到什么,以及谁可以与谁沟通。
现在,当你考虑到有十亿 Facebook 用户时,所有用户的朋友和朋友的朋友列表变得非常庞大和复杂。要理解和管理所有不同的关系或友谊是很困难的。
所以,如果有人想了解你和另一个人X是否有任何关系,他们可以简单地从查看你所有的朋友以及你朋友的朋友开始,依此类推,试图找到X。如果X是朋友的朋友,那么你和X是间接连接的。
在你的 Facebook 账户中搜索一两位名人,看看是否有人是你朋友的朋友。也许你可以尝试将他们添加为朋友。
我们需要构建存储和检索关于人及其朋友的数据,以便让我们能够回答如下问题:
-
X 是 Y 的朋友吗?
-
X 和 Y 是否直接连接,或者在两步内连接?
-
X 有多少个朋友?
我们可以从尝试一个简单的数据结构开始,比如数组,这样每个人都有一个朋友数组。现在,只需要取数组的长度就能回答问题 3。我们还可以直接扫描数组并快速回答问题 1。现在,问题 2 则需要多一点工作,取出X的朋友数组,并对每个朋友扫描他们的朋友数组。
我们通过构建一个专门的数据结构(如下面的例子所示),解决了这个问题,在这个例子中,我们创建了一个Person的 case 类,然后通过添加朋友来建立类似于john | ken | mary | dan的关系。
case class Person(name: String) {
val friends = scala.collection.mutable.ArrayBuffer[Person]()
def numberOfFriends() = friends.length
def isFriend(other: Person) = friends.find(_.name == other.name)
def isConnectedWithin2Steps(other: Person) = {
for {f <- friends} yield {f.name == other.name ||
f.isFriend(other).isDefined}
}.find(_ == true).isDefined
}
scala> val john = Person("John")
john: Person = Person(John)
scala> val ken = Person("Ken")
ken: Person = Person(Ken)
scala> val mary = Person("Mary")
mary: Person = Person(Mary)
scala> val dan = Person("Dan")
dan: Person = Person(Dan)
scala> john.numberOfFriends
res33: Int = 0
scala> john.friends += ken
res34: john.friends.type = ArrayBuffer(Person(Ken)) //john -> ken
scala> john.numberOfFriends
res35: Int = 1
scala> ken.friends += mary
res36: ken.friends.type = ArrayBuffer(Person(Mary)) //john -> ken -> mary
scala> ken.numberOfFriends
res37: Int = 1
scala> mary.friends += dan
res38: mary.friends.type = ArrayBuffer(Person(Dan)) //john -> ken -> mary -> dan
scala> mary.numberOfFriends
res39: Int = 1
scala> john.isFriend(ken)
res40: Option[Person] = Some(Person(Ken)) //Yes, ken is a friend of john
scala> john.isFriend(mary)
res41: Option[Person] = None //No, mary is a friend of ken not john
scala> john.isFriend(dan)
res42: Option[Person] = None //No, dan is a friend of mary not john
scala> john.isConnectedWithin2Steps(ken)
res43: Boolean = true //Yes, ken is a friend of john
scala> john.isConnectedWithin2Steps(mary)
res44: Boolean = true //Yes, mary is a friend of ken who is a friend of john
scala> john.isConnectedWithin2Steps(dan)
res45: Boolean = false //No, dan is a friend of mary who is a friend of ken who is a friend of john
如果我们为所有 Facebook 用户构建Person()实例,并按照前面的代码将朋友添加到数组中,那么最终,我们将能够执行大量关于谁是朋友以及两个人之间关系的查询。
下图展示了数据结构中的 Person() 实例以及它们之间的逻辑关系:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00008.jpeg
如果你想使用上面的图,仅仅找出约翰的朋友,约翰朋友的朋友,依此类推,这样我们就可以快速找到直接朋友、间接朋友(朋友的二级关系)、三级朋友(朋友的朋友的朋友),你会看到类似以下的图示:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00014.jpeg
我们可以轻松扩展 Person() 类并提供更多功能来回答不同的问题。这并不是重点,我们想要关注的是前面那个图示,展示了 Person 和 Person 的朋友,以及如何将每个 Person 的所有朋友绘制出来,从而形成一个人物之间的关系网。
现在我们引入图论,它源于数学领域。图论将图定义为由顶点、节点或点组成的结构,这些顶点通过边、弧和线连接。如果你将顶点集合视为 V,边集合视为 E,那么图 G 可以定义为有序对 V 和 E。
Graph G = (V, E)
V - set of Vertices
E - set of Edges
在我们 Facebook 朋友图的例子中,我们可以简单地将每个人视为顶点集合中的一个顶点,然后两个人之间的每一条链接可以视为边集合中的一条边。
根据这个逻辑,我们可以列出顶点和边,如下图所示:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00016.jpeg
这种作为数学图的描述引出了多种遍历和查询图的数学方法。当这些技术应用于计算机科学,作为开发程序方法来执行必要的数学运算时,正式的做法当然是开发算法,以可扩展、高效的方式实现数学规则。
我们已经尝试使用案例类 Person 实现一个简单的图形程序,但这只是最简单的用例,应该显而易见的是,存在许多复杂的扩展是可能的,比如以下问题需要解答:
-
从 X 到 Y 的最佳路径是什么?一个这样的例子是你的车载 GPS 告诉你去超市的最佳路线。
-
如何识别关键边,这些边可能导致图的分割?一个这样的例子是确定连接各个城市互联网服务/水管/电力线路的关键链接。关键边会切断连通性,产生两个连接良好的城市子图,但这两个子图之间将无法进行任何通信。
回答上述问题可以得出若干算法,如最小生成树、最短路径、网页排名、ALS(交替最小二乘法)、最大割最小流算法等,这些算法适用于广泛的使用场景。
其他示例包括 LinkedIn 的个人资料和连接、Twitter 的粉丝、Google 的页面排名、航空公司调度、汽车中的 GPS 等等,你可以清楚地看到一个包含顶点和边的图。通过使用图算法,可以使用不同的算法来分析 Facebook、LinkedIn 和 Google 等示例中看到的图,从而得出不同的商业用例。
以下是一些现实生活中图的实际应用示例,展示了图和图算法在一些现实生活中的用例中的应用,例如:
-
帮助确定机场之间的航班路线
-
规划如何将水管道布局到本地区的所有家庭
-
让你的汽车 GPS 规划开车去超市的路线
-
设计如何从一个城市到另一个城市、一个州到另一个州、一个国家到另一个国家的互联网流量路由
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00018.jpeg
现在让我们深入探讨如何使用 Spark GraphX。
GraphX
如前一部分所示,我们可以将许多现实生活中的用例建模为一个包含顶点集合和边集合的图,这些边连接着顶点。我们还编写了简单的代码,尝试实现一些基本的图操作和查询,比如,X 是否是 Y 的朋友?然而,随着我们进一步探索,算法变得更加复杂,用例也增多,而且图的规模远远大于单台机器能够处理的范围。
不可能将十亿个 Facebook 用户及其所有的友谊关系都装入一台机器或甚至几台机器中。
我们需要做的是超越仅仅将一台机器或几台机器拼凑在一起,开始考虑高度可扩展的架构,以实现复杂的图算法,这些算法能够处理数据量和数据元素之间复杂的互联关系。我们已经看到 Spark 的介绍,Spark 如何解决分布式计算和大数据分析中的一些挑战。我们还看到了实时流处理、Spark SQL 以及 DataFrames 和 RDD。我们能否解决图算法的挑战?答案是 GraphX,它随 Apache Spark 一起提供,就像其他库一样,位于 Spark Core 之上。
GraphX 通过提供一个基于 RDD 概念的图抽象,扩展了 Spark 的 RDD。GraphX 中的图是通过顶点或节点的概念来表示对象,边或链接用于描述对象之间的关系。GraphX 提供了实现许多适合图处理范式的用例的手段。在这一部分,我们将学习 GraphX,如何创建顶点、边和包含顶点和边的图。我们还将编写代码,通过示例学习一些与图算法和处理相关的技术。
要开始,你需要导入以下列出的几个包:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.graphx.GraphOps
GraphX 的基本数据结构是图,它抽象地表示一个图,图中的顶点和边与任意对象相关联。图提供了基本操作,用于访问和操作与顶点和边相关联的数据,以及底层结构。与 Spark 的 RDD 类似,图是一种函数式数据结构,变更操作会返回新的图。这种Graph对象的不可变性使得可以进行大规模的并行计算,而不会面临同步问题。
对象的并发更新或修改是许多程序中复杂多线程编程的主要原因。
图定义了基本的数据结构,并且有一个辅助类GraphOps,其中包含了额外的便利操作和图算法。
图的定义如下,作为一个类模板,其中有两个属性指定构成图的两部分的数据类型,即顶点和边:
class Graph[VD: ClassTag, ED: ClassTag]
如我们之前讨论的,图由顶点和边组成。顶点集合存储在一个特殊的数据结构中,称为VertexRDD。类似地,边集合存储在另一个特殊的数据结构中,称为EdgeRDD。顶点和边一起构成了图,所有后续操作都可以使用这两种数据结构进行。
所以,Graph类的声明如下所示:
class Graph[VD, ED] {
//A RDD containing the vertices and their associated attributes.
val vertices: VertexRDD[VD]
//A RDD containing the edges and their associated attributes.
The entries in the RDD contain just the source id and target id
along with the edge data.
val edges: EdgeRDD[ED]
//A RDD containing the edge triplets, which are edges along with the
vertex data associated with the adjacent vertices.
val triplets: RDD[EdgeTriplet[VD, ED]]
}
现在,让我们来看一下Graph类的两个主要组成部分:VertexRDD和EdgeRDD。
VertexRDD 和 EdgeRDD
VertexRDD包含顶点或节点的集合,这些顶点存储在一个特殊的数据结构中;EdgeRDD包含节点/顶点之间边或连接的集合,存储在另一个特殊的数据结构中。VertexRDD和EdgeRDD都基于 RDD,VertexRDD处理图中的每一个节点,而EdgeRDD包含所有节点之间的连接。在本节中,我们将介绍如何创建VertexRDD和EdgeRDD,并使用这些对象来构建图。
VertexRDD
如前所述,VertexRDD是一个包含顶点及其相关属性的 RDD。RDD 中的每个元素代表图中的一个顶点或节点。为了保持顶点的唯一性,我们需要为每个顶点分配一个唯一的 ID。为此,GraphX 定义了一个非常重要的标识符,称为VertexId。
VertexId被定义为一个 64 位的顶点标识符,它唯一标识图中的一个顶点。它不需要遵循任何排序或约束,唯一性是唯一要求。
VertexId的声明如下,简单地说,它是一个 64 位Long类型数字的别名:
type VertexId = Long
VertexRDD扩展了一个包含顶点 ID 和顶点属性的 RDD,表示为RDD[(VertexId, VD)]。它还确保每个顶点只有一个条目,并通过预先索引条目来加速高效的连接操作。两个具有相同索引的VertexRDD可以高效地进行连接。
class VertexRDD[VD]() extends RDD[(VertexId, VD)]
VertexRDD也实现了许多函数,这些函数提供了与图操作相关的重要功能。每个函数通常接受由VertexRDD表示的顶点作为输入。
让我们将顶点加载到VertexRDD中。为此,我们首先声明一个案例类User,如下面所示:
case class User(name: String, o***upation: String)
现在,使用users.txt文件创建VertexRDD:
| VertexID | 姓名 | 职业 |
|---|---|---|
| 1 | John | 会计 |
| 2 | Mark | 医生 |
| 3 | Sam | 律师 |
| 4 | Liz | 医生 |
| 5 | Eric | 会计 |
| 6 | Beth | 会计 |
| 7 | Larry | 工程师 |
| 8 | Marry | 收银员 |
| 9 | Dan | 医生 |
| 10 | Ken | 图书管理员 |
users.txt文件的每一行包含VertexId、姓名和职业,所以我们可以在这里使用String的分割函数:
scala> val users = sc.textFile("users.txt").map{ line =>
val fields = line.split(",")
(fields(0).toLong, User(fields(1), fields(2)))
}
users: org.apache.spark.rdd.RDD[(Long, User)] = MapPartitionsRDD[2645] at map at <console>:127
scala> users.take(10)
res103: Array[(Long, User)] = Array((1,User(John,A***ountant)), (2,User(Mark,Doctor)), (3,User(Sam,Lawyer)), (4,User(Liz,Doctor)), (5,User(Eric,A***ountant)), (6,User(Beth,A***ountant)), (7,User(Larry,Engineer)), (8,User(Mary,Cashier)), (9,User(Dan,Doctor)), (10,User(Ken,Librarian)))
EdgeRDD
EdgeRDD表示顶点之间的边集合,是 Graph 类的成员,如前所述。EdgeRDD与VertexRDD一样,都是从 RDD 扩展而来的,并且可以同时包含边属性和顶点属性。
EdgeRDD[ED, VD]通过将边存储在每个分区的列格式中来扩展RDD[Edge[ED]],以提高性能。它还可以存储与每条边相关联的顶点属性,从而提供三元组视图:
class EdgeRDD[ED]() extends RDD[Edge[ED]]
EdgeRDD还实现了许多函数,这些函数提供了与图操作相关的重要功能。每个函数通常接受由EdgeRDD表示的边作为输入。每条边由源顶点 Id、目标顶点 Id 和边属性(如String、Integer或任何案例类)组成。在下面的例子中,我们使用String类型的朋友作为属性。稍后在本章中,我们将使用以英里为单位的距离(Integer)作为属性。
我们可以通过读取一对顶点 Id 的文件来创建EdgeRDD:
| 源顶点 ID | 目标/目的地顶点 ID | 距离(英里) |
|---|---|---|
| 1 | 3 | 5 |
| 3 | 1 | 5 |
| 1 | 2 | 1 |
| 2 | 1 | 1 |
| 4 | 10 | 5 |
| 10 | 4 | 5 |
| 1 | 10 | 5 |
| 10 | 1 | 5 |
| 2 | 7 | 6 |
| 7 | 2 | 6 |
| 7 | 4 | 3 |
| 4 | 7 | 3 |
| 2 | 3 | 2 |
friends.txt文件的每一行都包含源vertexId和目标vertexId,因此我们可以在这里使用String的分割函数:
scala> val friends = sc.textFile("friends.txt").map{ line =>
val fields = line.split(",")
Edge(fields(0).toLong, fields(1).toLong, "friend")
}
friends: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = MapPartitionsRDD[2648] at map at <console>:125
scala> friends.take(10)
res109: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(1,3,friend), Edge(3,1,friend), Edge(1,2,friend), Edge(2,1,friend), Edge(4,10,friend), Edge(10,4,friend), Edge(1,10,friend), Edge(10,1,friend), Edge(2,7,friend), Edge(7,2,friend))
现在我们有了顶点和边,接下来是将一切整合在一起,探索如何从顶点和边的列表构建一个Graph:
scala> val graph = Graph(users, friends)
graph: org.apache.spark.graphx.Graph[User,String] = org.apache.spark.graphx.impl.GraphImpl@327b69c8
scala> graph.vertices
res113: org.apache.spark.graphx.VertexRDD[User] = VertexRDDImpl[2658] at RDD at VertexRDD.scala:57
scala> graph.edges
res114: org.apache.spark.graphx.EdgeRDD[String] = EdgeRDDImpl[2660] at RDD at EdgeRDD.scala:41
使用Graph对象,我们可以通过collect()函数查看顶点和边,collect()会显示所有的顶点和边。每个顶点的形式是(VertexId,User),每条边的形式是(srcVertexId,dstVertexId,edgeAttribute)。
scala> graph.vertices.collect
res111: Array[(org.apache.spark.graphx.VertexId, User)] = Array((4,User(Liz,Doctor)), (6,User(Beth,A***ountant)), (8,User(Mary,Cashier)), (10,User(Ken,Librarian)), (2,User(Mark,Doctor)), (1,User(John,A***ountant)), (3,User(Sam,Lawyer)), (7,User(Larry,Engineer)), (9,User(Dan,Doctor)), (5,User(Eric,A***ountant)))
scala> graph.edges.collect
res112: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(1,2,friend), Edge(1,3,friend), Edge(1,10,friend), Edge(2,1,friend), Edge(2,3,friend), Edge(2,7,friend), Edge(3,1,friend), Edge(3,2,friend), Edge(3,10,friend), Edge(4,7,friend), Edge(4,10,friend), Edge(7,2,friend), Edge(7,4,friend), Edge(10,1,friend), Edge(10,4,friend), Edge(3,5,friend), Edge(5,3,friend), Edge(5,9,friend), Edge(6,8,friend), Edge(6,10,friend), Edge(8,6,friend), Edge(8,9,friend), Edge(8,10,friend), Edge(9,5,friend), Edge(9,8,friend), Edge(10,6,friend), Edge(10,8,friend))
现在我们已经创建了一个图,接下来我们将在下一部分查看各种操作。
图操作符
让我们从直接使用Graph对象进行的操作开始,例如基于对象的某些属性过滤图中的顶点和边。我们还将看到mapValues()的示例,它可以将图转换为自定义的 RDD。
首先,让我们使用我们在前一节创建的Graph对象来检查顶点和边,然后看一些图操作符。
scala> graph.vertices.collect
res111: Array[(org.apache.spark.graphx.VertexId, User)] = Array((4,User(Liz,Doctor)), (6,User(Beth,A***ountant)), (8,User(Mary,Cashier)), (10,User(Ken,Librarian)), (2,User(Mark,Doctor)), (1,User(John,A***ountant)), (3,User(Sam,Lawyer)), (7,User(Larry,Engineer)), (9,User(Dan,Doctor)), (5,User(Eric,A***ountant)))
scala> graph.edges.collect
res112: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(1,2,friend), Edge(1,3,friend), Edge(1,10,friend), Edge(2,1,friend), Edge(2,3,friend), Edge(2,7,friend), Edge(3,1,friend), Edge(3,2,friend), Edge(3,10,friend), Edge(4,7,friend), Edge(4,10,friend), Edge(7,2,friend), Edge(7,4,friend), Edge(10,1,friend), Edge(10,4,friend), Edge(3,5,friend), Edge(5,3,friend), Edge(5,9,friend), Edge(6,8,friend), Edge(6,10,friend), Edge(8,6,friend), Edge(8,9,friend), Edge(8,10,friend), Edge(9,5,friend), Edge(9,8,friend), Edge(10,6,friend), Edge(10,8,friend))
Filter
对filter()的函数调用将顶点集限制为满足给定谓词的顶点集。此操作保留索引以便与原始 RDD 进行高效连接,并且设置位于位掩码中,而不是分配新内存。
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
使用filter,我们可以过滤出除用户Mark的顶点之外的所有内容,可以使用顶点 ID 或User.name属性进行过滤。我们还可以过滤User.o***upation属性。
以下是完成相同任务的代码:
scala> graph.vertices.filter(x => x._1 == 2).take(10)
res118: Array[(org.apache.spark.graphx.VertexId, User)] = Array((2,User(Mark,Doctor)))
scala> graph.vertices.filter(x => x._2.name == "Mark").take(10)
res119: Array[(org.apache.spark.graphx.VertexId, User)] = Array((2,User(Mark,Doctor)))
scala> graph.vertices.filter(x => x._2.o***upation == "Doctor").take(10)
res120: Array[(org.apache.spark.graphx.VertexId, User)] = Array((4,User(Liz,Doctor)), (2,User(Mark,Doctor)), (9,User(Dan,Doctor)))
我们也可以对边执行filter操作,使用源顶点 ID 或目标顶点 ID。因此,我们可以过滤出仅显示从John(顶点 ID = 1)发出的边:
scala> graph.edges.filter(x => x.srcId == 1)
res123: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = MapPartitionsRDD[2672] at filter at <console>:134
scala> graph.edges.filter(x => x.srcId == 1).take(10)
res124: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(1,2,friend), Edge(1,3,friend), Edge(1,10,friend))
MapValues
mapValues()映射每个顶点属性,保留索引以避免改变顶点 ID。改变顶点 ID 会导致索引变化,从而使后续操作失败,并且顶点将不再可达。因此,重要的是不要改变顶点 ID。
此函数的声明如下所示:
def mapValuesVD2: ClassTag: VertexRDD[VD2]
//A variant of the mapValues() function a***epts a vertexId in addition
to the vertices.
def mapValuesVD2: ClassTag => VD2): VertexRDD[VD2]
mapValues()也可以操作边,对边进行值映射,保留结构但改变值:
def mapValuesED2: ClassTag: EdgeRDD[ED2]
以下是在顶点和边上调用mapValues()的示例代码。在顶点上,MapValues 将顶点转换为(vertexId, User.name)对的列表。在边上,MapValues 将边转换为(srcId, dstId, string)的三元组:
scala> graph.vertices.mapValues{(id, u) => u.name}.take(10)
res142: Array[(org.apache.spark.graphx.VertexId, String)] = Array((4,Liz), (6,Beth), (8,Mary), (10,Ken), (2,Mark), (1,John), (3,Sam), (7,Larry), (9,Dan), (5,Eric))
scala> graph.edges.mapValues(x => s"${x.srcId} -> ${x.dstId}").take(10)
7), Edge(3,1,3 -> 1), Edge(3,2,3 -> 2), Edge(3,10,3 -> 10), Edge(4,7,4 -> 7))
aggregateMessages
GraphX 中的核心聚合操作是aggregateMessages,它将用户定义的sendMsg函数应用于图中每个边三元组,然后使用mergeMsg函数在目标顶点处聚合这些消息。aggregateMessages在许多图算法中使用,其中我们需要在顶点之间交换信息。
以下是此 API 的签名:
def aggregateMessagesMsg: ClassTag => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]
关键函数是sendMsg和mergeMsg,它们确定发送到边的源顶点或目标顶点的内容。然后,mergeMsg处理从所有边接收到的消息,并执行计算或聚合。
以下是在Graph图上调用aggregateMessages的简单示例代码,其中我们向所有目标顶点发送消息。每个顶点的合并策略只是将接收到的所有消息相加:
scala> graph.aggregateMessagesInt, _ + _).collect
res207: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,2), (6,2), (8,3), (10,4), (2,3), (1,3), (3,3), (7,2), (9,2), (5,2))
TriangleCounting
如果一个顶点的两个邻居通过一条边相连,就会创建一个三角形。换句话说,用户将与那两个互为朋友的朋友创建一个三角形。
Graph 有一个函数triangleCount(),用于计算图中的三角形。
以下是用于通过首先调用 triangleCount 函数并将三角形与顶点(用户)连接,以生成每个用户及其所属三角形输出的代码:
scala> val triangleCounts = graph.triangleCount.vertices
triangleCounts: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[3365] at RDD at VertexRDD.scala:57
scala> triangleCounts.take(10)
res171: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,0), (6,1), (8,1), (10,1), (2,1), (1,1), (3,1), (7,0), (9,0), (5,0))
scala> val triangleCountsPerUser = users.join(triangleCounts).map { case(id, (User(x,y), k)) => ((x,y), k) }
triangleCountsPerUser: org.apache.spark.rdd.RDD[((String, String), Int)] = MapPartitionsRDD[3371] at map at <console>:153
scala> triangleCountsPerUser.collect.mkString("\n")
res170: String =
((Liz,Doctor),0)
((Beth,A***ountant),1) *//1 count means this User is part of 1 triangle*
((Mary,Cashier),1) *//1 count means this User is part of 1 triangle*
((Ken,Librarian),1) *//1 count means this User is part of 1 triangle*
((Mark,Doctor),1) * //1 count means this User is part of 1 triangle*
((John,A***ountant),1) *//1 count means this User is part of 1 triangle*
((Sam,Lawyer),1) *//1 count means this User is part of 1 triangle*
((Larry,Engineer),0)
((Dan,Doctor),0)
((Eric,A***ountant),0)
我们刚才在前面的代码中计算出的两个三角形的图示显示了两个三角形,(John, Mark, Sam) 和 (Ken, Mary, Beth):
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00023.jpeg
Pregel API
图本质上是递归的数据结构,因为顶点的属性依赖于其邻居的属性,而邻居的属性又依赖于它们自己的邻居的属性。因此,许多重要的图算法需要迭代地重新计算每个顶点的属性,直到达到固定点条件。为了表达这些迭代算法,提出了多种图并行抽象。GraphX 提供了 Pregel API 的变体。
从高层次来看,GraphX 中的 Pregel 运算符是一个批量同步的并行消息抽象,受图的拓扑结构限制。Pregel 运算符在一系列步骤中执行,在这些步骤中,顶点接收来自上一超步的传入消息的总和,计算顶点属性的新值,然后在下一超步中向相邻顶点发送消息。使用 Pregel 时,消息是并行计算的,作为边三元组的函数,消息计算可以访问源和目标顶点的属性。没有接收到消息的顶点会在超步中跳过。Pregel 运算符在没有剩余消息时终止迭代并返回最终图。
一些内置的 Pregel API 算法如下所示:
-
连通分量
-
最短路径
-
旅行商问题
-
PageRank(将在下一节介绍)
Pregel API 的签名如下所示,显示了所需的各种参数。确切的用法将在后续章节中展示,您可以参考此签名以获得更多信息:
def pregel[A]
(initialMsg: A, // the initial message to all vertices
maxIter: Int = Int.MaxValue, // number of iterations
activeDir: EdgeDirection = EdgeDirection.Out) // in***ing or outgoing edges
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], //send message function
mergeMsg: (A, A) => A) //merge strategy
: Graph[VD, ED]
连通分量
连通分量本质上是图中的子图,其中顶点通过某种方式相互连接。这意味着同一组件中的每个顶点都与组件中的其他顶点有一条边。当没有其他边将顶点连接到某个组件时,就会创建一个新的组件,该组件包含该特定顶点。这个过程会一直持续,直到所有顶点都属于某个组件。
图对象提供了一个 connect***ponents() 函数来计算连通分量。该函数在底层使用 Pregel API 来计算顶点所属的组件。以下是用于计算图中连通分量的代码。显然,在此示例中,我们只有一个连通分量,因此所有用户的组件编号都显示为 1:
scala> graph.connected***ponents.vertices.collect res198: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId)] = Array((4,1), (6,1), (8,1), (10,1), (2,1), (1,1), (3,1), (7,1), (9,1), (5,1))
scala> graph.connected***ponents.vertices.join(users).take(10)
res197: Array[(org.apache.spark.graphx.VertexId, (org.apache.spark.graphx.VertexId, User))] = Array((4,(1,User(Liz,Doctor))), (6,(1,User(Beth,A***ountant))), (8,(1,User(Mary,Cashier))), (10,(1,User(Ken,Librarian))), (2,(1,User(Mark,Doctor))), (1,(1,User(John,A***ountant))), (3,(1,User(Sam,Lawyer))), (7,(1,User(Larry,Engineer))), (9,(1,User(Dan,Doctor))), (5,(1,User(Eric,A***ountant))))
旅行商问题
旅行商问题试图在无向图中找到一条最短路径,遍历每个顶点。例如,用户约翰想要开车到每个其他用户的地方,最小化总驾驶距离。随着顶点和边的数量增加,排列组合的数量也会呈多项式增长,覆盖从顶点到顶点的所有可能路径。时间复杂度也会多项式增长,问题的解决可能需要很长时间。与其完全准确地解决它,更常用的方法是采用贪心算法,以尽可能最优的方式解决该问题。
为了解决旅行商问题,贪心算法会快速选择最短的边,尽管我们知道如果继续进一步深度遍历,这可能是一个非最优选择。
贪心算法在用户和朋友图中的示意图如下所示,其中我们看到在每个顶点处选择最短的加权边进行遍历。同时请注意,顶点拉里(7)和莉兹(4)从未被访问过:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00025.jpeg
ShortestPaths
最短路径算法通过从源顶点开始,然后遍历连接各顶点的边,直到到达目标顶点,从而找到两顶点之间的路径。该算法通过各顶点之间交换消息来实现。此外,这个最短路径算法并不是Graph或GraphOps对象的一部分,而必须通过lib.ShortestPaths()来调用:
scala> lib.ShortestPaths.run(graph,Array(1)).vertices.join(users).take(10)
res204: Array[(org.apache.spark.graphx.VertexId, (org.apache.spark.graphx.lib.ShortestPaths.SPMap, User))] = Array((4,(Map(1 -> 2),User(Liz,Doctor))), (6,(Map(1 -> 2),User(Beth,A***ountant))), (8,(Map(1 -> 2),User(Mary,Cashier))), (10,(Map(1 -> 1),User(Ken,Librarian))), (2,(Map(1 -> 1),User(Mark,Doctor))), (1,(Map(1 -> 0),User(John,A***ountant))), (3,(Map(1 -> 1),User(Sam,Lawyer))), (7,(Map(1 -> 2),User(Larry,Engineer))), (9,(Map(1 -> 3),User(Dan,Doctor))), (5,(Map(1 -> 2),User(Eric,A***ountant))))
ShortestPaths选择两个顶点之间跳跃次数最少的最短路径。以下图示展示了约翰到拉里的三种路径,其中两条路径长度为 2,一条路径长度为 3。从前面代码的结果可以清楚地看到,从拉里到约翰选择的路径长度为 2。
上述代码块的输出显示了一个包含路径长度和节点的向量(7,(Map(1 -> 2),User(Larry,Engineer))):
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00031.jpeg
我们还可以使用加权边计算最短路径,这意味着连接用户的每条边的权重不同。例如,如果我们可以将边的值/权重/属性视为每个用户居住地之间的距离,我们就得到一个加权图。在这种情况下,最短路径是通过计算两个用户之间的距离(以英里为单位)来求得的:
scala> val srcId = 1 //vertex ID 1 is the user John
srcId: Int = 1
scala> val initGraph = graph.mapVertices((id, x) => if(id == srcId) 0.0 else Double.PositiveInfinity)
initGraph: org.apache.spark.graphx.Graph[Double,Long] = org.apache.spark.graphx.impl.GraphImpl@2b9b8608
scala> val weightedShortestPath = initGraph.pregel(Double.PositiveInfinity, 5)(
| (id, dist, newDist) => math.min(dist, newDist),
| triplet => {
| if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
| Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
| }
| else {
| Iterator.empty
| }
| },
| (a, b) => math.min(a, b)
| )
weightedShortestPath: org.apache.spark.graphx.Graph[Double,Long] = org.apache.spark.graphx.impl.GraphImpl@1f87fdd3
scala> weightedShortestPath.vertices.take(10).mkString("\n")
res247: String =
(4,10.0)
(6,6.0)
(8,6.0)
(10,5.0)
(2,1.0)
(1,0.0)
(3,3.0)
(7,7.0)
(9,5.0)
(5,4.0)
以下是一个使用 Pregel API 计算从约翰到拉里的单源最短路径的示意图,计算过程从初始化开始,逐步迭代直到我们到达最佳路径。
图的初始化是通过将代表约翰的顶点的值设置为零,所有其他顶点的值设置为正无穷大来完成的:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00010.jpeg
初始化完成后,我们将使用 Pregel 进行四次迭代来重新计算顶点值。在每次迭代中,我们遍历所有顶点,并在每个顶点处检查是否存在更好的从源顶点到目标顶点的路径。如果存在这样的边/路径,则更新顶点值。
让我们定义两个函数 distance(v) 和 distance(s, t),其中 distance(v) 返回一个顶点的值,distance(s,t) 返回连接 s 到 t 的边的值。
在迭代 1 中,除约翰外的每个用户都被设置为无限大,约翰的距离为 0,因为他是源顶点。现在,我们使用 Pregel 遍历各个顶点,检查是否有比无限大更好的值。以 Ken 为例,我们将检查 distance(“John”) + distance(“John”, “Ken”) < distance(“Ken”)。
这相当于检查 0 + 5 < Infinity,结果是 true;所以我们将 Ken 的距离更新为 5。
类似地,我们检查 Mary,distance(“Ken”) + distance(“Ken”, “Mary”) < distance(“Mary”),结果是false,因为那时 Ken 仍然是无限远。因此,在迭代 1 中,我们只能更新与约翰连接的用户。
在下一次迭代中,Mary、Liz、Eric 等人都会被更新,因为现在我们已经有了来自迭代 1 中的 Ken、Mark 和 Sam 的更新值。这将继续进行,直到达到 Pregel API 调用中指定的迭代次数。
以下是计算图中单源最短路径时,各个迭代步骤的示意图:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00038.jpeg
在经过四次迭代后,从约翰到拉里的最短路径显示,最短路径为五英里。从约翰到拉里的路径可以通过以下路径查看:约翰 | 马克 | 萨姆 | 拉里:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00275.jpeg
PageRank
PageRank 是图处理领域最重要的算法之一。该算法源于 Google,以 Google 创始人 Larry Page 的名字命名,并且基于通过关系或边缘对顶点或节点进行排名的概念,已经发展出许多不同的应用场景。
Google PageRank 通过计算指向页面的链接数量和质量,来大致估算一个网站的重要性。其基本假设是,越重要的网站更可能收到来自其他网站的更多链接。如需了解更多信息,您可以阅读 en.wikipedia.org/wiki/PageRank 上的描述。
以 Google PageRank 为例,通过在其他流行网站和技术博客中推广您的网页,您可以提高您公司网站或博客中某个网页的相对重要性。使用这种方法,您的博客网站可能会在 Google 搜索结果中排名高于其他类似网页,尤其是当有许多第三方网站显示您的博客网站及其内容时。
搜索引擎优化(SEO)是营销领域中最大的行业之一,几乎每个网站都在投资这一技术。SEO 涉及各种技术和策略,主要目的是提高网站在搜索引擎结果中排名的高度,当用户搜索相关词汇时,网站能够排在前面。这基于类似 Google PageRank 的概念。
如果你将网页视为节点/顶点,将网页之间的超链接视为边缘,那么我们基本上就创建了一个图。现在,如果你能计算网页的排名,作为指向该网页的超链接/边缘的数量,例如你的 myblog.*** 网站上有指向 ***n.*** 或 msnbc.*** 的链接,用户可以点击这些链接访问你的 myblog.*** 页面。这可以作为一个表示 myblog.*** 顶点重要性的因子。如果我们递归地应用这个简单的逻辑,最终我们会为每个顶点分配一个排名,该排名是通过计算传入边的数量和基于源顶点排名的 PageRank 来得到的。一个被许多高 PageRank 网页链接的页面,自己也会获得较高的排名。让我们看看如何使用 Spark GraphX 在大数据规模上解决 PageRank 问题。正如我们所看到的,PageRank 衡量了图中每个顶点的重要性,假设从 a 到 b 的边表示 a 提升了 b 的值。例如,如果一个 Twitter 用户被许多其他用户关注,那么该用户将被排名较高。
GraphX 提供了静态和动态实现的 PageRank,作为 pageRank 对象上的方法。静态 PageRank 运行固定次数的迭代,而动态 PageRank 会一直运行直到排名收敛。GraphOps 允许直接在图上调用这些算法方法:
scala> val prVertices = graph.pageRank(0.0001).vertices
prVertices: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[8245] at RDD at VertexRDD.scala:57
scala> prVertices.join(users).sortBy(_._2._1, false).take(10)
res190: Array[(org.apache.spark.graphx.VertexId, (Double, User))] = Array((10,(1.4600029149839906,User(Ken,Librarian))), (8,(1.1424200609462447,User(Mary,Cashier))), (3,(1.1279748817993318,User(Sam,Lawyer))), (2,(1.1253662371576425,User(Mark,Doctor))), (1,(1.0986118723393328,User(John,A***ountant))), (9,(0.8215535923013982,User(Dan,Doctor))), (5,(0.8186673059832846,User(Eric,A***ountant))), (7,(0.8107902215195832,User(Larry,Engineer))), (4,(0.8047583729877394,User(Liz,Doctor))), (6,(0.783902117150218,User(Beth,A***ountant))))
图上的 PageRank 算法示意图如下:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00309.jpeg
总结
在本章中,我们通过使用 Facebook 作为示例介绍了图论;Apache Spark 的图处理库 GraphX、VertexRDD 和 EdgeRDDs;图操作符 aggregateMessages、TriangleCounting 和 Pregel API;以及像 PageRank 算法这样的应用案例。我们还了解了旅行推销员问题和连通分量等内容。我们看到了如何使用 GraphX API 开发大规模图处理算法。
在第十一章,学习机器学习 - Spark MLlib 和 ML,我们将探索 Apache Spark 的机器学习库的精彩世界。
第十一章:学习机器学习 - Spark MLlib 和 Spark ML
“我们每个人,实际上每一种动物,都是数据科学家。我们从传感器中收集数据,然后处理这些数据以获得抽象规则,以感知我们的环境并控制我们在该环境中的行为,减少痛苦和/或增加快乐。我们有记忆来存储这些规则,然后在需要时回忆并使用它们。学习是终身的;当规则不再适用时,我们会忘记它们,或者当环境变化时,我们会修订它们。”
- Ethem Alpaydin,《机器学习:新型人工智能》
本章的目的是为那些在典型的统计学训练中可能没有接触过此类方法的人提供统计机器学习(ML)技术的概念性介绍。本章还旨在通过几个步骤,帮助新人从几乎没有机器学习知识到成为一名有经验的从业者。我们将以理论和实践的方式,重点介绍 Spark 的机器学习 API,称为 Spark MLlib 和 Spark ML。此外,我们还将提供一些涵盖特征提取与转换、降维、回归和分类分析的示例。简而言之,本章将涵盖以下主题:
-
机器学习简介
-
Spark 机器学习 API
-
特征提取与转换
-
使用 PCA 进行回归的降维
-
二分类与多分类
机器学习简介
在这一部分,我们将从计算机科学、统计学和数据分析的角度尝试定义机器学习。**机器学习(ML)**是计算机科学的一个分支,它使计算机能够在没有明确编程的情况下学习(Arthur Samuel,1959 年)。这个研究领域源自人工智能中的模式识别和计算学习理论的研究。
更具体地说,机器学习探索了可以从启发式方法中学习并对数据进行预测的算法的研究和构建。这类算法通过根据样本输入构建模型,从而克服了严格静态的程序指令,通过数据驱动的预测或决策来工作。现在,让我们从计算机科学的角度听听 Tom M. Mitchell 教授对机器学习的更明确和多样化的定义:
如果一个计算机程序在任务类别 T 和性能度量 P 下,随着经验 E 的增加,其在 T 任务中的表现通过 P 测量得到改进,那么我们说该程序从经验 E 中学习。
基于这个定义,我们可以得出结论:计算机程序或机器可以:
-
从数据和历史中学习
-
通过经验得到提升
-
互动地增强一个可以用来预测问题结果的模型
典型的机器学习任务包括概念学习、预测建模、聚类和寻找有用的模式。最终目标是改进学习,使其变得自动化,从而不再需要人工干预,或者尽可能减少人工干预的程度。尽管机器学习有时与知识发现和数据挖掘(KDDM)混淆,但 KDDM 更多地侧重于探索性数据分析,并且被称为无监督学习。典型的机器学习应用可以分为科学知识发现和更多的商业应用,从机器人学或人机交互(HCI)到反垃圾邮件过滤和推荐系统。
典型的机器学习工作流程
一个典型的机器学习应用涉及几个步骤,从输入、处理到输出,形成一个科学工作流程,如图 1所示。一个典型机器学习应用所涉及的步骤如下:
-
加载样本数据。
-
将数据解析为算法的输入格式。
-
对数据进行预处理并处理缺失值。
-
将数据分成两个集:一个用于构建模型(训练数据集),另一个用于测试模型(验证数据集)。
-
运行算法来构建或训练你的机器学习模型。
-
使用训练数据进行预测并观察结果。
-
使用测试数据集测试和评估模型,或者使用交叉验证技术,通过第三个数据集(称为验证数据集)来验证模型。
-
调整模型以提高性能和准确性。
-
扩展模型,使其能够在未来处理大规模数据集。
-
将机器学习模型投入商业化应用。
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00266.jpeg图 1: 机器学习工作流程
通常,机器学习算法有一些方法来处理数据集中的偏斜性。然而,这种偏斜性有时非常严重。在第 4 步中,实验数据集被随机分割,通常分为训练集和测试集,这个过程称为抽样。训练数据集用于训练模型,而测试数据集用于在最后评估最佳模型的表现。更好的做法是尽可能多地使用训练数据集,以提高泛化性能。另一方面,建议只使用一次测试数据集,以避免在计算预测误差和相关度量时出现过拟合问题。
机器学习任务
根据学习系统可获得的反馈类型,机器学习任务或过程通常分为三大类:有监督学习、无监督学习和强化学习,如图 2 所示。此外,还有其他机器学习任务,例如降维、推荐系统、频繁模式挖掘等。
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00272.jpeg图 2: 机器学习任务
有监督学习
监督学习应用基于一组示例做出预测,目标是学习能够将输入映射到符合现实世界输出的通用规则。例如,垃圾邮件过滤的数据集通常包含垃圾邮件和非垃圾邮件。因此,我们可以知道训练集中的邮件是垃圾邮件还是正常邮件。然而,我们可能有机会利用这些信息来训练模型,从而对新的未见过的邮件进行分类。下图展示了监督学习的示意图。当算法找到所需的模式后,这些模式可以用于对无标签的测试数据进行预测。这是最流行且最有用的机器学习任务类型,在 Spark 中也不例外,其中大多数算法都是监督学习技术:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00278.jpeg图 3:监督学习的应用
示例包括用于解决监督学习问题的分类和回归。我们将在本书中提供几个监督学习的示例,如逻辑回归、随机森林、决策树、朴素贝叶斯、一对多分类等。然而,为了使讨论更具实质性,本书将仅讨论逻辑回归和随机森林,其他算法将在第十二章中讨论,章节标题为高级机器学习最佳实践,并附有一些实际的示例。另一方面,线性回归将用于回归分析。
无监督学习
在无监督学习中,数据点没有与之相关的标签。因此,我们需要通过算法为其添加标签,如下图所示。换句话说,训练数据集在无监督学习中的正确类别是未知的。因此,类别必须从非结构化数据集中推断出来,这意味着无监督学习算法的目标是通过描述数据的结构以某种结构化方式对数据进行预处理。
为了克服无监督学习中的这一障碍,通常使用聚类技术根据一定的相似性度量将无标签样本分组。因此,这项任务也涉及挖掘隐藏的模式以进行特征学习。聚类是智能地对数据集中的项进行分类的过程。总体思路是,同一聚类中的两个项彼此“更接近”,而属于不同聚类的项则较远。这是一般定义,留给“接近”的解释是开放的。
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00282.jpeg图 4:无监督学习
示例包括聚类、频繁模式挖掘和降维用于解决无监督学习问题(也可应用于监督学习问题)。我们将在本书中提供几个无监督学习的示例,如 k 均值、二分 k 均值、高斯混合模型,潜在狄利克雷分配 (LDA),等等。我们还将展示如何通过回归分析在监督学习中使用降维算法,如主成分分析 (PCA) 或 奇异值分解 (SVD)。
降维 (DR):降维是一种在特定条件下减少随机变量数量的技术。这种技术用于监督学习和非监督学习。使用降维技术的典型优势如下:
-
它减少了机器学习任务所需的时间和存储空间
-
它有助于消除多重共线性,并改善机器学习模型的性能
-
当降到 2D 或 3D 等非常低的维度时,数据可视化变得更加容易
强化学习
作为一个人类,你和我们也从过去的经验中学习。我们不是偶然变得如此迷人的。多年来的积极赞美和负面批评都帮助塑造了我们今天的样子。通过与朋友、家人甚至陌生人的互动,你学会了如何让人们感到快乐,并且通过尝试不同的肌肉运动来学会骑自行车,直到这些动作变得自然。有时,当你执行动作时,你会立即获得回报。例如,找到附近的购物中心可能会带来即时满足感。而有时,奖励并不会立即出现,例如旅行长途寻找一个特别好的吃饭地点。这些都与强化学习(RL)有关。
因此,RL 是一种技术,其中模型本身从一系列操作或行为中学习。数据集的复杂性或样本复杂性对于算法成功学习目标函数非常重要。此外,在与外部环境交互时,针对每个数据点的最终目标是确保最大化奖励函数,如下图所示:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00288.jpeg图 5:强化学习
强化学习技术正在许多领域中使用。以下是一个非常简短的列表:
-
广告有助于学习排名,使用一次性学习处理新兴项目,新用户将带来更多的收入
-
教导机器人新任务,同时保留先前的知识
-
衍生复杂的层次方案,从国际象棋开局到交易策略
-
路由问题,例如,管理运输舰队,分配卡车/司机到哪种货物
-
在机器人学中,算法必须根据一组传感器读数选择机器人的下一步行动
-
它也是物联网(IoT)应用的一个自然选择,在这些应用中,计算机程序与动态环境互动,在没有明确导师的情况下,它必须完成某个目标。
-
最简单的强化学习问题之一是 n 臂老虎机。问题在于有 n 个老虎机,但每个老虎机的固定支付概率不同。目标是通过始终选择支付最好的老虎机来最大化利润。
-
一个新兴的应用领域是股市交易。在这种情况下,交易员像一个强化学习代理,因为买卖(即行为)某只股票会通过产生利润或亏损来改变交易员的状态,即奖励。
推荐系统
推荐系统是信息过滤系统的一个子类,它旨在预测用户通常对某个物品的评分或偏好。推荐系统的概念近年来变得非常普遍,并已应用于不同的领域。
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00294.jpeg图 6:不同的推荐系统
最常见的推荐系统可能是产品(例如电影、音乐、书籍、研究文章、新闻、搜索查询、社交标签等)。推荐系统通常可以分为以下四类:
-
协同过滤,也叫做社交过滤,通过利用其他人的推荐来过滤信息。问题是,过去在评估某些物品时达成一致的人,未来也可能再次达成一致。因此,例如,想看电影的人可能会向他的朋友们请求推荐。现在,一旦他从一些有相似兴趣并且更受信任的朋友那里获得了推荐,这些推荐就比其他人的更有参考价值。这些信息被用来决定看哪部电影。
-
基于内容的过滤(也称为认知过滤),它根据物品内容与用户档案之间的比较来推荐物品。每个物品的内容通常以一组描述符或术语表示,通常是文档中出现的词语。用户档案使用相同的术语来表示,通过分析用户看到过的物品内容来构建。然而,在实现这些类型的推荐系统时,需要考虑以下一些问题:
- 首先,术语可以自动或手动分配。对于自动分配,必须选择一种方法,以便能够从项目列表中提取这些项。其次,术语必须以一种方式表示,以便用户档案和项目能够以有意义的方式进行比较。学习算法本身必须明智地选择,以便能够基于已观察到(即已看到)的项目来学习用户档案,并根据该用户档案做出适当的推荐。内容过滤系统通常用于文本文档,其中使用术语解析器从文档中选择单个单词。向量空间模型和潜在语义索引是两种方法,通过这些术语将文档表示为多维空间中的向量。此外,它还用于相关反馈、遗传算法、神经网络和贝叶斯分类器,以学习用户档案。
-
混合推荐系统是近年来的研究成果,一种混合方法(即结合协同过滤和内容过滤)。***flix 是这种推荐系统的一个典型例子,它使用限制玻尔兹曼机(RBM)和一种矩阵分解算法形式,处理像 IMDb 这样的庞大电影数据库(详见
pdfs.semanticscholar.org/789a/d4218d1e2e920b4d192023f840fe8246d746.pdf)。这种通过比较相似用户的观看和搜索习惯来推荐电影、电视剧或流媒体的推荐方式被称为评分预测。 -
基于知识的系统,通过用户和产品的知识推理,了解哪些可以满足用户需求,使用感知树、决策支持系统和案例推理。
本章将讨论基于协同过滤的电影推荐系统。
半监督学习
在监督学习和无监督学习之间,有一个小的空间用于半监督学习。在这种情况下,机器学习模型通常会接收一个不完整的训练信号。从统计学的角度来看,机器学习模型接收到的训练集部分目标输出是缺失的。半监督学习或多或少是基于假设的,通常使用三种假设算法作为未标记数据集的学习算法。使用的假设有:平滑性、聚类和流形。换句话说,半监督学习也可以被称为弱监督学习,或者是利用未标记样本的隐藏信息来增强从少量标记数据中学习的自举技术。
如前所述,获取标注数据通常需要熟练的人工操作。因此,标注过程所涉及的成本可能会使得完全标注的训练集不可行,而获取未标注数据则相对便宜。
例如:转录音频片段、确定蛋白质的三维结构或确定某个特定位置是否有石油、期望最小化和人类认知以及传递性。在这种情况下,半监督学习可以具有很大的实际价值。
Spark 机器学习 API
在本节中,我们将介绍 Spark 机器学习库(Spark MLlib 和 Spark ML)所引入的两个关键概念,以及与我们在前面章节中讨论的监督学习和无监督学习技术相对应的最常用的已实现算法。
Spark 机器学习库
正如前面所说,在 Spark 出现之前,大数据建模人员通常使用统计语言如 R、STATA 和 SAS 来构建他们的机器学习模型。然而,这种工作流程(即这些机器学习算法的执行流程)缺乏效率、可扩展性和吞吐量,也缺乏准确性,当然,执行时间也较长。
然后,数据工程师通常会重新用 Java 实现相同的模型,例如部署到 Hadoop 上。使用 Spark,可以重建、采用并部署相同的机器学习模型,使整个工作流程更加高效、稳健和快速,从而提供实际的见解来提高性能。此外,在 Hadoop 中实现这些算法意味着这些算法可以并行运行,这是 R、STATA 和 SAS 等工具无法做到的。Spark 机器学习库分为两个包:Spark MLlib(spark.mllib)和 Spark ML(spark.ml)。
Spark MLlib
MLlib 是 Spark 的可扩展机器学习库,是 Spark 核心 API 的扩展,提供了易于使用的机器学习算法库。Spark 算法是用 Scala 实现的,然后公开 Java、Scala、Python 和 R 的 API。Spark 支持本地向量和矩阵数据类型,这些数据类型存储在单个机器上,同时支持由一个或多个 RDD 支持的分布式矩阵。Spark MLlib 的优点很多。例如,算法具有很高的可扩展性,并利用 Spark 处理海量数据的能力。
-
它们是为并行计算而设计的,采用基于内存的操作,速度比 MapReduce 数据处理快 100 倍(它们也支持基于磁盘的操作,比 MapReduce 的常规数据处理快 10 倍)。
-
它们种类繁多,涵盖了回归分析、分类、聚类、推荐系统、文本分析和频繁模式挖掘等常见的机器学习算法,显然涵盖了构建可扩展机器学习应用程序所需的所有步骤。
Spark ML
Spark ML 添加了一套新的机器学习 API,让用户能够在数据集之上快速组装和配置实用的机器学习管道。Spark ML 的目标是提供一组统一的高级 API,建立在 DataFrame 之上,而非 RDD,帮助用户创建和调整实用的机器学习管道。Spark ML API 标准化了机器学习算法,使得学习任务更容易将多个算法合并成单一的管道或数据工作流,方便数据科学家使用。Spark ML 使用了 DataFrame 和 Datasets 的概念,这些概念在 Spark 1.6 中作为实验性功能被引入,随后在 Spark 2.0+ 中得到了应用。
在 Scala 和 Java 中,DataFrame 和 Dataset 已经统一,即 DataFrame 只是行数据集的类型别名。在 Python 和 R 中,由于缺乏类型安全,DataFrame 是主要的编程接口。
数据集包含多种数据类型,比如存储文本、特征向量和数据的真实标签的列。除此之外,Spark ML 还使用转换器将一个 DataFrame 转换为另一个,反之亦然,其中估算器的概念用于在 DataFrame 上进行拟合,产生新的转换器。另一方面,管道 API 可以将多个转换器和估算器组合在一起,指定 ML 数据工作流。在开发 ML 应用时,引入了参数的概念,以便将所有转换器和估算器在同一个 API 下共享。
Spark MLlib 还是 Spark ML?
Spark ML 提供了一个基于 DataFrame 的高级 API,用于构建 ML 管道。基本上,Spark ML 提供了一套工具集,帮助你在数据上构建不同的机器学习相关转换管道。例如,它可以轻松地将特征提取、降维和分类器训练链式组合成一个模型,整体上可以用于分类任务。然而,MLlib 是一个较旧的库,开发时间较长,因此它拥有更多的功能。因此,推荐使用 Spark ML,因为它的 API 在与 DataFrame 配合时更加灵活且功能多样。
特征提取与转换
假设你要构建一个机器学习模型,用于预测信用卡交易是否为欺诈交易。根据现有的背景知识和数据分析,你可能会决定哪些数据字段(即特征)对于训练你的模型是重要的。例如,金额、客户姓名、购买公司名称以及信用卡持有者的地址都值得提供用于整个学习过程。这些需要考虑,因为如果你只提供一个随机生成的交易 ID,它将不包含任何信息,因此完全没有用。因此,一旦你决定了训练集需要包含哪些特征,你就需要对这些特征进行转换,以便更好地训练模型。特征转换有助于你向训练数据中添加更多的背景信息。这些信息最终使机器学习模型能够从中受益。为了让上述讨论更加具体,假设你有以下客户地址字符串:
"123 Main Street, Seattle, WA 98101"
如果你看到上面的地址,发现这个地址缺乏适当的语义。换句话说,这个字符串的表达能力有限。这个地址只对学习与该精确地址相关的地址模式有用,例如,在数据库中进行学习时。但将其拆解为基本部分可以提供更多特征,例如:
-
“Address”(123 Main Street)
-
“City”(Seattle)
-
“State”(WA)
-
“Zip”(98101)
如果你看到上面的模式,机器学习算法现在可以将更多不同的交易归为一类,并发现更广泛的模式。这是正常现象,因为一些客户的邮政编码比其他人更容易发生欺诈活动。Spark 提供了若干实现的算法用于特征提取,并使转换更加简便。例如,当前版本提供了以下特征提取算法:
-
TF-IDF
-
Word2vec
-
CountVectorizer
另一方面,特征转换器是一个抽象,包含了特征转换器和学习模型。技术上,转换器实现了一个名为transform()的方法,该方法将一个 DataFrame 转换为另一个,通常通过附加一个或多个列。Spark 支持以下转换器到 RDD 或 DataFrame:
-
Tokenizer
-
StopWordsRemover
-
n-gram
-
Binarizer
-
PCA
-
PolynomialExpansion
-
离散余弦变换(DCT)
-
StringIndexer
-
IndexToString
-
OneHotEncoder
-
VectorIndexer
-
Interaction
-
Normalizer
-
StandardScaler
-
MinMaxScaler
-
MaxAbsScaler
-
Bucketizer
-
ElementwiseProduct
-
SQLTransformer
-
VectorAssembler
-
QuantileDiscretizer
由于页面限制,我们无法描述所有内容。但我们将讨论一些广泛使用的算法,例如CountVectorizer、Tokenizer、StringIndexer、StopWordsRemover、OneHotEncoder等。常用于降维的 PCA 将在下一节中讨论。
CountVectorizer
CountVectorizer 和 CountVectorizerModel 旨在帮助将一组文本文档转换为标记计数的向量。当先前的字典不可用时,可以使用 CountVectorizer 作为估计器来提取词汇,并生成 CountVectorizerModel。该模型为文档在词汇表上的表示生成稀疏表示,随后可以传递给其他算法,如 LDA。
假设我们有如下的文本语料库:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00296.gif图 7:仅包含名称的文本语料库
现在,如果我们想将前面的文本集合转换为标记计数的向量,Spark 提供了 CountVectorizer() API 来实现这一点。首先,让我们为之前的表格创建一个简单的 DataFrame,如下所示:
val df = spark.createDataFrame(
Seq((0, Array("Jason", "David")),
(1, Array("David", "Martin")),
(2, Array("Martin", "Jason")),
(3, Array("Jason", "Daiel")),
(4, Array("Daiel", "Martin")),
(5, Array("Moahmed", "Jason")),
(6, Array("David", "David")),
(7, Array("Jason", "Martin")))).toDF("id", "name")
df.show(false)
在许多情况下,你可以使用 setInputCol 设置输入列。让我们来看一个例子,并从语料库拟合一个 CountVectorizerModel 对象,如下所示:
val cvModel: CountVectorizerModel = new CountVectorizer()
.setInputCol("name")
.setOutputCol("features")
.setVocabSize(3)
.setMinDF(2)
.fit(df)
现在让我们使用提取器下游处理向量化,如下所示:
val feature = cvModel.transform(df)
spark.stop()
现在让我们检查一下,确保它正常工作:
feature.show(false)
上面的代码行生成了以下输出:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00306.gif图 8:名称文本语料库已经被特征化
现在让我们进入特征转换器部分。最重要的转换器之一是分词器,它在机器学习任务中经常用于处理分类数据。我们将在下一节中看到如何使用这个转换器。
分词器
分词是从原始文本中提取重要成分的过程,如单词和句子,并将原始文本拆分为单个术语(也称为单词)。如果你希望对常规表达式匹配进行更高级的分词,RegexTokenizer 是一个很好的选择。默认情况下,pattern(正则表达式,默认值:s+)作为分隔符来拆分输入文本。否则,你还可以将参数 gaps 设置为 false,表示正则表达式 pattern 表示 tokens 而不是拆分空隙。这样,你可以找到所有匹配的出现,作为分词的结果。
假设你有以下句子:
-
分词(Tokenization)是从原始文本中提取单词的过程。
-
如果你希望进行更高级的分词,
RegexTokenizer是一个不错的选择。 -
这里将提供一个示例,演示如何对句子进行分词。
-
这样,你可以找到所有匹配的出现。
现在,你想要对前面四个句子中的每个有意义的单词进行分词。让我们从之前的句子中创建一个 DataFrame,如下所示:
val sentence = spark.createDataFrame(Seq(
(0, "Tokenization,is the process of enchanting words,from the raw text"),
(1, " If you want,to have more advance tokenization,RegexTokenizer,
is a good option"),
(2, " Here,will provide a sample example on how to tockenize sentences"),
(3, "This way,you can find all matching o***urrences"))).toDF("id",
"sentence")
现在让我们通过实例化 Tokenizer() API 创建一个分词器,如下所示:
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
现在,使用 UDF 计算每个句子中标记的数量,如下所示:import org.apache.spark.sql.functions._
val countTokens = udf { (words: Seq[String]) => words.length }
现在分词每个句子中的单词,如下所示:
val tokenized = tokenizer.transform(sentence)
最后,显示每个标记与每个原始句子的对应关系,如下所示:
tokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words")))
.show(false)
上面的代码行打印出来自标记化 DataFrame 的快照,其中包含原始句子、单词袋和标记数量:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00315.gif图 9:从原始文本中分词后的单词
然而,如果您使用RegexTokenizer API,将获得更好的结果。过程如下:
通过实例化RegexTokenizer() API 来创建一个正则表达式分词器:
val regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W+")
.setGaps(true)
现在,按照以下方式对每个句子中的单词进行分词:
val regexTokenized = regexTokenizer.transform(sentence)
regexTokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words")))
.show(false)
上述代码行通过 RegexTokenizer 打印出经过分词的 DataFrame 快照,包含原始句子、词袋以及词汇数量:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00318.gif图 10:使用 RegexTokenizer 进行更好的分词
StopWordsRemover
停用词是指应该从输入中排除的词,通常是因为这些词出现频率较高且含义不大。Spark 的StopWordsRemover接受一个由Tokenizer或RegexTokenizer分词后的字符串序列作为输入,然后从输入序列中去除所有停用词。停用词列表由stopWords参数指定。目前StopWordsRemover API 的实现支持丹麦语、荷兰语、芬兰语、法语、德语、匈牙利语、意大利语、挪威语、葡萄牙语、俄语、西班牙语、瑞典语、土耳其语和英语等语言。为了提供一个示例,我们可以简单地扩展前一节中的Tokenizer示例,因为它们已经被分词。然而,在这个示例中,我们将使用RegexTokenizer API。
首先,通过StopWordsRemover() API 创建一个停用词移除器实例,如下所示:
val remover = new StopWordsRemover()
.setInputCol("words")
.setOutputCol("filtered")
现在,让我们去除所有停用词并打印结果,如下所示:
val newDF = remover.transform(regexTokenized)
newDF.select("id", "filtered").show(false)
上述代码行打印出去除停用词后的过滤 DataFrame 快照:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00324.gif图 11:过滤后的(即没有停用词的)分词
StringIndexer
StringIndexer 将标签的字符串列编码为标签索引列。索引值位于0, numLabels)区间内,按标签频率排序,因此最常见的标签将获得索引 0。如果输入列是数值型,我们将其转换为字符串并对字符串值进行索引。当下游管道组件(如估算器或转换器)使用该字符串索引标签时,必须将组件的输入列设置为此字符串索引列的名称。在许多情况下,您可以通过setInputCol设置输入列。假设您有以下格式的分类数据:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00381.gif图 13:使用 StringIndexer 创建标签
另一个重要的转换器是 OneHotEncoder,它在处理类别数据的机器学习任务中经常被使用。我们将在下一节中学习如何使用这个转换器。
OneHotEncoder
独热编码将标签索引列映射到一个二进制向量列,最多只有一个值。这个编码方式使得像逻辑回归这样的算法能够使用类别特征,尽管它们期望的是连续特征。假设你有一些类别数据,格式如下(与我们在上一节中描述 StringIndexer 使用的格式相同):
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00253.gif图 14: 应用 OneHotEncoder 的 DataFrame
现在,我们想要对名字列进行索引,以便数据集中最频繁的名字(在我们的例子中是Jason)得到索引0。但是,仅仅进行索引有什么用呢?换句话说,你可以进一步将它们向量化,然后你可以轻松地将 DataFrame 输入任何机器学习模型。由于我们已经在上一节中学习了如何创建 DataFrame,接下来我们将展示如何将它们编码成向量:
val indexer = new StringIndexer()
.setInputCol("name")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
val encoder = new OneHotEncoder()
.setInputCol("categoryIndex")
.setOutputCol("categoryVec")
现在,让我们使用 Transformer 将其转换为向量,并查看内容,如下所示:
val encoded = encoder.transform(indexed)
encoded.show()
结果 DataFrame 中包含的快照如下:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00228.gif图 15: 使用 OneHotEncoder 创建类别索引和向量
现在,你可以看到,在结果 DataFrame 中已添加一个包含特征向量的新列。
Spark ML 管道
MLlib 的目标是使实用的机器学习(ML)具有可扩展性并且易于使用。Spark 引入了管道 API,用于轻松创建和调优实用的机器学习管道。正如前面所讨论的,通过特征工程在机器学习管道创建过程中提取有意义的知识,涉及数据收集、预处理、特征提取、特征选择、模型拟合、验证和模型评估等一系列阶段。例如,文本分类可能涉及文本分割和清洗、提取特征以及通过交叉验证调优的分类模型训练。大多数机器学习库并不设计用于分布式计算,或者它们不提供原生支持管道创建和调优。
数据集抽象
当从其他编程语言(例如 Java)运行 SQL 查询时,结果以 DataFrame 的形式返回。DataFrame 是一种分布式数据集合,组织成命名的列。另一方面,数据集(dataset)是一个接口,旨在提供 Spark SQL 中 RDD 的优势。数据集可以从一些 JVM 对象构建,比如基本类型(例如 String、Integer 和 Long)、Scala 案例类和 Java Beans。机器学习管道包含多个数据集转换和模型的序列。每个转换都接受一个输入数据集,并输出转换后的数据集,后者成为下一个阶段的输入。因此,数据导入和导出是机器学习管道的起点和终点。为了简化这些过程,Spark MLlib 和 Spark ML 提供了多种应用特定类型的数据集、DataFrame、RDD 和模型的导入导出工具,包括:
-
用于分类和回归的 LabeledPoint
-
用于交叉验证和潜在狄利克雷分配(LDA)的 LabeledDocument
-
协同过滤的评分与排名
然而,实际数据集通常包含多种类型,如用户 ID、物品 ID、标签、时间戳和原始记录。不幸的是,当前 Spark 实现的工具不能轻松处理包含这些类型的数据集,特别是时间序列数据集。特征转换通常构成实际机器学习管道的主要部分。特征转换可以视为从现有列创建新列,并将其附加或删除。
在下图中,你将看到文本标记器将文档拆分成一个词袋。之后,TF-IDF 算法将词袋转换成特征向量。在转换过程中,标签需要保留以供模型拟合阶段使用:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00137.jpeg图 16:机器学习模型的文本处理(DS 表示数据源)
在这里,ID、文本和单词在转换步骤中被包含。这些在做出预测和模型检查时很有用。然而,它们在模型拟合时实际上是多余的。如果预测数据集仅包含预测标签,它们也不会提供太多信息。因此,如果你想检查预测指标,如准确率、精确度、召回率、加权真正例和加权假正例,查看预测标签以及原始输入文本和标记化单词是非常有帮助的。这个建议同样适用于使用 Spark ML 和 Spark MLlib 的其他机器学习应用。
因此,RDD、数据集和 DataFrame 之间的轻松转换已使得内存、磁盘或外部数据源(如 Hive 和 Avro)成为可能。虽然通过用户定义的函数创建新列非常简单,但数据集的实现是一个惰性操作。相比之下,数据集只支持一些标准数据类型。然而,为了增加可用性,并使其更好地适应机器学习模型,Spark 还增加了对Vector类型的支持,作为一种支持稠密和稀疏特征向量的用户定义类型,位于mllib.linalg.DenseVector和mllib.linalg.Vector下。
完整的 DataFrame、数据集和 RDD 的 Java、Scala 和 Python 示例可以在 Spark 分发包中的examples/src/main/文件夹中找到。有兴趣的读者可以参考 Spark SQL 的用户指南,spark.apache.org/docs/latest/sql-programming-guide.html,以了解更多关于 DataFrame、数据集及其支持的操作。
创建一个简单的管道
Spark 提供了 Spark ML 下的管道 API。管道由一系列阶段组成,这些阶段包括转换器和估算器。管道阶段有两种基本类型,分别是转换器和估算器:
-
转换器将数据集作为输入,生成增强后的数据集作为输出,以便将输出传递到下一步。例如,Tokenizer和HashingTF就是两个转换器。Tokenizer 将包含文本的数据集转换为包含分词的数据集。另一方面,HashingTF 则生成词频。分词和 HashingTF 的概念常用于文本挖掘和文本分析。
-
相反,估算器必须是输入数据集中的第一个,以生成模型。在这种情况下,模型本身将作为转换器,用于将输入数据集转换为增强的输出数据集。例如,逻辑回归或线性回归可以作为估算器,在用相应的标签和特征拟合训练数据集后使用。
之后,它会生成一个逻辑回归或线性回归模型,这意味着开发一个管道非常简单。实际上,你只需要声明所需的阶段,然后配置相关阶段的参数;最后,将它们链接成一个管道对象,如下图所示:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00374.jpeg图 17:使用逻辑回归估算器的 Spark ML 管道模型(DS 表示数据存储,虚线内的步骤仅在管道拟合期间发生)
如果你查看图 17,拟合的模型包含一个分词器、一个 HashingTF 特征提取器和一个拟合的逻辑回归模型。拟合的管道模型充当一个转换器,可以用于预测、模型验证、模型检查,最终是模型部署。然而,为了提高预测准确度,模型本身需要进行调优。
现在我们已经了解了 Spark MLlib 和 ML 中可用的算法,是时候在正式使用它们解决有监督和无监督学习问题之前做好准备了。在下一节中,我们将开始进行特征提取和转换。
无监督机器学习
在本节中,为了使讨论更具体,我们将只讨论使用 PCA 进行的降维和用于主题建模的 LDA 文本聚类。其他无监督学习的算法将在第十三章中讨论,我的名字是贝叶斯,朴素贝叶斯,并附带一些实际例子。
降维
降维是减少考虑变量数量的过程。它可以用来从原始和噪声特征中提取潜在特征,或者在保持结构的同时压缩数据。Spark MLlib 支持在RowMatrix类上进行降维。最常用的降维算法是 PCA 和 SVD。然而,在本节中,为了使讨论更具体,我们将仅讨论 PCA。
PCA
PCA 是一种统计程序,使用正交变换将可能相关的变量观察值转换为一组线性无关的变量,称为主成分。PCA 算法可以用来将向量投影到低维空间。然后,可以基于减少后的特征向量来训练一个机器学习模型。以下示例展示了如何将 6 维特征向量投影到四维主成分。假设你有一个如下的特征向量:
val data = Array(
Vectors.dense(3.5, 2.0, 5.0, 6.3, 5.60, 2.4),
Vectors.dense(4.40, 0.10, 3.0, 9.0, 7.0, 8.75),
Vectors.dense(3.20, 2.40, 0.0, 6.0, 7.4, 3.34) )
现在,让我们从中创建一个 DataFrame,如下所示:
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
df.show(false)
上述代码生成了一个特征 DataFrame,包含 PCA 的 6 维特征向量:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00291.gif图 18:为 PCA 创建特征 DataFrame(6 维特征向量)
现在,让我们通过设置必要的参数来实例化 PCA 模型,具体如下:
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(4)
.fit(df)
现在,为了做出区分,我们使用setOutputCol()方法将输出列设置为pcaFeatures。然后,我们设置 PCA 的维度。最后,我们拟合 DataFrame 以进行转换。请注意,PCA 模型包括一个explainedVariance成员。一个模型可以从这样的旧数据中加载,但其explainedVariance将为空向量。现在让我们展示结果特征:
val result = pca.transform(df).select("pcaFeatures")
result.show(false)
上述代码生成了一个特征 DataFrame,包含 4 维主成分特征向量,使用 PCA 方法:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00384.gif图 19:四维主成分(PCA 特征)
使用 PCA
PCA 广泛用于降维,它是一种统计方法,帮助找出旋转矩阵。例如,如果我们想检查第一个坐标是否具有最大的方差。同时,它还帮助检查是否有任何后续坐标会使方差最大化。
最终,PCA 模型计算出这些参数,并将其作为旋转矩阵返回。旋转矩阵的列称为主成分。Spark MLlib 支持针对存储在行导向格式中的高且细的矩阵和任何向量进行 PCA 处理。
回归分析 - PCA 的实际应用
在本节中,我们将首先探索将用于回归分析的MSD(百万歌曲数据集)。接着,我们将展示如何使用 PCA 来降低数据集的维度。最后,我们将评估线性回归模型的回归质量。
数据集收集与探索
在本节中,我们将介绍非常著名的 MNIST 数据集。这个数据集将贯穿整个章节。手写数字的 MNIST 数据库(可以从 www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass.html 下载)有 60,000 个训练样本和 10,000 个测试样本。它是 NIST 提供的一个更大数据集的子集。这些数字经过大小归一化并且居中显示在固定大小的图像中。因此,这是一个非常好的示例数据集,适用于那些尝试在实际数据上学习技术和模式识别方法的人,同时最大限度地减少预处理和格式化工作。NIST 提供的原始黑白(双级)图像被大小归一化,以适应一个 20 x 20 像素的框,并保持其纵横比。
MNIST 数据库是从 NIST 的特殊数据库 3 和特殊数据库 1 中构建的,这些数据库包含手写数字的二进制图像。以下是该数据集的一个样本:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00121.gif图 20:MNIST 数据集的快照
你可以看到总共有 780 个特征。因此,有时许多机器学习算法会因为数据集的高维特性而失败。因此,为了解决这个问题,在接下来的部分中,我们将展示如何在不牺牲机器学习任务(如分类)质量的情况下减少数据的维度。然而,在深入解决这个问题之前,我们先来了解一下回归分析的背景知识。
什么是回归分析?
线性回归属于回归算法家族。回归的目标是找到变量之间的关系和依赖关系。它建模了一个连续标量因变量 y(在机器学习术语中,也称为标签或目标变量)与一个或多个(D 维向量)解释变量(也称为自变量、输入变量、特征、观察数据、观测值、属性、维度、数据点等)之间的关系,使用的是线性函数。在回归分析中,目标是预测一个连续的目标变量,如下图所示:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00171.jpeg
图 21:回归算法的目的是产生连续的输出。输入可以是以下任何形式
离散或连续(来源:Nishant Shukla,《使用 TensorFlow 的机器学习》,Manning Publications 公司,2017 年)
现在,你可能会对分类问题和回归问题之间的基本区别感到困惑。以下信息框将为你澄清:
回归与分类: 另一方面,另一个领域叫做分类,涉及的是从有限的集合中预测一个标签,但其值是离散的。了解这个区分很重要,因为离散值输出由分类处理得更好,这将在接下来的章节中讨论。
涉及输入变量的线性组合的多重回归模型呈现以下形式:
y = ss[0] + ss[1]x[1] + ss[2]x[2] + ss[3]x[3] +… + e
图 22 显示了一个简单线性回归的示例,只有一个自变量(x 轴)。该模型(红线)是通过训练数据(蓝点)计算得到的,每个点都有已知的标签(y 轴),通过最小化所选损失函数的值来尽可能精确地拟合这些点。然后,我们可以使用该模型来预测未知的标签(我们只知道 x 值,并希望预测 y 值)。
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00383.jpeg
图 22:回归图将数据点分离开来(图中的点 [.] 代表数据点,红线代表回归)
Spark 提供了一个基于 RDD 的线性回归算法实现。你可以使用随机梯度下降法训练一个没有正则化的线性回归模型。这解决了最小二乘回归公式 f (weights) = 1/n ||A weights-y||²(即均方误差)。在这里,数据矩阵有 n 行,输入 RDD 包含 A 的行集,每行有其对应的右侧标签 y。更多信息,请参见 github.***/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala。
步骤 1. 加载数据集并创建 RDD
在 LIBSVM 格式中加载 MNIST 数据集时,我们使用了 Spark MLlib 中内置的 API,名为 MLUtils:
val data = MLUtils.loadLibSVMFile(spark.sparkContext, "data/mnist.bz2")
步骤 2. 计算特征数量以简化降维:
val featureSize = data.first().features.size
println("Feature Size: " + featureSize)
这将导致以下输出:
Feature Size: 780
所以数据集有 780 列 —— 即特征,因此可以认为这是一个高维数据集(特征)。因此,有时值得减少数据集的维度。
步骤 3. 现在按照以下方式准备训练集和测试集:
实际上,我们会训练 LinearRegressionwithSGD 模型两次。首先,我们将使用具有原始特征维度的正常数据集;其次,使用一半的特征。对于原始数据,训练和测试集的准备步骤如下:
val splits = data.randomSplit(Array(0.75, 0.25), seed = 12345L)
val (training, test) = (splits(0), splits(1))
现在,对于降维后的特征,训练步骤如下:
val pca = new PCA(featureSize/2).fit(data.map(_.features))
val training_pca = training.map(p => p.copy(features = pca.transform(p.features)))
val test_pca = test.map(p => p.copy(features = pca.transform(p.features)))
步骤 4. 训练线性回归模型
现在,分别对正常特征和降维特征进行 20 次迭代,并训练 LinearRegressionWithSGD,步骤如下:
val numIterations = 20
val stepSize = 0.0001
val model = LinearRegressionWithSGD.train(training, numIterations)
val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations)
小心!有时候,LinearRegressionWithSGD() 会返回 NaN。我认为导致这种情况的原因有两个:
-
如果
stepSize很大,在这种情况下,你应该使用更小的值,比如 0.0001、0.001、0.01、0.03、0.1、0.3、1.0 等等。 -
你的训练数据中有
NaN。如果是这样,结果很可能是NaN。因此,建议在训练模型之前先去除空值。
步骤 5. 评估两个模型
在我们评估分类模型之前,首先,让我们准备计算正常情况下的均方误差(MSE),以观察降维对原始预测的影响。显然,如果你想要一种正式的方式来量化模型的准确性,并可能提高精度、避免过拟合,你可以通过残差分析来实现。另外,分析用于模型构建和评估的训练集和测试集的选择也是值得的。最后,选择技术有助于描述模型的各种属性:
val valuesAndPreds = test.map { point =>
val score = model.predict(point.features)
(score, point.label)
}
现在按照以下方式计算 PCA 预测集:
val valuesAndPreds_pca = test_pca.map { point =>
val score = model_pca.predict(point.features)
(score, point.label)
}
现在计算每种情况的 MSE 并打印出来:
val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p 2) }.mean()
val MSE_pca = valuesAndPreds_pca.map { case (v, p) => math.pow(v - p, 2) }.mean()
println("Mean Squared Error = " + MSE)
println("PCA Mean Squared Error = " + MSE_pca)
你将得到以下输出:
Mean Squared Error = 2.9164359135973043E78
PCA Mean Squared Error = 2.9156682256149184E78
请注意,MSE 实际上是使用以下公式计算的:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00238.gif
步骤 6. 观察两个模型的系数
按照以下方式计算模型系数:
println("Model coefficients:"+ model.toString())
println("Model with PCA coefficients:"+ model_pca.toString())
现在你应该在终端/控制台上观察到以下输出:
Model coefficients: intercept = 0.0, numFeatures = 780
Model with PCA coefficients: intercept = 0.0, numFeatures = 390
二元和多类分类
二元分类器用于将给定数据集的元素分为两个可能的组(例如,欺诈或非欺诈),它是多类分类的特例。大多数二元分类指标可以推广到多类分类指标。多类分类描述了一种分类问题,其中每个数据点有 M>2 个可能标签(当 M=2 时就是二元分类问题)。
对于多分类指标,正例和负例的概念稍有不同。预测和标签仍然可以是正例或负例,但必须考虑到特定类别的上下文。每个标签和预测都属于多个类别中的一个,因此它们被认为对于其特定类别是正例,对于其他所有类别是负例。因此,当预测与标签匹配时,就发生了真正的正例,而当预测和标签都不属于某个给定类别时,就发生了真正的负例。根据这一惯例,给定数据样本可能会有多个真正的负例。假设正例和负例的定义从先前的正负标签扩展是很直接的。
性能指标
虽然有许多不同类型的分类算法,但评估指标在原则上大致相似。在监督分类问题中,对于每个数据点,都存在一个真实的输出和一个模型生成的预测输出。因此,每个数据点的结果可以分为以下四类:
-
真正例(TP):标签为正例,预测也是正例。
-
真负例(TN):标签为负例,预测也是负例。
-
假正例(FP):标签为负例,但预测为正例。
-
假负例(FN):标签为正例,但预测为负例。
现在,为了更清楚地了解这些参数,请参阅下图:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00181.jpeg图 23:预测分类器(即混淆矩阵)
TP、FP、TN、FN 是大多数分类器评估指标的基础。考虑分类器评估时,一个基本的观点是,纯粹的准确度(即预测是否正确)通常不是一个好的评估指标。原因在于,数据集可能高度不平衡。例如,如果一个模型用于预测欺诈数据,而数据集中 95%的数据点不是欺诈,只有 5%是欺诈。那么假设一个简单的分类器总是预测不是欺诈(不管输入是什么),其准确率将达到 95%。因此,通常使用像精确度和召回率这样的指标,因为它们考虑了错误的类型。在大多数应用中,精确度和召回率之间存在某种理想的平衡,这种平衡可以通过将二者结合成一个单一的指标,称为F-measure来捕捉。
精确度表示被分类为正类的样本中有多少是相关的。另一方面,召回率表示测试在检测正类样本方面的效果如何?在二分类中,召回率也叫敏感度。需要注意的是,精确度可能不会随着召回率的提高而降低。召回率与精确度之间的关系可以在图表的阶梯区域中观察到:
-
接收操作特征(ROC)
-
ROC 曲线下面积
-
精确度-召回率曲线下面积
这些曲线通常用于二元分类,以研究分类器的输出。然而,有时将精确率和召回率结合起来选择两个模型会更好。与多数评估指标一起使用精确率和召回率使得比较算法更为困难。假设你有两个如下表现的算法:
| 分类器 | 精确率 | 召回率 |
|---|---|---|
| X | 96% | 89% |
| Y | 99% | 84% |
在这里,没有明显优越的分类器,因此它不会立即指导你选择最佳的那一个。但使用 F1 分数,这是一个结合了精确率和召回率的度量(即精确率和召回率的调和平均数),可以平衡 F1 分数。让我们计算一下,并将其放入表格中:
| 分类器 | 精确率 | 召回率 | F1 分数 |
|---|---|---|---|
| X | 96% | 89% | 92.36% |
| Y | 99% | 84% | 90.885% |
因此,拥有 F1 分数有助于在大量分类器中进行选择。它为所有分类器提供了清晰的偏好排名,从而为进展提供了明确的方向,即分类器X。
对于二元分类,可以计算如下性能指标:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00057.jpeg图 24:计算二元分类器性能指标的数学公式(来源:spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html)
然而,在多类分类问题中,关联超过两个预测标签时,计算前述指标更为复杂,但可以使用以下数学方程计算:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00169.jpeg图 25:计算多类分类器性能指标的数学公式
其中δ^(x)称为修改的 delta 函数,并可定义如下(来源:spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html):
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00026.jpeg
使用逻辑回归进行二元分类
逻辑回归广泛用于预测二元响应。这是一种线性方法,可以数学表示如下:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00160.jpeg
在上述方程中,L(w; x, y) 是称为逻辑损失的损失函数。
对于二元分类问题,算法将输出一个二元逻辑回归模型。给定一个新数据点,表示为x,模型通过应用逻辑函数进行预测:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00233.jpeg
其中z = wTx*,默认情况下,如果*f(wTx)>0.5,则结果为正,否则为负,尽管与线性 SVM 不同,逻辑回归模型的原始输出f(z) 具有概率解释(即x 为正的概率)。
线性支持向量机(Linear SVM) 是一种全新的、极其快速的机器学习(数据挖掘)算法,专门用于解决来自超大数据集的多类别分类问题,它实现了一个原始的专有裁切平面算法,用于设计线性支持向量机(来源:www.linearsvm.***/)。
使用 Spark ML 的逻辑回归进行乳腺癌预测
在本节中,我们将学习如何使用 Spark ML 开发乳腺癌诊断流水线。我们将使用一个真实的数据集来预测乳腺癌的概率。更具体地说,将使用威斯康星乳腺癌数据集。
数据集收集
在这里,我们使用了更简单的、结构化并手动整理的数据集,适用于机器学习应用开发,当然,其中许多数据集显示出良好的分类精度。来自 UCI 机器学习库的威斯康星乳腺癌数据集([archive.ics.uci.edu/ml/datasets/Breast+Cancer+Wisconsin+(Original)](https://archive.ics.uci.edu/ml/datasets/Breast+Cancer+Wisconsin+(Original))包含了由威斯康星大学的研究人员捐赠的数据,数据包括来自乳腺肿块的细针穿刺数字化图像的测量值。数值代表了数字图像中描述的细胞核特征,详情见下文:
0\. Sample code number id number
1\. Clump Thickness 1 - 10
2\. Uniformity of Cell Size 1 - 10
3\. Uniformity of Cell Shape 1 - 10
4\. Marginal Adhesion 1 - 10
5\. Single Epithelial Cell Size 1 - 10
6\. Bare Nuclei 1 - 10
7\. Bland Chromatin 1 - 10
8\. Normal Nucleoli 1 - 10
9\. Mitoses 1 - 10
10\. Class: (2 for benign, 4 for malignant)
要了解更多关于威斯康星乳腺癌数据集的信息,请参考作者的出版物:乳腺肿瘤诊断的核特征提取,IS&T/SPIE 1993 国际电子成像科学与技术研讨会,卷 1905,第 861-870 页,由 W.N. Street、W.H. Wolberg 和 O.L. Mangasarian 编写,1993 年。
使用 Spark ML 开发流水线
现在我们将展示如何通过逐步示例预测乳腺癌的可能性:
步骤 1:加载并解析数据
val rdd = spark.sparkContext.textFile("data/wbcd.csv")
val cancerRDD = parseRDD(rdd).map(parseCancer)
parseRDD() 方法如下所示:
def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
rdd.map(_.split(",")).filter(_(6) != "?").map(_.drop(1)).map(_.map(_.toDouble))
}
parseCancer() 方法如下所示:
def parseCancer(line: Array[Double]): Cancer = {
Cancer(if (line(9) == 4.0) 1 else 0, line(0), line(1), line(2), line(3), line(4), line(5), line(6), line(7), line(8))
}
请注意,这里我们简化了数据集。对于值为 4.0 的数据,我们将其转换为 1.0,其余为 0.0。Cancer 类是一个可以如下定义的案例类:
case class Cancer(cancer_class: Double, thickness: Double, size: Double, shape: Double, madh: Double, epsize: Double, bnuc: Double, bchrom: Double, nNuc: Double, mit: Double)
步骤 2:将 RDD 转换为 DataFrame 以便用于 ML 流水线
import spark.sqlContext.implicits._
val cancerDF = cancerRDD.toDF().cache()
cancerDF.show()
DataFrame 看起来如下所示:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00334.gif图 26: 乳腺癌数据集快照
步骤 3:特征提取与转换
首先,让我们选择特征列,如下所示:
val featureCols = Array("thickness", "size", "shape", "madh", "epsize", "bnuc", "bchrom", "nNuc", "mit")
现在让我们将它们组装成一个特征向量,如下所示:
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
现在将它们转换为 DataFrame,如下所示:
val df2 = assembler.transform(cancerDF)
让我们查看转换后的 DataFrame 结构:
df2.show()
现在,你应该能看到一个包含基于左侧列计算的特征的 DataFrame:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00361.gif图 27: 新的包含特征的 DataFrame
最后,让我们使用 StringIndexer 来创建训练数据集的标签,如下所示:
val labelIndexer = new StringIndexer().setInputCol("cancer_class").setOutputCol("label")
val df3 = labelIndexer.fit(df2).transform(df2)
df3.show()
现在你应该能看到一个 DataFrame,包含基于左侧列计算的特征和标签:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00204.gif图 28: 包含特征和标签的新 DataFrame 用于训练 ML 模型
步骤 4:创建测试集和训练集
val splitSeed = 1234567
val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)
步骤 5:使用训练集创建估算器
让我们使用 elasti***etParam 创建一个用于管道的逻辑回归估算器。我们还指定了最大迭代次数和回归参数,如下所示:
val lr = new LogisticRegression().setMaxIter(50).setRegParam(0.01).setElasti***etParam(0.01)
val model = lr.fit(trainingData)
步骤 6:获取测试集的原始预测、概率和预测
使用测试集转化模型,获取原始预测、概率和测试集预测:
val predictions = model.transform(testData)
predictions.show()
生成的 DataFrame 如下所示:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00159.gif图 29: 包含原始预测和每行实际预测的新 DataFrame
步骤 7:生成训练过程中的目标历史记录
让我们生成模型在每次迭代中的目标历史记录,如下所示:
val trainingSummary = model.summary
val objectiveHistory = trainingSummary.objectiveHistory
objectiveHistory.foreach(loss => println(loss))
上述代码段产生以下关于训练损失的输出:
0.6562291876496595
0.6087867761081431
0.538972588904556
0.4928455913405332
0.46269258074999386
0.3527914819973198
0.20206901337404978
0.16459454874996993
0.13783437051276512
0.11478053164710095
0.11420433621438157
0.11138884788059378
0.11041889032338036
0.10849477236373875
0.10818880537879513
0.10682868640074723
0.10641395229253267
0.10555411704574749
0.10505186414044905
0.10470425580130915
0.10376219754747162
0.10331139609033112
0.10276173290225406
0.10245982201904923
0.10198833366394071
0.10168248313103552
0.10163242551955443
0.10162826209311404
0.10162119367292953
0.10161235376791203
0.1016114803209495
0.10161090505556039
0.1016107261254795
0.10161056082112738
0.10161050381332608
0.10161048515341387
0.10161043900301985
0.10161042057436288
0.10161040971267737
0.10161040846923354
0.10161040625542347
0.10161040595207525
0.10161040575664354
0.10161040565870835
0.10161040519559975
0.10161040489834573
0.10161040445215266
0.1016104043469577
0.1016104042793553
0.1016104042606048
0.10161040423579716
如你所见,损失在后续迭代中逐渐减小。
步骤 8:评估模型
首先,我们必须确保我们使用的分类器来自二元逻辑回归总结:
val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]
现在让我们获得 ROC 曲线作为 DataFrame 和 areaUnderROC。接近 1.0 的值更好:
val roc = binarySummary.roc
roc.show()
println("Area Under ROC: " + binarySummary.areaUnderROC)
上述代码打印出 areaUnderROC 的值,如下所示:
Area Under ROC: 0.9959095884623509
这太棒了!现在让我们计算其他度量标准,如真正率、假正率、假负率、总计数,以及正确和错误预测的实例数量,如下所示:
import org.apache.spark.sql.functions._
// Calculate the performance metrics
val lp = predictions.select("label", "prediction")
val counttotal = predictions.count()
val correct = lp.filter($"label" === $"prediction").count()
val wrong = lp.filter(not($"label" === $"prediction")).count()
val truep = lp.filter($"prediction" === 0.0).filter($"label" === $"prediction").count()
val falseN = lp.filter($"prediction" === 0.0).filter(not($"label" === $"prediction")).count()
val falseP = lp.filter($"prediction" === 1.0).filter(not($"label" === $"prediction")).count()
val ratioWrong = wrong.toDouble / counttotal.toDouble
val ratioCorrect = correct.toDouble / counttotal.toDouble
println("Total Count: " + counttotal)
println("Correctly Predicted: " + correct)
println("Wrongly Identified: " + wrong)
println("True Positive: " + truep)
println("False Negative: " + falseN)
println("False Positive: " + falseP)
println("ratioWrong: " + ratioWrong)
println("ratioCorrect: " + ratioCorrect)
现在你应该能看到上面代码的输出,如下所示:
总计数:209 正确预测:202 错误识别:7 真正例:140 假负例:4 假正例:3 错误比率:0.03349282296650718 正确比率:0.9665071770334929
最后,让我们判断模型的准确性。不过,在此之前,我们需要设置模型阈值以最大化 fMeasure:
val fMeasure = binarySummary.fMeasureByThreshold
val fm = fMeasure.col("F-Measure")
val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure).select("threshold").head().getDouble(0)
model.setThreshold(bestThreshold)
现在让我们计算准确率,如下所示:
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label")
val a***uracy = evaluator.evaluate(predictions)
println("A***uracy: " + a***uracy)
上述代码产生了以下输出,几乎为 99.64%:
A***uracy: 0.9963975418520874
使用逻辑回归进行多分类
二元逻辑回归可以推广到多项式逻辑回归,用于训练和预测多分类问题。例如,对于 K 种可能的结果,可以选择其中一个结果作为枢轴,其他 K−1 个结果可以分别与枢轴结果进行回归。在 spark.mllib 中,第一个类 0 被选择为 枢轴 类。
对于多分类问题,算法将输出一个多项式逻辑回归模型,该模型包含 k−1 个二元 逻辑回归模型,与第一类进行回归。给定一个新的数据点,k−1 个模型 将运行,具有最大概率的类将被选择为预测类。在本节中,我们将展示一个使用 L-BFGS 逻辑回归的分类示例,以便更快的收敛。
步骤 1. 加载并解析 MNIST 数据集(LIVSVM 格式)
// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(spark.sparkContext, "data/mnist.bz2")
步骤 2. 准备训练集和测试集
将数据拆分为训练集(75%)和测试集(25%),如下所示:
val splits = data.randomSplit(Array(0.75, 0.25), seed = 12345L)
val training = splits(0).cache()
val test = splits(1)
步骤 3. 运行训练算法以构建模型
运行训练算法以通过设置类别数量(对于该数据集为 10 个)来构建模型。为了更好的分类准确度,您还可以指定截距,并使用布尔值 true 来验证数据集,如下所示:
val model = new LogisticRegressionWithLBFGS()
.setNumClasses(10)
.setIntercept(true)
.setValidateData(true)
.run(training)
如果算法需要使用 setIntercept() 添加截距,请将截距设置为 true。如果您希望算法在模型构建之前验证训练集,应该使用 setValidateData() 方法将该值设置为 true。
步骤 4. 清除默认阈值
清除默认阈值,以便训练时不使用默认设置,如下所示:
model.clearThreshold()
步骤 5. 在测试集上计算原始分数
在测试集上计算原始分数,以便使用上述性能指标评估模型,如下所示:
val scoreAndLabels = test.map { point =>
val score = model.predict(point.features)
(score, point.label)
}
步骤 6. 为评估实例化多类指标
val metrics = new MulticlassMetrics(scoreAndLabels)
步骤 7. 构建混淆矩阵
println("Confusion matrix:")
println(metrics.confusionMatrix)
在混淆矩阵中,矩阵的每一列代表预测类别中的实例,而每一行代表实际类别中的实例(或反之)。该名称源于其能够轻松地显示系统是否混淆了两个类别。更多信息,请参阅矩阵(en.wikipedia.org/wiki/Confusion_matrix.Confusion):
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00065.gif图 30: 由逻辑回归分类器生成的混淆矩阵
步骤 8. 整体统计数据
现在让我们计算整体统计数据,以评估模型的性能:
val a***uracy = metrics.a***uracy
println("Summary Statistics")
println(s"A***uracy = $a***uracy")
// Precision by label
val labels = metrics.labels
labels.foreach { l =>
println(s"Precision($l) = " + metrics.precision(l))
}
// Recall by label
labels.foreach { l =>
println(s"Recall($l) = " + metrics.recall(l))
}
// False positive rate by label
labels.foreach { l =>
println(s"FPR($l) = " + metrics.falsePositiveRate(l))
}
// F-measure by label
labels.foreach { l =>
println(s"F1-Score($l) = " + metrics.fMeasure(l))
}
上述代码段产生以下输出,包含一些性能指标,如准确率、精度、召回率、真正率、假阳性率和 F1 分数:
Summary Statistics
----------------------
A***uracy = 0.9203609775377116
Precision(0.0) = 0.9606815203145478
Precision(1.0) = 0.9595732734418866
.
.
Precision(8.0) = 0.8942172073342737
Precision(9.0) = 0.9027210884353741
Recall(0.0) = 0.9638395792241946
Recall(1.0) = 0.9732346241457859
.
.
Recall(8.0) = 0.8720770288858322
Recall(9.0) = 0.8936026936026936
FPR(0.0) = 0.004392386530014641
FPR(1.0) = 0.005363128491620112
.
.
FPR(8.0) = 0.010927369417935456
FPR(9.0) = 0.010441004672897197
F1-Score(0.0) = 0.9622579586478502
F1-Score(1.0) = 0.966355668645745
.
.
F1-Score(9.0) = 0.8981387478849409
现在让我们计算整体的统计数据,即汇总统计数据:
println(s"Weighted precision: ${metrics.weightedPrecision}")
println(s"Weighted recall: ${metrics.weightedRecall}")
println(s"Weighted F1 score: ${metrics.weightedFMeasure}")
println(s"Weighted false positive rate: ${metrics.weightedFalsePositiveRate}")
上述代码段输出以下结果,包含加权精度、召回率、F1 分数和假阳性率:
Weighted precision: 0.920104303076327
Weighted recall: 0.9203609775377117
Weighted F1 score: 0.9201934861645358
Weighted false positive rate: 0.008752250453215607
整体统计数据表明模型的准确率超过 92%。然而,我们仍然可以通过使用更好的算法(如 随机森林 (RF))来提高准确度。在下一部分,我们将查看随机森林实现,以对同一模型进行分类。
使用随机森林提高分类准确度
随机森林(有时也称为随机决策森林)是决策树的集成。随机森林是最成功的机器学习模型之一,广泛应用于分类和回归。它们结合了多棵决策树,以减少过拟合的风险。与决策树一样,随机森林能够处理分类特征,扩展到多类别分类设置,不需要特征缩放,并且能够捕捉非线性关系和特征之间的交互。随机森林有许多优势。通过结合多棵决策树,它们能够克服训练数据集中的过拟合问题。
RF 或 RDF 中的森林通常由数十万棵树组成。这些树实际上是基于同一个训练集的不同部分进行训练的。更技术性地讲,生长得非常深的单棵树往往倾向于学习高度不可预测的模式。树木的这种特性会在训练集上造成过拟合问题。此外,低偏差使得分类器即使在数据集的特征质量良好的情况下,性能仍然较低。另一方面,RF 通过将多个决策树平均化,目的是通过计算样本对之间的相似度来减少方差,确保一致性。
然而,这会增加一点偏差或在结果可解释性上的一些损失。但最终,最终模型的性能会显著提高。在将 RF 作为分类器使用时,参数设置如下:
-
如果树的数量为 1,则完全不使用自助采样;然而,如果树的数量 > 1,则会使用自助采样。支持的值包括
auto、all、sqrt、log2和o***hird。 -
支持的数值范围是 (0.0-1.0] 和 [1-n]。然而,如果选择
featureSubsetStrategy为auto,算法会自动选择最佳的特征子集策略。 -
如果
numTrees == 1,则featureSubsetStrategy被设置为all。但是,如果numTrees > 1(即森林),则featureSubsetStrategy被设置为分类时的sqrt。 -
此外,如果一个实数 n 设置在范围 (0, 1.0] 内,则会使用
n*number_of_features。然而,如果整数值 n 在range (1, 特征数量)内,则仅使用n个特征,交替进行选择。 -
categoricalFeaturesInfo参数是一个映射,用于存储任意的分类特征。条目 (n -> k) 表示特征 n 是分类的,且有 k 个类别,索引范围为 0: {0, 1,…,k-1}。 -
纯度标准仅用于信息增益的计算。分类和回归分别支持 gini 和 variance 作为纯度标准。
-
maxDepth是树的最大深度(例如,深度 0 表示 1 个叶节点,深度 1 表示 1 个内部节点 + 2 个叶节点,以此类推)。 -
maxBins表示用于拆分特征的最大箱数,建议值为 100,以获得更好的结果。 -
最后,随机种子用于自助法(bootstrapping)和选择特征子集,以避免结果的随机性。
如前所述,由于随机森林(RF)对大规模数据集来说足够快速且可扩展,Spark 是实现随机森林以应对大规模扩展的合适技术。然而,如果计算了相似度,存储需求也会呈指数级增长。
使用随机森林分类 MNIST 数据集
在本节中,我们将展示使用随机森林进行分类的示例。我们将逐步分解代码,帮助你轻松理解解决方案。
步骤 1: 以 LIVSVM 格式加载并解析 MNIST 数据集
// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(spark.sparkContext, "data/mnist.bz2")
步骤 2: 准备训练集和测试集
将数据分割为训练集(75%)和测试集(25%),并设置种子以确保可重复性,如下所示:
val splits = data.randomSplit(Array(0.75, 0.25), seed = 12345L)
val training = splits(0).cache()
val test = splits(1)
步骤 3: 运行训练算法以构建模型
使用空的 categoricalFeaturesInfo 训练一个随机森林模型。由于数据集中的所有特征都是连续的,因此这是必需的:
val numClasses = 10 //number of classes in the MNIST dataset
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 50 // Use more in practice.More is better
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = "gini" // see above notes on RandomForest for explanation
val maxDepth = 30 // More is better in practice
val maxBins = 32 // More is better in practice
val model = RandomForest.trainClassifier(training, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
请注意,训练一个随机森林模型非常消耗资源。因此,它将占用更多内存,因此请注意内存溢出(OOM)。我建议在运行此代码之前增加 Java 堆内存。
步骤 4: 在测试集上计算原始分数
在测试集上计算原始分数,以便我们可以使用上述性能指标来评估模型,如下所示:
val scoreAndLabels = test.map { point =>
val score = model.predict(point.features)
(score, point.label)
}
步骤 5: 实例化一个多类度量标准用于评估
val metrics = new MulticlassMetrics(scoreAndLabels)
步骤 6: 构建混淆矩阵
println("Confusion matrix:")
println(metrics.confusionMatrix)
前面的代码打印出以下混淆矩阵用于我们的分类:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00122.gif图 31: 由随机森林分类器生成的混淆矩阵
步骤 7: 总体统计
现在让我们计算总体统计数据来判断模型的表现:
val a***uracy = metrics.a***uracy
println("Summary Statistics")
println(s"A***uracy = $a***uracy")
// Precision by label
val labels = metrics.labels
labels.foreach { l =>
println(s"Precision($l) = " + metrics.precision(l))
}
// Recall by label
labels.foreach { l =>
println(s"Recall($l) = " + metrics.recall(l))
}
// False positive rate by label
labels.foreach { l =>
println(s"FPR($l) = " + metrics.falsePositiveRate(l))
}
// F-measure by label
labels.foreach { l =>
println(s"F1-Score($l) = " + metrics.fMeasure(l))
}
前面的代码段产生以下输出,包含一些性能指标,如准确率、精度、召回率、真正率、假正率和 F1 分数:
Summary Statistics:
------------------------------
Precision(0.0) = 0.9861932938856016
Precision(1.0) = 0.9891799544419134
.
.
Precision(8.0) = 0.9546079779917469
Precision(9.0) = 0.9474747474747475
Recall(0.0) = 0.9778357235984355
Recall(1.0) = 0.9897435897435898
.
.
Recall(8.0) = 0.9442176870748299
Recall(9.0) = 0.9449294828744124
FPR(0.0) = 0.0015387997362057595
FPR(1.0) = 0.0014151646059883808
.
.
FPR(8.0) = 0.0048136532710962
FPR(9.0) = 0.0056967572304995615
F1-Score(0.0) = 0.9819967266775778
F1-Score(1.0) = 0.9894616918256907
.
.
F1-Score(8.0) = 0.9493844049247605
F1-Score(9.0) = 0.9462004034969739
现在让我们计算总体统计数据,如下所示:
println(s"Weighted precision: ${metrics.weightedPrecision}")
println(s"Weighted recall: ${metrics.weightedRecall}")
println(s"Weighted F1 score: ${metrics.weightedFMeasure}")
println(s"Weighted false positive rate: ${metrics.weightedFalsePositiveRate}")
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / test.count()
println("A***uracy = " + (1-testErr) * 100 + " %")
前面的代码段打印出以下输出,包含加权精度、召回率、F1 分数和假正率:
Overall statistics
----------------------------
Weighted precision: 0.966513107682512
Weighted recall: 0.9664712469534286
Weighted F1 score: 0.9664794711607312
Weighted false positive rate: 0.003675328222679072
A***uracy = 96.64712469534287 %
总体统计显示,模型的准确率超过 96%,优于逻辑回归。然而,我们仍然可以通过更好的模型调优来进一步提升它。
总结
在本章中,我们简要介绍了这个主题,并掌握了简单、强大且常见的机器学习技术。最后,你了解了如何使用 Spark 构建自己的预测模型。你学会了如何构建分类模型,如何使用模型进行预测,以及如何使用常见的机器学习技术,如降维和独热编码。
在后面的章节中,你了解了如何将回归技术应用于高维数据集。接着,你看到了如何应用二分类和多分类算法进行预测分析。最后,你学习了如何使用随机森林算法实现卓越的分类准确率。然而,我们还有其他机器学习主题需要讲解,例如推荐系统以及在最终部署模型之前进行模型调优,以获得更稳定的性能。
在下一章,我们将介绍一些 Spark 的高级主题。我们将提供机器学习模型调优的示例,以获得更好的性能,还将分别介绍电影推荐和文本聚类的两个示例。
第十二章:高级机器学习最佳实践
“超参数优化或模型选择是选择一组超参数的问题[当其定义为?]用于学习算法,通常目标是优化该算法在独立数据集上的表现度量。”
- 机器学习模型调优引用
在本章中,我们将介绍一些关于使用 Spark 的机器学习(ML)高级主题的理论和实践方面的内容。我们将看到如何通过网格搜索、交叉验证和超参数调优来调整机器学习模型,以实现更好和优化的性能。在后续部分,我们将展示如何使用 ALS 开发一个可扩展的推荐系统,这是一种基于模型的推荐算法的例子。最后,我们将展示一个作为文本聚类技术的主题建模应用。
简而言之,我们将在本章中涵盖以下主题:
-
机器学习最佳实践
-
机器学习模型的超参数调优
-
使用潜在狄利克雷分配(LDA)进行主题建模
-
使用协同过滤的推荐系统
机器学习最佳实践
有时候,建议考虑误差率,而不仅仅是准确率。例如,假设一个机器学习系统的准确率为 99%,但误差率为 50%,这比一个准确率为 90%但误差率为 25%的系统更差。到目前为止,我们已经讨论了以下机器学习主题:
-
回归:这是用于预测线性可分的值
-
异常检测:这是用于寻找不寻常的数据点,通常通过使用聚类算法来实现
-
聚类:这是用于发现数据集中隐藏的结构,以便对同质数据点进行聚类
-
二分类:这是用于预测两类类别的任务
-
多类分类:这是用于预测三个或更多类别的任务
我们也看到了一些适合这些任务的优秀算法。然而,选择正确的算法来解决你的问题类型,对于实现更高和卓越的准确性来说是一个具有挑战性的任务。为此,我们需要在各个阶段采取一些好的实践,也就是说,从数据收集、特征工程、模型构建、评估、调优到部署。考虑到这些,在本节中,我们将提供一些实践性的建议,帮助你在使用 Spark 开发机器学习应用时更为高效。
小心过拟合和欠拟合
一条穿过弯曲散点图的直线可以作为欠拟合的一个典型例子,如图所示。然而,如果这条直线过于贴合数据,就会出现一个相反的问题,称为过拟合。当我们说一个模型对数据集发生过拟合时,我们的意思是它可能在训练数据上的误差率很低,但却无法很好地对数据中的总体分布进行泛化。
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00148.jpeg图 1:过拟合与欠拟合的权衡(来源:《深度学习》一书,作者:Adam Gibson, Josh Patterson)
更技术性地说,如果你在训练数据上评估你的模型,而不是在测试数据或验证数据上进行评估,你可能无法明确指出模型是否发生了过拟合。常见的症状如下:
-
用于训练的数据的预测准确度可能过于准确(也就是说,有时甚至达到 100%)。
-
相较于随机预测,模型可能在新数据上表现出更好的性能。
-
我们喜欢将数据集拟合到某个分布,因为如果数据集与该分布较为接近,我们可以基于该理论分布作出假设,以指导我们如何处理数据。因此,数据中的正态分布使我们可以假设,在指定条件下,统计量的抽样分布是正态分布。正态分布由其均值和标准差定义,通常在所有变体中形状相同。
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00012.jpeg图 2:数据中的正态分布有助于克服过拟合和欠拟合(来源:《深度学习》一书,作者:Adam Gibson, Josh Patterson)
有时,机器学习模型本身会在某些特定的调参或数据点上发生欠拟合,这意味着模型变得过于简化。我们的建议(如同其他人的观点一样)如下:
-
将数据集拆分为两部分以检测过拟合情况——第一部分用于训练和模型选择,称为训练集;第二部分是用于评估模型的测试集,取代机器学习工作流部分中的评估步骤。
-
另外,你也可以通过使用更简单的模型(例如,优先选择线性分类器而非高斯核 SVM)或通过增加机器学习模型的正则化参数(如果可用)来避免过拟合。
-
调整模型的参数值,以避免过拟合和欠拟合。
-
因此,解决欠拟合是首要任务,但大多数机器学习从业者建议投入更多的时间和精力,避免将模型过拟合到数据上。另一方面,许多机器学习从业者推荐将大规模数据集分成三个部分:训练集(50%)、验证集(25%)和测试集(25%)。他们还建议使用训练集构建模型,并使用验证集计算预测误差。测试集则建议用于评估最终模型的泛化误差。如果在监督学习过程中可用的标记数据较少,则不推荐拆分数据集。在这种情况下,使用交叉验证。更具体地说,将数据集分成 10 个(大致)相等的部分;然后,对于这 10 个部分中的每一个,迭代训练分类器,并使用第十部分来测试模型。
请关注 Spark MLlib 和 Spark ML
管道设计的第一步是创建构建模块(作为由节点和边组成的有向或无向图),并在这些模块之间建立链接。然而,作为数据科学家,你也应该专注于对节点(基本元素)进行扩展和优化,以便能够在后续阶段扩展你的应用程序,以处理大规模数据集,使得你的 ML 管道能够稳定地执行。管道过程还将帮助你使模型适应新数据集。不过,这些基本元素中的一些可能会显式地定义为特定领域和数据类型(例如文本、图像、视频、音频以及时空数据)。
除了这些类型的数据外,基本元素还应该适用于通用领域的统计或数学。将你的 ML 模型以这些基本元素的形式表示,将使你的工作流程更加透明、可解释、可访问且易于解释。
最近的一个例子是 ML-matrix,这是一个分布式矩阵库,可以在 Spark 上使用。请参考 JIRA 问题:issues.apache.org/jira/browse/SPARK-3434。
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00109.jpeg**图 3:**保持同步并互操作 ML 和 MLlib
正如我们在前一节中所述,作为开发者,你可以将 Spark MLlib 中的实现技术与在 Spark ML、Spark SQL、GraphX 和 Spark Streaming 中开发的算法无缝结合,作为基于 RDD、DataFrame 和数据集的混合或互操作的 ML 应用程序,正如图 3所示。因此,这里的建议是保持同步或与周围最新的技术保持一致,以便改进你的 ML 应用程序。
为你的应用程序选择正确的算法
“我应该使用什么机器学习算法?”是许多初学者常常问的问题,但答案总是取决于情况。更详细地说:
-
这取决于你必须测试/使用的数据的量、质量、复杂性和性质
-
这取决于外部环境和参数,例如计算系统的配置或底层基础设施
-
这取决于你想用答案做什么
-
这取决于算法的数学和统计公式是如何转化为计算机的机器指令的
-
这取决于你有多少时间
现实情况是,即使是最有经验的数据科学家或数据工程师,在尝试所有算法之前也无法直接推荐哪个机器学习算法会表现最佳。大多数同意或不同意的陈述都会以“这取决于…嗯…”开始。习惯性地,你可能会想知道是否有机器学习算法的备忘单,如果有,我该如何使用那个备忘单?一些数据科学家表示,找到最好的算法唯一的确切方法是尝试所有的算法;所以,没有捷径,伙计!让我们再说得清楚一点;假设你确实有一组数据,并且想做一些聚类。技术上讲,如果你的数据是有标签的,这可以是一个分类或回归问题。但如果你有一个无标签数据集,你将使用聚类技术。那么,你脑海中出现的疑问如下:
-
在选择适当的算法之前,我应该考虑哪些因素?或者我应该随机选择一个算法?
-
我该如何选择任何适用于我的数据的数据预处理算法或工具?
-
我应该使用什么样的特征工程技术来提取有用的特征?
-
有哪些因素可以提高我的机器学习模型的性能?
-
我该如何调整我的机器学习应用以适应新的数据类型?
-
我能否为大规模数据集扩展我的机器学习应用?等等。
在本节中,我们将尝试用我们有限的机器学习知识回答这些问题。
选择算法时的考虑因素
我们在这里提供的建议或推荐是针对刚刚学习机器学习的初学者数据科学家。这些建议对于尝试选择一个最优算法来开始使用 Spark ML API 的专家数据科学家也很有用。别担心,我们会引导你朝着正确的方向前进!我们还建议在选择算法时考虑以下算法特性:
-
准确性:无论是为了获得最佳得分,还是为了在精确度、召回率、F1 得分或 AUC 等指标上获得一个近似解(足够好),同时权衡过拟合问题。
-
训练时间:用于训练模型的时间量(包括模型构建、评估和训练时间)。
-
线性:模型复杂性的一个方面,指的是问题是如何建模的。由于大多数非线性模型通常更复杂,难以理解和调优。
-
参数数量
-
特征数量:当特征数量超过实例数时出现的问题,p>>n问题。这通常需要使用降维或更好的特征工程方法来进行专门的处理或采用专门的技术。
准确性
从你的机器学习应用程序中获得最准确的结果并不总是必不可少的。根据你的使用需求,有时近似结果就足够了。如果情况是这样,你可以通过采用更好估计的方法大幅减少处理时间。当你熟悉 Spark 机器学习 API 的工作流后,你将享受拥有更多近似方法的优势,因为这些近似方法通常能自动避免机器学习模型的过拟合问题。现在,假设你有两个二分类算法,表现如下:
| 分类器 | 精确度 | 召回率 | |
|---|---|---|---|
| X | 96% | 89% | |
| Y | 99% | 84% |
在这里,没有哪个分类器明显优于其他,因此它并不能立即指导你选择最优的一个。F1 分数,作为精确度和召回率的调和平均值,将帮助你。让我们计算它并将其放入表格:
| 分类器 | 精确度 | 召回率 | F1 分数 | |
|---|---|---|---|---|
| X | 96% | 89% | 92.36% | |
| Y | 99% | 84% | 90.885% |
因此,拥有 F1 分数有助于在大量分类器中做出选择。它为所有分类器提供了一个清晰的优先级排序,从而为你的进步提供了明确的方向——那就是分类器X。
训练时间
训练时间通常与模型训练和准确性密切相关。此外,你常常会发现某些算法在数据点数量上比其他算法表现得更为模糊。然而,当你的时间有限,但训练集非常庞大且特征较多时,你可以选择最简单的算法。在这种情况下,你可能需要在准确性上做出妥协。但至少,它将满足你的最低要求。
线性
近年来,许多机器学习算法利用了线性特性(在 Spark MLlib 和 Spark ML 中也可以使用)。例如,线性分类算法假设类可以通过绘制一个分隔的直线或使用更高维度的等价物来分离。而线性回归算法则假设数据趋势简单地遵循一条直线。对于某些机器学习问题,这一假设并不天真;然而,也可能有一些其他情况会导致准确性下降。尽管存在一些风险,线性算法仍然非常受数据工程师和数据科学家的欢迎,因为它们是应对突发问题的首选。更重要的是,这些算法通常简单且快速,能够在整个过程中训练你的模型。
选择算法时请检查你的数据
你可以在 UC Irvine 机器学习库找到许多机器学习数据集。以下数据属性也应优先考虑:
-
参数数量
-
特征数量
-
训练数据集的大小
参数数量
参数或数据属性是数据科学家在设置算法时的抓手。它们是影响算法性能的数字,比如误差容忍度、迭代次数,或者算法行为变体之间的选择。算法的训练时间和准确性有时会非常敏感,这使得找到正确的设置变得困难。通常,具有大量参数的算法需要更多的试验和错误来找到最优的组合。
尽管这是跨越参数空间的一个好方法,但随着参数数量的增加,模型构建或训练时间呈指数增长。这既是一个困境,也是时间与性能之间的权衡。其积极方面是:
-
拥有许多参数通常表示机器学习算法具有更大的灵活性
-
你的机器学习应用实现了更好的准确度
你的训练集有多大?
如果你的训练集较小,低偏差且低方差的分类器,如朴素贝叶斯,在低偏差且高方差的分类器(如k-最近邻算法(kNN))上具有优势(也可以用于回归)。
偏差、方差与 kNN 模型: 实际上,增加 k会减少方差,但增加偏差。另一方面,减少 k会增加方差并减少偏差。随着k的增加,这种变异性被减小。但如果我们过度增加k,那么我们就不再遵循真实的边界线,观察到的是较高的偏差。这就是偏差-方差权衡的本质。
我们已经看到过拟合和欠拟合的问题了。现在,你可以假设,处理偏差和方差就像处理过拟合和欠拟合一样。随着模型复杂度的增加,偏差减少,而方差增加。随着模型中参数的增多,模型的复杂度上升,方差成为我们主要关注的问题,而偏差则稳步下降。换句话说,偏差对模型复杂度的导数为负,而方差则有正斜率。请参考下图以便更好地理解:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00077.jpeg图 4: 偏差和方差对总误差的贡献
因此,后者将会过拟合。然而,低偏差且高方差的分类器则在训练集线性或指数增长时开始占据优势,因为它们具有较低的渐近误差。高偏差的分类器则不足以提供准确的模型。
特征数量
对于某些类型的实验数据集,提取的特征数可能会比数据点本身的数量大得多。通常在基因组学、生物医学或文本数据中会出现这种情况。大量的特征可能会拖慢一些学习算法的速度,使得训练时间极度增加。支持向量机(SVM)在这种情况下特别适用,因为它具有较高的准确性,关于过拟合的良好理论保证,并且使用合适的核函数。
支持向量机与核函数:任务是找到一组权重和偏置,使得可以最大化边距的函数:
y = w*¥(x) + b,
其中 w 是权重,¥ 是特征向量,b 是偏置。现在如果 y > 0,那么我们将数据分类为 1 类,反之为 0 类,而特征向量 ¥(x) 使得数据线性可分。然而,使用核函数使得计算过程更快、更简便,尤其当特征向量 ¥ 包含高维数据时。我们来看一个具体的例子。假设我们有以下 x 和 y 的值:x = (x1, x2, x3) 和 y = (y1, y2, y3),那么对于函数 f(x) = (x1x1, x1x2, x1x3, x2x1, x2x2, x2x3, x3x1, x3x2, x3x3),核函数为 K(x, y) = (<x, y>)²。按照上面的方式,如果 x = (1, 2, 3) 和 y = (4, 5, 6),那么我们得到以下值:
f(x) = (1, 2, 3, 2, 4, 6, 3, 6, 9)
f(y) = (16, 20, 24, 20, 25, 30, 24, 30, 36)
<f(x), f(y)> = 16 + 40 + 72 + 40 + 100 + 180 + 72 + 180 + 324 = 1024
这是一种简单的线性代数操作,将三维空间映射到九维空间。另一方面,核函数是用于支持向量机的相似性度量。因此,建议根据对不变性的先验知识选择合适的核函数值。核函数、核函数参数和正则化参数的选择可以通过优化基于交叉验证的模型选择来自动化。
然而,自动选择核函数及其参数是一个棘手的问题,因为很容易导致模型选择标准的过拟合。这可能导致一个比最初更差的模型。现在,如果我们使用核函数 K(x, y),它给出的结果与传统计算相同,但计算过程要简单得多——即 (4 + 10 + 18) ² = 32² = 1024。
机器学习模型的超参数调优
调整算法参数是一个过程,通过这个过程可以让算法在运行时间和内存使用上表现得最优。在贝叶斯统计中,超参数是先验分布的一个参数。在机器学习中,超参数指的是那些不能通过常规训练过程直接学习到的参数。超参数通常在实际训练过程开始之前就已经确定。这是通过为这些超参数设置不同的值,训练不同的模型,然后通过测试它们来决定哪些效果最好。以下是一些典型的超参数示例:
-
树的叶节点数、箱数或深度
-
迭代次数
-
矩阵分解中的潜在因子数
-
学习率
-
深度神经网络中的隐藏层数
-
k-means 聚类中的聚类数量,等等。
本节将讨论如何使用交叉验证技术和网格搜索进行超参数调优。
超参数调优
超参数调优是一种选择合适超参数组合的技术,基于呈现数据的性能。它是从实际机器学习算法中获得有意义且准确结果的基本要求之一。下图展示了模型调优过程、考虑因素和工作流程:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00343.jpeg图 5:模型调优过程、考虑因素和工作流程
例如,假设我们有两个超参数需要调节,来自第十一章的图 17,学习机器学习 - Spark MLlib 和 Spark ML,一个使用逻辑回归估计器的 Spark ML 管道模型(虚线仅出现在管道拟合过程中)。我们可以看到,我们为每个超参数提供了三个候选值。因此,总共有九种组合。然而,图中只显示了四个,即 Tokenizer、HashingTF、Transformer 和逻辑回归(LR)。现在,我们希望找到最终能得到最佳评估结果的模型。拟合后的模型包括 Tokenizer、HashingTF 特征提取器和拟合后的逻辑回归模型:
如果你回想一下来自第十一章的图 17,学习机器学习 - Spark MLlib 和 Spark ML,那么虚线仅出现在管道拟合过程中。如前所述,拟合后的管道模型是一个 Transformer。Transformer 可以用于预测、模型验证和模型检查。此外,我们还提到,机器学习算法的一个不幸的特点是,通常它们有许多超参数需要调节以提高性能。例如,这些超参数中的正则化程度,与由 Spark MLlib 优化的模型参数是不同的。
因此,没有专家知识的情况下,很难猜测或测量出最佳的超参数组合,因为需要使用的数据和算法类型都很复杂。由于复杂的数据集是基于机器学习问题类型的,管道的规模和超参数的数量可能会呈指数级(或线性)增长;即使对于机器学习专家来说,超参数调优也变得繁琐,更不用说调优结果可能变得不可靠了。
根据 Spark API 文档,Spark ML 的估算器和转换器都使用一个唯一且统一的 API 来指定。ParamMap是由一组(参数,值)对组成的,每个 Param 是一个具有自包含文档的命名参数,由 Spark 提供。技术上,有两种方式可以将参数传递给算法,如下所述:
-
设置参数:如果 LR 是逻辑回归(即 Estimator)的一个实例,你可以调用
setMaxIter()方法,如下所示:LR.setMaxIter(5)。这基本上是指向回归实例的模型拟合,如下所示:LR.fit()。在这个特定示例中,最多会进行五次迭代。 -
第二个选项:这个选项涉及将
ParamMaps传递给fit()或transform()(有关详细信息,请参见图 5)。在这种情况下,任何参数都会被之前通过 setter 方法在 ML 应用程序特定代码或算法中指定的ParamMaps覆盖。
网格搜索参数调优
假设在进行必要的特征工程后,你选择了你的超参数。在这种情况下,全面的网格搜索超参数和特征的空间计算开销过大。因此,你需要在 K 折交叉验证的折叠中执行一次,而不是完全的网格搜索:
-
使用交叉验证对训练集的折叠进行超参数调优,使用所有可用的特征
-
使用这些超参数选择所需的特征
-
对 K 的每个折叠重复计算
-
最终模型使用所有数据构建,使用从每个 CV 折叠中选择的 N 个最常见的特征
有趣的是,超参数也会在交叉验证循环中使用所有数据再次调优。与完全网格搜索相比,这种方法会带来较大的缺点吗?本质上,我在每个自由参数的维度上进行线性搜索(先在一个维度上找到最佳值,保持常数,然后在下一个维度上找到最佳值),而不是每一种参数设置的所有组合。沿单个参数进行搜索,而不是一起优化所有参数的最大缺点,就是你忽略了它们之间的交互作用。
举例来说,多个参数通常会影响模型的复杂度。在这种情况下,你需要查看它们之间的交互作用,以便成功地优化超参数。根据数据集的大小以及比较的模型数量,返回最大观察性能的优化策略可能会遇到问题(无论是网格搜索还是你的策略,都存在此问题)。
其原因是,搜索大量性能估计以找到最大值会削减性能估计的方差:你可能会最终得到一个看似不错的模型和训练/测试拆分组合。更糟的是,你可能会得到多个看似完美的组合,然后优化过程就无法判断选择哪个模型,从而变得不稳定。
交叉验证
交叉验证(也称为旋转估计(RE))是一种用于评估统计分析和结果质量的模型验证技术。其目标是使模型能够泛化到独立的测试集上。交叉验证技术的一个完美应用是从机器学习模型进行预测。它能帮助你估计当你将预测模型部署为机器学习应用时,模型在实际中的表现如何。交叉验证过程中,通常使用已知类型的数据集来训练模型,反之,则使用未知类型的数据集进行测试。
在这方面,交叉验证有助于描述数据集,以便在训练阶段使用验证集来测试模型。有两种交叉验证类型,如下所述:
-
穷尽交叉验证:包括留 P 交叉验证和留一交叉验证。
-
非穷尽交叉验证:包括 K 折交叉验证和重复随机子抽样交叉验证。
在大多数情况下,研究人员/数据科学家/数据工程师使用 10 折交叉验证,而不是在验证集上进行测试。这是跨用例和问题类型中最广泛使用的交叉验证技术,正如下图所示:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00372.gif**图 6:**交叉验证基本上将你的完整可用训练数据分成若干折。这个参数是可以指定的。然后,整个管道会对每一折运行一次,并且每一折都会训练一个机器学习模型。最后,通过投票方案对分类器进行联合,或者通过平均对回归进行联合。
此外,为了减少变异性,会使用不同的分区进行多次交叉验证迭代;最后,验证结果会在各轮中取平均值。下图展示了使用逻辑回归进行超参数调优的示例:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00046.jpeg**图 7:**使用逻辑回归进行超参数调优的示例
使用交叉验证代替常规验证有两个主要优点,概述如下:
-
首先,如果可用的数据不足以在单独的训练集和测试集之间进行划分,就可能会失去重要的建模或测试能力。
-
其次,K 折交叉验证估计器的方差低于单一的保留集估计器。较低的方差限制了变动,这在数据量有限时尤为重要。
在这种情况下,合理的估算模型预测和相关性能的公平方式是使用交叉验证作为一种强大的模型选择和验证技术。如果我们需要手动选择特征和参数调优,那么在此之后,我们可以对整个数据集进行 10 折交叉验证来评估模型。什么策略最合适?我们建议你选择一种提供乐观评分的策略,具体如下:
-
将数据集分为训练集,例如 80%,和测试集 20%或其他你选择的比例
-
使用 K 折交叉验证在训练集上调优模型
-
重复交叉验证,直到找到优化并调优的模型。
现在,使用你的模型对测试集进行预测,以估算模型误差。
信用风险分析 – 超参数调优的示例
在本节中,我们将展示机器学习超参数调优的实际示例,涉及网格搜索和交叉验证技术。更具体地说,首先,我们将开发一个信用风险管道,该管道在银行和信用合作社等金融机构中常见。随后,我们将探讨如何通过超参数调优提高预测准确性。在深入示例之前,让我们快速概述一下什么是信用风险分析,以及它为何重要?
什么是信用风险分析?它为何重要?
当申请人申请贷款并且银行收到该申请时,基于申请人的资料,银行需要决定是否批准该贷款申请。在这方面,银行在贷款申请决策时面临两种类型的风险:
-
申请人属于信用风险较低:这意味着客户或申请人更有可能偿还贷款。如果贷款未被批准,银行可能会因此失去业务。
-
申请人属于信用风险较高:这意味着客户或申请人最有可能无法偿还贷款。在这种情况下,批准贷款将导致银行遭受财务损失。
该机构表示,第二种情况比第一种情况更具风险,因为银行更有可能无法收回借款。因此,大多数银行或信用合作社会评估向客户、申请人或顾客借款所涉及的风险。在商业分析中,最小化风险往往能最大化银行自身的利润。
换句话说,从财务角度来看,最大化利润并最小化损失非常重要。银行通常会根据申请人的不同因素和参数(例如有关贷款申请的各类人口和社会经济条件)来决定是否批准贷款申请。
数据集探索
德国信用数据集从 UCI 机器学习库下载,网址为 archive.ics.uci.edu/ml/machine-learning-databases/statlog/german/。虽然在该链接中可以找到数据集的详细描述,但我们在 表 3 中提供了一些简要的见解。数据包含了关于 21 个变量的信用相关数据,以及对于 1000 名贷款申请人是否被认为是好的或坏的信用风险的分类(即二分类问题)。
以下表格展示了在将数据集公开之前,所考虑的每个变量的详细信息:
| 条目 | 变量 | 说明 |
|---|---|---|
| 1 | creditability | 还款能力:值为 1.0 或 0.0 |
| 2 | balance | 当前余额 |
| 3 | duration | 贷款申请期限 |
| 4 | history | 是否有不良贷款历史? |
| 5 | purpose | 贷款目的 |
| 6 | amount | 申请金额 |
| 7 | savings | 每月储蓄 |
| 8 | employment | 就业状态 |
| 9 | instPercent | 利率百分比 |
| 10 | sexMarried | 性别及婚姻状态 |
| 11 | guarantors | 是否有担保人? |
| 12 | residenceDuration | 当前地址的居住时长 |
| 13 | assets | 净资产 |
| 14 | age | 申请人年龄 |
| 15 | con***redit | 并行信用 |
| 16 | apartment | 住宅状态 |
| 17 | credits | 当前信用 |
| 18 | o***upation | 职业 |
| 19 | dependents | 赡养人数 |
| 20 | hasPhone | 申请人是否使用电话 |
| 21 | foreign | 申请人是否为外国人 |
请注意,尽管 表 3 描述了与变量相关的标题,但数据集本身没有相关标题。在 表 3 中,我们展示了每个变量的名称、位置以及相关的重要性。
步骤示例:使用 Spark ML
在这里,我们将提供一个使用随机森林分类器进行信用风险预测的逐步示例。步骤包括从数据导入、一些统计分析、训练集准备,到最终的模型评估:
步骤 1. 加载并解析数据集到 RDD:
val creditRDD = parseRDD(sc.textFile("data/germancredit.csv")).map(parseCredit)
对于上一行,parseRDD() 方法用于用 , 分割条目,然后将它们转换为 Double 类型(即数值)。该方法如下所示:
def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
rdd.map(_.split(",")).map(_.map(_.toDouble))
}
另一方面,parseCredit() 方法用于基于 Credit 案例类解析数据集:
def parseCredit(line: Array[Double]): Credit = {
Credit(
line(0), line(1) - 1, line(2), line(3), line(4), line(5),
line(6) - 1, line(7) - 1, line(8), line(9) - 1, line(10) - 1,
line(11) - 1, line(12) - 1, line(13), line(14) - 1, line(15) - 1,
line(16) - 1, line(17) - 1, line(18) - 1, line(19) - 1, line(20) - 1)
}
Credit 案例类如下所示:
case class Credit(
creditability: Double,
balance: Double, duration: Double, history: Double, purpose: Double, amount: Double,
savings: Double, employment: Double, instPercent: Double, sexMarried: Double, guarantors: Double,
residenceDuration: Double, assets: Double, age: Double, con***redit: Double, apartment: Double,
credits: Double, o***upation: Double, dependents: Double, hasPhone: Double, foreign: Double)
步骤 2. 准备 ML 管道的数据框架 - 获取 ML 管道的数据框架
val sqlContext = new SQLContext(sc)
import sqlContext._
import sqlContext.implicits._
val creditDF = creditRDD.toDF().cache()
将它们保存为临时视图,以便于查询:
creditDF.createOrReplaceTempView("credit")
让我们来看看这个数据框的快照:
creditDF.show
上述 show() 方法打印了信贷数据框:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00124.gif图 8: 信贷数据集的快照
步骤 3. 观察相关统计信息 - 首先,让我们看看一些聚合值:
sqlContext.sql("SELECT creditability, avg(balance) as avgbalance, avg(amount) as avgamt, avg(duration) as avgdur FROM credit GROUP BY creditability ").show
让我们看看余额的统计数据:
creditDF.describe("balance").show
现在,让我们查看平均余额的信贷能力:
creditDF.groupBy("creditability").avg("balance").show
三行代码的输出:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00030.gif图 9: 数据集的一些统计信息
步骤 4. 特征向量和标签的创建 - 如你所见,credibility 列是响应列,结果时,我们需要创建特征向量,而不考虑该列。现在,让我们按照以下方式创建特征列:
val featureCols = Array("balance", "duration", "history", "purpose", "amount", "savings", "employment", "instPercent", "sexMarried",
"guarantors", "residenceDuration", "assets", "age", "con***redit",
"apartment", "credits", "o***upation", "dependents", "hasPhone",
"foreign")
让我们使用 VectorAssembler() API 来组合这些选定列的所有特征,如下所示:
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val df2 = assembler.transform(creditDF)
现在让我们看看特征向量是什么样子的:
df2.select("features").show
上述行显示了由 VectorAssembler 转换器创建的特征:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00360.gif图 10: 使用 VectorAssembler 为 ML 模型生成特征
现在,让我们使用 StringIndexer 从旧的响应列“creditability”中创建一个新列作为标签,如下所示:
val labelIndexer = new StringIndexer().setInputCol("creditability").setOutputCol("label")
val df3 = labelIndexer.fit(df2).transform(df2)
df3.select("label", "features").show
上述行显示了由 VectorAssembler 转换器创建的特征和标签:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00274.gif图 11: 使用 VectorAssembler 的 ML 模型的对应标签和特征
步骤 5. 准备训练集和测试集:
val splitSeed = 5043
val Array(trainingData, testData) = df3.randomSplit(Array(0.80, 0.20), splitSeed)
步骤 6. 训练随机森林模型 - 首先,实例化模型:
val classifier = new RandomForestClassifier()
.setImpurity("gini")
.setMaxDepth(30)
.setNumTrees(30)
.setFeatureSubsetStrategy("auto")
.setSeed(1234567)
.setMaxBins(40)
.setMinInfoGain(0.001)
有关前面参数的解释,请参考本章中的随机森林算法部分。现在,让我们使用训练集训练模型:
val model = classifier.fit(trainingData)
步骤 7. 计算测试集的原始预测:
val predictions = model.transform(testData)
让我们查看这个数据框的前 20 行:
predictions.select("label","rawPrediction", "probability", "prediction").show()
上述行显示了包含标签、原始预测、概率和实际预测的 DataFrame:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00040.gif图 12: 包含测试集原始和实际预测的 DataFrame
现在,在看到最后一列的预测后,银行可以决定接受哪些申请。
步骤 8. 调整前的模型评估 - 实例化二元评估器:
val binaryClassificationEvaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setRawPredictionCol("rawPrediction")
计算测试集预测准确性如下:
val a***uracy = binaryClassificationEvaluator.evaluate(predictions)
println("The a***uracy before pipeline fitting: " + a***uracy)
管道拟合之前的准确率:0.751921784149243
这次,准确率为 75%,虽然不算很好,但让我们计算二元分类器的其他重要性能指标,如 接收者操作特征曲线下的面积(AUROC)和 精度召回曲线下的面积(AUPRC):
println("Area Under ROC before tuning: " + printlnMetric("areaUnderROC"))
println("Area Under PRC before tuning: "+ printlnMetric("areaUnderPR"))
Area Under ROC before tuning: 0.8453079178885631 Area Under PRC before tuning: 0.751921784149243
printlnMetric() 方法如下:
def printlnMetric(metri***ame: String): Double = {
val metrics = binaryClassificationEvaluator.setMetri***ame(metri***ame)
.evaluate(predictions)
metrics
}
最后,让我们使用 RegressionMetrics() API 计算一些我们在训练过程中使用的随机森林模型的其他性能指标:
val rm = new RegressionMetrics(
predictions.select("prediction", "label").rdd.map(x =>
(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
现在,让我们看看我们的模型如何:
println("MSE: " + rm.meanSquaredError)
println("MAE: " + rm.meanAbsoluteError)
println("RMSE Squared: " + rm.rootMeanSquaredError)
println("R Squared: " + rm.r2)
println("Explained Variance: " + rm.explainedVariance + "\n")
我们得到以下输出:
MSE: 0.2578947368421053
MAE: 0.2578947368421053
RMSE Squared: 0.5078333750770082
R Squared: -0.13758553274682295
Explained Variance: 0.16083102493074794
还不错!不过,也不算满意,对吧?让我们使用网格搜索和交叉验证技术来调整模型。
步骤 9. 使用网格搜索和交叉验证进行模型调优 - 首先,让我们使用 ParamGridBuilder API 构建一个参数网格,在 20 到 70 棵树之间搜索,maxBins 在 25 到 30 之间,maxDepth 在 5 到 10 之间,且不纯度使用熵和基尼系数:
val paramGrid = new ParamGridBuilder()
.addGrid(classifier.maxBins, Array(25, 30))
.addGrid(classifier.maxDepth, Array(5, 10))
.addGrid(classifier.numTrees, Array(20, 70))
.addGrid(classifier.impurity, Array("entropy", "gini"))
.build()
让我们使用训练集来训练交叉验证模型,如下所示:
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(binaryClassificationEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(10)
val pipelineFittedModel = cv.fit(trainingData)
计算测试集的原始预测,如下所示:
val predictions2 = pipelineFittedModel.transform(testData)
步骤 10. 调优后的模型评估 - 让我们看一下准确率:
val a***uracy2 = binaryClassificationEvaluator.evaluate(predictions2)
println("The a***uracy after pipeline fitting: " + a***uracy2)
我们得到以下输出:
The a***uracy after pipeline fitting: 0.8313782991202348
现在,准确率超过了 83%。确实有了很大的改进!让我们看看另外两个指标,计算 AUROC 和 AUPRC:
def printlnMetricAfter(metri***ame: String): Double = {
val metrics = binaryClassificationEvaluator.setMetri***ame(metri***ame).evaluate(predictions2)
metrics
}
println("Area Under ROC after tuning: " + printlnMetricAfter("areaUnderROC"))
println("Area Under PRC after tuning: "+ printlnMetricAfter("areaUnderPR"))
我们得到以下输出:
Area Under ROC after tuning: 0.8313782991202345
Area Under PRC after tuning: 0.7460301367852662
现在,根据 RegressionMetrics API,计算其他指标:
val rm2 = new RegressionMetrics(predictions2.select("prediction", "label").rdd.map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
println("MSE: " + rm2.meanSquaredError)
println("MAE: " + rm2.meanAbsoluteError)
println("RMSE Squared: " + rm2.rootMeanSquaredError)
println("R Squared: " + rm2.r2)
println("Explained Variance: " + rm2.explainedVariance + "\n")
我们得到以下输出:
MSE: 0.268421052631579
MAE: 0.26842105263157895
RMSE Squared: 0.5180936716768301
R Squared: -0.18401759530791795
Explained Variance: 0.16404432132963992
步骤 11. 查找最佳的交叉验证模型 - 最后,让我们找到最佳的交叉验证模型信息:
pipelineFittedModel
.bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel]
.stages(0)
.extractParamMap
println("The best fitted model:" + pipelineFittedModel.bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel].stages(0))
我们得到以下输出:
The best fitted model:RandomForestClassificationModel (uid=rfc_1fcac012b37c) with 70 trees
一个基于 Spark 的推荐系统
推荐系统试图根据其他用户的历史预测用户可能感兴趣的潜在项目。基于模型的协同过滤在许多公司中被广泛使用,例如 ***flix。值得注意的是,***flix 是一家美国娱乐公司,由 Reed Hastings 和 Marc Randolph 于 1997 年 8 月 29 日在加利福尼亚州的 Scotts Valley 创立。它专注于并提供在线流媒体和视频点播服务,以及通过邮寄方式提供 DVD。2013 年,***flix 扩展到电影和电视制作以及在线分发。到 2017 年,该公司将总部设在加利福尼亚州的 Los Gatos(来源:维基百科)。***flix 是一个实时电影推荐的推荐系统。在本节中,我们将看到一个完整的例子,了解它如何为新用户推荐电影。
基于 Spark 的模型推荐
Spark MLlib 中的实现支持基于模型的协同过滤。在基于模型的协同过滤技术中,用户和产品通过一小组因子来描述,这些因子也称为 潜在因子 (LFs)。从下图中,你可以对不同的推荐系统有一些了解。图 13 说明了为什么我们要使用基于模型的协同过滤进行电影推荐示例:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00284.gif图 13:不同推荐系统的对比视图
然后使用 LF 来预测缺失的条目。Spark API 提供了交替最小二乘法(也称为 ALS)算法的实现,该算法用于通过考虑六个参数来学习这些潜在因子,包括:
-
numBlocks:这是用于并行计算的块数(设置为 -1 以自动配置)。
-
rank:这是模型中潜在因子的数量。
-
iterations:这是 ALS 运行的迭代次数。ALS 通常在 20 次迭代或更少的次数内收敛到一个合理的解。
-
lambda:这指定了 ALS 中的正则化参数。
-
implicitPrefs:该参数指定是否使用 显式反馈 ALS 变体,或者使用适应于 隐式反馈 数据的变体。
-
alpha:这是一个适用于 ALS 隐式反馈变体的参数,控制在偏好观测中的 基线 信心。
请注意,要构建具有默认参数的 ALS 实例;您可以根据需求设置该值。默认值如下:numBlocks: -1、rank: 10、iterations: 10、lambda: 0.01、implicitPrefs: false 和 alpha: 1.0。
数据探索
电影及其相应的评分数据集是从 MovieLens 网站下载的 (movielens.org)。根据 MovieLens 网站上的数据描述,所有评分都记录在 ratings.csv 文件中。该文件中的每一行(紧跟在标题之后)代表一位用户对一部电影的评分。
CSV 数据集包含以下列:userId、movieId、rating 和 timestamp,如 图 14 所示。行按 userId 排序,用户内再按 movieId 排序。评分是基于五颗星的评分标准,每次增量为半颗星(从 0.5 星到 5.0 星)。时间戳表示自 1970 年 1 月 1 日午夜协调世界时(UTC)以来的秒数,我们有来自 668 个用户对 10,325 部电影的 105,339 条评分:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00244.gif图 14: 评分数据集快照
另一方面,电影信息包含在 movies.csv 文件中。除了标题信息外,该文件的每一行代表一部电影,包含以下列:movieId、title 和 genres(见 图 14)。电影标题可以手动创建或插入,或者从电影数据库网站 www.themoviedb.org/ 导入。然而,发行年份会显示在括号中。由于电影标题是手动插入的,因此可能存在一些错误或不一致。建议读者检查 IMDb 数据库 (www.ibdb.***/),确保没有与相应发行年份不一致或错误的标题。
类型是一个分隔列表,可以从以下类型类别中选择:
-
动作片、冒险片、动画片、儿童片、喜剧片、犯罪片
-
纪录片、剧情片、幻想片、黑色电影、恐怖片、音乐剧
-
悬疑片、浪漫片、科幻片、惊悚片、西部片、战争片
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00356.gif图 15:前 20 部电影的标题和类型
使用 ALS 进行电影推荐
在本小节中,我们将通过从数据收集到电影推荐的逐步示例,展示如何为其他用户推荐电影。
步骤 1. 加载、解析和探索电影和评分数据集 - 以下是代码示例:
val ratigsFile = "data/ratings.csv"
val df1 = spark.read.format("***.databricks.spark.csv").option("header", true).load(ratigsFile)
val ratingsDF = df1.select(df1.col("userId"), df1.col("movieId"), df1.col("rating"), df1.col("timestamp"))
ratingsDF.show(false)
该代码段应返回评分的 DataFrame。另一方面,以下代码段显示了电影的 DataFrame:
val moviesFile = "data/movies.csv"
val df2 = spark.read.format("***.databricks.spark.csv").option("header", "true").load(moviesFile)
val moviesDF = df2.select(df2.col("movieId"), df2.col("title"), df2.col("genres"))
步骤 2. 将两个 DataFrame 注册为临时表以便于查询 - 为了注册这两个数据集,我们可以使用以下代码:
ratingsDF.createOrReplaceTempView("ratings")
moviesDF.createOrReplaceTempView("movies")
这将通过在内存中创建一个临时视图作为表来加速内存查询。使用createOrReplaceTempView()方法创建的临时表的生命周期与用于创建该 DataFrame 的[[SparkSession]]相绑定。
步骤 3. 探索和查询相关统计信息 - 让我们检查与评分相关的统计信息。只需要使用以下代码行:
val numRatings = ratingsDF.count()
val numUsers = ratingsDF.select(ratingsDF.col("userId")).distinct().count()
val numMovies = ratingsDF.select(ratingsDF.col("movieId")).distinct().count()
println("Got " + numRatings + " ratings from " + numUsers + " users on " + numMovies + " movies.")
你应该会发现,用户 668 对 10,325 部电影进行了 105,339 次评分。现在,让我们获取最大和最小评分,并统计评分电影的用户数。为此,你需要在前一步创建的内存中的评分表上执行 SQL 查询。这里进行查询很简单,类似于从 MySQL 数据库或关系型数据库管理系统(RDBMS)执行查询。如果你不熟悉基于 SQL 的查询,建议查阅 SQL 查询规范,了解如何使用SELECT从特定表中选择数据,如何使用ORDER进行排序,以及如何使用JOIN关键字进行联接操作。
好吧,如果你知道 SQL 查询,你应该能通过以下复杂的 SQL 查询获得一个新的数据集:
// Get the max, min ratings along with the count of users who have rated a movie.
val results = spark.sql("select movies.title, movierates.maxr, movierates.minr, movierates.***tu "
+ "from(SELECT ratings.movieId,max(ratings.rating) as maxr,"
+ "min(ratings.rating) as minr,count(distinct userId) as ***tu "
+ "FROM ratings group by ratings.movieId) movierates "
+ "join movies on movierates.movieId=movies.movieId "
+ "order by movierates.***tu desc")
results.show(false)
我们得到了以下输出:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00073.gif图 16: 最大、最小评分及评分电影的用户数
为了获得更多的洞察,我们需要了解更多关于用户及其评分的信息。现在,让我们找出最活跃的用户以及他们评分电影的次数:
// Show the top 10 mostactive users and how many times they rated a movie
val mostActiveUsersSchemaRDD = spark.sql("SELECT ratings.userId, count(*) as ct from ratings "
+ "group by ratings.userId order by ct desc limit 10")
mostActiveUsersSchemaRDD.show(false)
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00262.jpeg图 17: 最活跃的 10 个用户以及他们评分电影的次数
让我们查看某个特定用户,找到例如用户 668 评分高于 4 的电影:
// Find the movies that user 668 rated higher than 4
val results2 = spark.sql(
"SELECT ratings.userId, ratings.movieId,"
+ "ratings.rating, movies.title FROM ratings JOIN movies"
+ "ON movies.movieId=ratings.movieId"
+ "where ratings.userId=668 and ratings.rating > 4")
results2.show(false)
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00035.gif图 18: 用户 668 评分高于 4 的电影
步骤 4. 准备训练和测试评分数据并查看数量 - 以下代码将评分 RDD 划分为训练数据 RDD(75%)和测试数据 RDD(25%)。这里的种子值是可选的,但为了可重复性目的,推荐使用:
// Split ratings RDD into training RDD (75%) & test RDD (25%)
val splits = ratingsDF.randomSplit(Array(0.75, 0.25), seed = 12345L)
val (trainingData, testData) = (splits(0), splits(1))
val numTraining = trainingData.count()
val numTest = testData.count()
println("Training: " + numTraining + " test: " + numTest)
你应该会发现训练集中有 78,792 条评分,测试集中有 26,547 条评分
DataFrame。
步骤 5. 为构建使用 ALS 的推荐模型准备数据 - ALS 算法使用Rating类型的 RDD 作为训练数据。以下代码演示了如何使用 API 构建推荐模型:
val ratingsRDD = trainingData.rdd.map(row => {
val userId = row.getString(0)
val movieId = row.getString(1)
val ratings = row.getString(2)
Rating(userId.toInt, movieId.toInt, ratings.toDouble)
})
ratingsRDD 是一个包含来自训练数据集的 userId、movieId 和相应评分的评分 RDD。另一方面,还需要一个测试 RDD 来评估模型。以下 testRDD 也包含来自前一步准备的测试 DataFrame 的相同信息:
val testRDD = testData.rdd.map(row => {
val userId = row.getString(0)
val movieId = row.getString(1)
val ratings = row.getString(2)
Rating(userId.toInt, movieId.toInt, ratings.toDouble)
})
步骤 6. 构建 ALS 用户产品矩阵 - 基于ratingsRDD构建 ALS 用户矩阵模型,通过指定最大迭代次数、块数、alpha、rank、lambda、种子和implicitPrefs。本质上,这种技术通过其他用户对其他电影的评分来预测特定用户对特定电影的缺失评分:
val rank = 20
val numIterations = 15
val lambda = 0.10
val alpha = 1.00
val block = -1
val seed = 12345L
val implicitPrefs = false
val model = new ALS()
.setIterations(numIterations)
.setBlocks(block)
.setAlpha(alpha)
.setLambda(lambda)
.setRank(rank)
.setSeed(seed)
.setImplicitPrefs(implicitPrefs)
.run(ratingsRDD)
最后,我们将模型迭代学习了 15 次。通过这个设置,我们获得了良好的预测准确度。建议读者进行超参数调优,以便了解这些参数的最佳值。此外,将用户块和产品块的数量设置为 -1,以自动配置块数并并行化计算。该值为 -1。
步骤 7. 进行预测 - 让我们为用户 668 获取前六个电影预测。可以使用以下源代码来进行预测:
// Making Predictions. Get the top 6 movie predictions for user 668
println("Rating:(UserID, MovieID, Rating)")
println("----------------------------------")
val topRecsForUser = model.re***mendProducts(668, 6)
for (rating <- topRecsForUser) {
println(rating.toString())
}
println("----------------------------------")
上述代码段产生了以下输出,包含了带有UserID、MovieID及相应Rating的电影评分预测:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00376.gif图 19:用户 668 的前六个电影预测
步骤 8. 评估模型 - 为了验证模型的质量,使用均方根误差(RMSE)来衡量模型预测值与实际观测值之间的差异。默认情况下,计算出的误差越小,模型越好。为了测试模型的质量,使用测试数据(如第 4 步中所拆分的数据)。根据许多机器学习从业者的说法,RMSE 是衡量准确度的一个好方法,但仅限于比较同一变量不同模型的预测误差,而不能用于不同变量之间的比较,因为它是依赖于尺度的。以下代码行计算了使用训练集训练的模型的 RMSE 值:
var rmseTest = ***puteRmse(model, testRDD, true)
println("Test RMSE: = " + rmseTest) //Less is better
需要注意的是,***puteRmse()是一个 UDF,具体如下:
def ***puteRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean): Double = {
val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
val predictionsAndRatings = predictions.map { x => ((x.user, x.product), x.rating)
}.join(data.map(x => ((x.user, x.product), x.rating))).values
if (implicitPrefs) {
println("(Prediction, Rating)")
println(predictionsAndRatings.take(5).mkString("\n"))
}
math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
}
上述方法计算 RMSE 用以评估模型。RMSE 越小,模型及其预测能力越好。
对于之前的设置,我们得到了以下输出:
Test RMSE: = 0.9019872589764073
我们认为,前述模型的性能还可以进一步提高。感兴趣的读者可以访问此网址,了解有关调优基于 ML 的 ALS 模型的更多信息:spark.apache.org/docs/preview/ml-collaborative-filtering.html。
主题建模技术广泛应用于从大量文档中挖掘文本的任务。这些主题可以用来总结和组织包含主题词及其相对权重的文档。在下一部分,我们将展示使用潜在狄利克雷分配(LDA)算法进行主题建模的示例。
主题建模 - 文本聚类的最佳实践
主题建模技术广泛应用于从大量文档集合中挖掘文本的任务。这些主题随后可以用来总结和组织包含主题词及其相对权重的文档。这个示例所使用的数据集只是纯文本格式,然而,它是一个非结构化格式。现在,具有挑战性的部分是通过 LDA 进行主题建模,从数据中找到有用的模式。
LDA 是如何工作的?
LDA 是一种从文本集合中推断主题的主题模型。LDA 可以看作是一种聚类算法,其中主题对应于聚类中心,文档对应于数据集中的实例(行)。主题和文档都存在于一个特征空间中,其中特征向量是词频向量(词袋模型)。与传统距离估计聚类不同,LDA 使用基于文本文档生成统计模型的函数。
LDA 通过 setOptimizer 函数支持不同的推断算法。EMLDAOptimizer 使用期望最大化对似然函数进行学习,并提供全面的结果,而 OnlineLDAOptimizer 使用迭代的 mini-batch 采样进行在线变分推断,通常更节省内存。LDA 输入一组文档,作为词频向量,并使用以下参数(通过构建器模式设置):
-
k:主题数量(即聚类中心)。 -
optimizer:用于学习 LDA 模型的优化器,可以是EMLDAOptimizer或OnlineLDAOptimizer。 -
do***oncentration:Dirichlet 参数,用于定义文档在主题分布上的先验。较大的值有助于生成更平滑的推断分布。 -
topi***oncentration:Dirichlet 参数,用于定义主题在词汇(单词)上的分布的先验。较大的值有助于生成更平滑的推断分布。 -
maxIterations:迭代次数的限制。 -
checkpointInterval:如果使用检查点(在 Spark 配置中设置),该参数指定创建检查点的频率。如果maxIterations值较大,使用检查点可以帮助减少磁盘上 shuffle 文件的大小,并有助于故障恢复。
特别地,我们希望讨论从大量文本集合中,人们最常谈论的主题。自 Spark 1.3 版本发布以来,MLlib 支持 LDA,这是一种在文本挖掘和自然语言处理(NLP)领域广泛使用的主题建模技术。此外,LDA 也是第一个采用 Spark GraphX 的 MLlib 算法。
要了解有关 LDA 背后理论的更多信息,请参阅 David M. Blei、Andrew Y. Ng 和 Michael I. Jordan 的《潜在狄利克雷分配》,机器学习研究期刊 3(2003)993-1022。
下图展示了从随机生成的推文文本中得到的主题分布:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00165.jpeg
图 20:主题分布及其外观
在本节中,我们将通过使用 Spark MLlib 的 LDA 算法处理非结构化的原始推文数据集来演示主题建模的一个示例。请注意,这里我们使用的是 LDA,这是最常用的文本挖掘主题建模算法之一。我们还可以使用更强大的主题建模算法,如概率潜在情感分析(pLSA)、八股分配模型(PAM)或层次狄利克雷过程(HDP)算法。
然而,pLSA 存在过拟合问题。另一方面,HDP 和 PAM 是更复杂的主题建模算法,通常用于复杂的文本挖掘任务,例如从高维文本数据或非结构化文本文档中挖掘主题。此外,至今为止,Spark 只实现了一种主题建模算法,即 LDA。因此,我们必须合理使用 LDA。
使用 Spark MLlib 进行主题建模
在本小节中,我们展示了一种使用 Spark 进行半自动化主题建模的技术。使用默认的其他选项,我们在从 GitHub URL 下载的数据集上训练 LDA:github.***/minghui/Twitter-LDA/tree/master/data/Data4Model/test。以下步骤展示了从数据读取到打印主题的主题建模过程,同时显示每个主题的术语权重。下面是主题建模管道的简短工作流程:
object topicModellingwithLDA {
def main(args: Array[String]): Unit = {
val lda = new LDAforTM() // actual ***putations are done here
val defaultParams = Params().copy(input = "data/docs/")
// Loading the parameters
lda.run(defaultParams) // Training the LDA model with the default
parameters.
}
}
主题建模的实际计算是在LDAforTM类中完成的。Params是一个案例类,用于加载训练 LDA 模型的参数。最后,我们通过Params类设置的参数来训练 LDA 模型。现在,我们将逐步解释每个步骤及其源代码:
步骤 1. 创建 Spark 会话 - 让我们通过定义计算核心数、SQL 仓库和应用程序名称来创建 Spark 会话,如下所示:
val spark = SparkSession
.builder
.master("local[*]")
.config("spark.sql.warehouse.dir", "E:/Exp/")
.appName("LDA for topic modelling")
.getOrCreate()
步骤 2. 创建词汇表,令牌计数以便在文本预处理后训练 LDA - 首先,加载文档,并为 LDA 做准备,如下所示:
// Load documents, and prepare them for LDA.
val preprocessStart = System.nanoTime()
val (corpus, vocabArray, actualNumTokens) = preprocess(params.input, params.vocabSize, params.stopwordFile)
预处理方法用于处理原始文本。首先,使用wholeTextFiles()方法读取整个文本,如下所示:
val initialrdd = spark.sparkContext.wholeTextFiles(paths).map(_._2)
initialrdd.cache()
在前面的代码中,paths 是文本文件的路径。然后,我们需要根据词干文本从原始文本准备一个形态学 RDD,如下所示:
val rdd = initialrdd.mapPartitions { partition =>
val morphology = new Morphology()
partition.map { value => helperForLDA.getLemmaText(value, morphology) }
}.map(helperForLDA.filterSpecialCharacters)
这里,helperForLDA类中的getLemmaText()方法提供了在过滤掉特殊字符(如("""[! @ # $ % ^ & * ( ) _ + - − , " ' ; : . ` ? --])之后的词干文本,使用filterSpaecialChatacters()方法作为正则表达式进行过滤。
需要注意的是,Morphology()类计算英语单词的基础形式,方法是去除词尾变化(而非派生形态)。也就是说,它仅处理名词的复数形式、代词的格、动词的时态和数等,而不涉及比较级形容词或派生名词等内容。这一方法来自斯坦福 NLP 小组。要使用它,你需要在主类文件中添加以下导入:edu.stanford.nlp.process.Morphology。在pom.xml文件中,你需要将以下条目作为依赖项包含:
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.6.0</version>
<classifier>models</classifier>
</dependency>
方法实现如下:
def getLemmaText(document: String, morphology: Morphology) = {
val string = new StringBuilder()
val value = new Document(document).sentences().toList.flatMap { a =>
val words = a.words().toList
val tags = a.posTags().toList
(words zip tags).toMap.map { a =>
val newWord = morphology.lemma(a._1, a._2)
val addedWoed = if (newWord.length > 3) {
newWord
} else { "" }
string.append(addedWoed + " ")
}
}
string.toString()
}
filterSpecialCharacters()的实现如下:
def filterSpecialCharacters(document: String) = document.replaceAll("""[! @ # $ % ^ & * ( ) _ + - − , " ' ; : . ` ? --]""", " ")。一旦我们手头有了去除特殊字符的 RDD,我们就可以创建一个 DataFrame,用于构建文本分析管道:
rdd.cache()
initialrdd.unpersist()
val df = rdd.toDF("docs")
df.show()
因此,DataFrame 仅包含文档标签。DataFrame 的快照如下:
https://github.***/OpenDoc***/freelearn-ds-pt5-zh/raw/master/docs/scl-spk-bgdt-anal/img/00332.gif图 21:原始文本
如果你仔细检查前面的 DataFrame,你会发现我们仍然需要对项目进行分词。此外,在像这样的 DataFrame 中存在停用词,因此我们也需要将它们移除。首先,让我们使用RegexTokenizer API 按以下方式对其进行分词:
val tokenizer = new RegexTokenizer().setInputCol("docs").setOutputCol("rawTokens")
现在,让我们按以下方式移除所有停用词:
val stopWordsRemover = new StopWordsRemover().setInputCol("rawTokens").setOutputCol("tokens")
stopWordsRemover.setStopWords(stopWordsRemover.getStopWords ++ customizedStopWords)
此外,我们还需要应用计数胜利来从词元中仅提取重要的特征。这将有助于在管道阶段将管道链式连接。我们按以下方式操作:
val countVectorizer = new CountVectorizer().setVocabSize(vocabSize).setInputCol("tokens").setOutputCol("features")
现在,通过以下方式链式连接变换器(tokenizer、stopWordsRemover和countVectorizer)创建管道:
val pipeline = new Pipeline().setStages(Array(tokenizer, stopWordsRemover, countVectorizer))
让我们拟合并转换管道,以适应词汇表和词元数量:
val model = pipeline.fit(df)
val documents = model.transform(df).select("features").rdd.map {
case Row(features: MLVector) =>Vectors.fromML(features)
}.zipWithIndex().map(_.swap)
最后,按以下方式返回词汇表和词元计数对:
(documents, model.stages(2).asInstanceOf[CountVectorizerModel].vocabulary, documents.map(_._2.numActives).sum().toLong)
现在,让我们查看训练数据的统计信息:
println()
println("Training corpus summary:")
println("-------------------------------")
println("Training set size: " + actualCorpusSize + " documents")
println("Vocabulary size: " + actualVocabSize + " terms")
println("Number of tockens: " + actualNumTokens + " tokens")
println("Preprocessing time: " + preprocessElapsed + " sec")
println("-------------------------------")
println()
我们得到以下输出:
Training corpus summary:
-------------------------------
Training set size: 18 documents
Vocabulary size: 21607 terms
Number of tockens: 75758 tokens
Preprocessing time: 39.768580981 sec
**-------------------------------**
步骤 4. 在训练之前实例化 LDA 模型
val lda = new LDA()
步骤 5: 设置 NLP 优化器
为了从 LDA 模型中获得更好且经过优化的结果,我们需要为 LDA 模型设置优化器。在这里,我们使用EMLDAOPtimizer优化器。你也可以使用OnlineLDAOptimizer()优化器。不过,你需要在MiniBatchFraction中加入(1.0/actualCorpusSize),以便在小数据集上更加健壮。整个操作如下。首先,按以下方式实例化EMLDAOptimizer:
val optimizer = params.algorithm.toLowerCase match {
case "em" => new EMLDAOptimizer
case "online" => new OnlineLDAOptimizer().setMiniBatchFraction(0.05 + 1.0 / actualCorpusSize)
case _ => throw new IllegalArgumentException("Only em is supported, got ${params.algorithm}.")
}
现在通过以下方式使用 LDA API 中的setOptimizer()方法设置优化器:
lda.setOptimizer(optimizer)
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setDo***oncentration(params.do***oncentration)
.setTopi***oncentration(params.topi***oncentration)
.setCheckpointInterval(params.checkpointInterval)
Params案例类用于定义训练 LDA 模型的参数。其结构如下:
//Setting the parameters before training the LDA model
case class Params(input: String = "",
k: Int = 5,
maxIterations: Int = 20,
do***oncentration: Double = -1,
topi***oncentration: Double = -1,
vocabSize: Int = 2900000,
stopwordFile: String = "data/stopWords.txt",
algorithm: String = "em",
checkpointDir: Option[String] = None,
checkpointInterval: Int = 10)
为了获得更好的结果,你可以以简单的方式设置这些参数。或者,你可以选择交叉验证以获得更好的性能。如果你想保存当前的参数,请使用以下代码行:
if (params.checkpointDir.nonEmpty) {
spark.sparkContext.setCheckpointDir(params.checkpointDir.get)
}
步骤 6. 训练 LDA 模型:
val startTime = System.nanoTime()
//Start training the LDA model using the training corpus
val ldaModel = lda.run(corpus)
val elapsed = (System.nanoTime() - startTime) / 1e9
println(s"Finished training LDA model. Summary:")
println(s"t Training time: $elapsed sec")
对于我们所拥有的文本,LDA 模型训练时间为 6.309715286 秒。请注意,这些时间代码是可选的。我们提供这些代码仅供参考,以了解训练时间。
步骤 7. 测量数据的似然性 - 现在,为了获得更多关于数据的统计信息,如最大似然或对数似然,我们可以使用以下代码:
if (ldaModel.isInstanceOf[DistributedLDAModel]) {
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
val avgLogLikelihood = distLDAModel.logLikelihood / actualCorpusSize.toDouble
println("The average log likelihood of the training data: " + avgLogLikelihood)
println()
}
前述代码计算了 LDA 模型作为分布式版本的实例时的平均对数似然。我们得到了以下输出:
The average log-likelihood of the training data: -208599.21351837728
似然函数在数据可用后用于描述给定结果的参数(或参数向量)的函数。这在从一组统计数据估计参数时特别有用。有关似然度量的更多信息,感兴趣的读者可以参考en.wikipedia.org/wiki/Likelihood_function。
步骤 8. 准备感兴趣的主题 - 准备前五个主题,每个主题包含 10 个词条。包括这些词条及其对应的权重。
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
println(topicIndices.length)
val topics = topicIndices.map {case (terms, termWeights) => terms.zip(termWeights).map { case (term, weight) => (vocabArray(term.toInt), weight) } }
步骤 9. 主题建模 - 打印前十个主题,展示每个主题的权重最高的词条。同时,列出每个主题的总权重,如下所示:
var sum = 0.0
println(s"${params.k} topics:")
topics.zipWithIndex.foreach {
case (topic, i) =>
println(s"TOPIC $i")
println("------------------------------")
topic.foreach {
case (term, weight) =>
println(s"$termt$weight")
sum = sum + weight
}
println("----------------------------")
println("weight: " + sum)
println()
现在,让我们看看 LDA 模型在主题建模方面的输出:
5 topics:
TOPIC 0
------------------------------
think 0.0105511077762379
look 0.010393384083882656
know 0.010121680765600402
***e 0.009999416569525854
little 0.009880422850906338
make 0.008982740529851225
take 0.007061048216197747
good 0.007040301924830752
much 0.006273732732002744
well 0.0062484438391950895
----------------------------
weight: 0.0865522792882307
TOPIC 1
------------------------------
look 0.008658099588372216
***e 0.007972622171954474
little 0.007596460821298818
hand 0.0065409990798624565
know 0.006314616294309573
lorry 0.005843633203040061
upon 0.005545300032552888
make 0.005391780686824741
take 0.00537353581562707
time 0.005030870790464942
----------------------------
weight: 0.15082019777253794
TOPIC 2
------------------------------
captain 0.006865463831587792
nautilus 0.005175561004431676
make 0.004910586984657019
hepzibah 0.004378298053191463
water 0.004063096964497903
take 0.003959626037381751
nemo 0.0037687537789531005
phoebe 0.0037683642100062313
pyncheon 0.003678496229955977
seem 0.0034594205003318193
----------------------------
weight: 0.19484786536753268
TOPIC 3
------------------------------
fogg 0.009552022075897986
rodney 0.008705705501603078
make 0.007016635545801613
take 0.00676049232003675
passepartout 0.006295907851484774
leave 0.005565220660514245
find 0.005077555215275536
time 0.004852923943330551
luke 0.004729546554304362
upon 0.004707181805179265
----------------------------
weight: 0.2581110568409608
TOPIC 4
------------------------------
dick 0.013754147765988699
thus 0.006231933402776328
ring 0.0052746290878481926
bear 0.005181637978658836
fate 0.004739983892853129
shall 0.0046221874997173906
hand 0.004610810387565958
stand 0.004121100025638923
name 0.0036093879729237
trojan 0.0033792362039766505
----------------------------
weight: 0.31363611105890865
从前述输出中,我们可以看到输入文档的主题为主题 5,权重最大为0.31363611105890865。该主题讨论了诸如 love、long、shore、shower、ring、bring、bear 等词汇。现在,为了更好地理解流程,以下是完整的源代码:
package ***.chapter11.SparkMachineLearning
import edu.stanford.nlp.process.Morphology
import edu.stanford.nlp.simple.Document
import org.apache.log4j.{ Level, Logger }
import scala.collection.JavaConversions._
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg.{ Vector => MLVector }
import org.apache.spark.mllib.clustering.{ DistributedLDAModel, EMLDAOptimizer, LDA, OnlineLDAOptimizer }
import org.apache.spark.mllib.linalg.{ Vector, Vectors }
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ Row, SparkSession }
object topicModellingwithLDA {
def main(args: Array[String]): Unit = {
val lda = new LDAforTM() // actual ***putations are done here
val defaultParams = Params().copy(input = "data/docs/")
// Loading the parameters to train the LDA model
lda.run(defaultParams) // Training the LDA model with the default
parameters.
}
}
//Setting the parameters before training the LDA model
caseclass Params(input: String = "",
k: Int = 5,
maxIterations: Int = 20,
do***oncentration: Double = -1,
topi***oncentration: Double = -1,
vocabSize: Int = 2900000,
stopwordFile: String = "data/docs/stopWords.txt",
algorithm: String = "em",
checkpointDir: Option[String] = None,
checkpointInterval: Int = 10)
// actual ***putations for topic modeling are done here
class LDAforTM() {
val spark = SparkSession
.builder
.master("local[*]")
.config("spark.sql.warehouse.dir", "E:/Exp/")
.appName("LDA for topic modelling")
.getOrCreate()
def run(params: Params): Unit = {
Logger.getRootLogger.setLevel(Level.WARN)
// Load documents, and prepare them for LDA.
val preprocessStart = System.nanoTime()
val (corpus, vocabArray, actualNumTokens) = preprocess(params
.input, params.vocabSize, params.stopwordFile)
val actualCorpusSize = corpus.count()
val actualVocabSize = vocabArray.length
val preprocessElapsed = (System.nanoTime() - preprocessStart) / 1e9
corpus.cache() //will be reused later steps
println()
println("Training corpus summary:")
println("-------------------------------")
println("Training set size: " + actualCorpusSize + " documents")
println("Vocabulary size: " + actualVocabSize + " terms")
println("Number of tockens: " + actualNumTokens + " tokens")
println("Preprocessing time: " + preprocessElapsed + " sec")
println("-------------------------------")
println()
// Instantiate an LDA model
val lda = new LDA()
val optimizer = params.algorithm.toLowerCase match {
case "em" => new EMLDAOptimizer
// add (1.0 / actualCorpusSize) to MiniBatchFraction be more
robust on tiny datasets.
case "online" => new OnlineLDAOptimizer()
.setMiniBatchFraction(0.05 + 1.0 / actualCorpusSize)
case _ => thrownew IllegalArgumentException("Only em, online are
supported but got ${params.algorithm}.")
}
lda.setOptimizer(optimizer)
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setDo***oncentration(params.do***oncentration)
.setTopi***oncentration(params.topi***oncentration)
.setCheckpointInterval(params.checkpointInterval)
if (params.checkpointDir.nonEmpty) {
spark.sparkContext.setCheckpointDir(params.checkpointDir.get)
}
val startTime = System.nanoTime()
//Start training the LDA model using the training corpus
val ldaModel = lda.run(corpus)
val elapsed = (System.nanoTime() - startTime) / 1e9
println("Finished training LDA model. Summary:")
println("Training time: " + elapsed + " sec")
if (ldaModel.isInstanceOf[DistributedLDAModel]) {
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
val avgLogLikelihood = distLDAModel.logLikelihood /
actualCorpusSize.toDouble
println("The average log likelihood of the training data: " +
avgLogLikelihood)
println()
}
// Print the topics, showing the top-weighted terms for each topic.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
println(topicIndices.length)
val topics = topicIndices.map {case (terms, termWeights) =>
terms.zip(termWeights).map { case (term, weight) =>
(vocabArray(term.toInt), weight) } }
var sum = 0.0
println(s"${params.k} topics:")
topics.zipWithIndex.foreach {
case (topic, i) =>
println(s"TOPIC $i")
println("------------------------------")
topic.foreach {
case (term, weight) =>
term.replaceAll("\\s", "")
println(s"$term\t$weight")
sum = sum + weight
}
println("----------------------------")
println("weight: " + sum)
println()
}
spark.stop()
}
//Pre-processing of the raw texts
import org.apache.spark.sql.functions._
def preprocess(paths: String, vocabSize: Int, stopwordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
import spark.implicits._
//Reading the Whole Text Files
val initialrdd = spark.sparkContext.wholeTextFiles(paths).map(_._2)
initialrdd.cache()
val rdd = initialrdd.mapPartitions { partition =>
val morphology = new Morphology()
partition.map {value => helperForLDA.getLemmaText(value,
morphology)}
}.map(helperForLDA.filterSpecialCharacters)
rdd.cache()
initialrdd.unpersist()
val df = rdd.toDF("docs")
df.show()
//Customizing the stop words
val customizedStopWords: Array[String] = if(stopwordFile.isEmpty) {
Array.empty[String]
} else {
val stopWordText = spark.sparkContext.textFile(stopwordFile)
.collect()
stopWordText.flatMap(_.stripMargin.split(","))
}
//Tokenizing using the RegexTokenizer
val tokenizer = new RegexTokenizer().setInputCol("docs")
.setOutputCol("rawTokens")
//Removing the Stop-words using the Stop Words remover
val stopWordsRemover = new StopWordsRemover()
.setInputCol("rawTokens").setOutputCol("tokens")
stopWordsRemover.setStopWords(stopWordsRemover.getStopWords ++
customizedStopWords)
//Converting the Tokens into the CountVector
val countVectorizer = new CountVectorizer().setVocabSize(vocabSize)
.setInputCol("tokens").setOutputCol("features")
val pipeline = new Pipeline().setStages(Array(tokenizer,
stopWordsRemover, countVectorizer))
val model = pipeline.fit(df)
val documents = model.transform(df).select("features").rdd.map {
case Row(features: MLVector) => Vectors.fromML(features)
}.zipWithIndex().map(_.swap)
//Returning the vocabulary and tocken count pairs
(documents, model.stages(2).asInstanceOf[CountVectorizerModel]
.vocabulary, documents.map(_._2.numActives).sum().toLong)
}
}
object helperForLDA {
def filterSpecialCharacters(document: String) =
document.replaceAll("""[! @ # $ % ^ & * ( ) _ + - − ,
" ' ; : . ` ? --]""", " ")
def getLemmaText(document: String, morphology: Morphology) = {
val string = new StringBuilder()
val value =new Document(document).sentences().toList.flatMap{a =>
val words = a.words().toList
val tags = a.posTags().toList
(words zip tags).toMap.map { a =>
val newWord = morphology.lemma(a._1, a._2)
val addedWoed = if (newWord.length > 3) {
newWord
} else { "" }
string.append(addedWoed + " ")
}
}
string.toString()
}
}
LDA 的可扩展性
上述示例展示了如何使用 LDA 算法作为独立应用程序进行主题建模。LDA 的并行化并不简单,已有许多研究论文提出了不同的策略。关键的障碍在于所有方法都涉及大量的通信。根据 Databricks 网站上的博客(databricks.***/blog/2015/03/25/topic-modeling-with-lda-mllib-meets-graphx.html),以下是实验过程中使用的数据集及相关训练集和测试集的统计信息:
-
训练集大小:460 万篇文档
-
词汇量:110 万个词条
-
训练集大小:110 亿个标记(约 239 个词/文档)
-
100 个主题
-
16 工作节点 EC2 集群,例如,M4.large 或 M3.medium,具体取决于预算和需求
对于前述设置,10 次迭代的平均时间结果为 176 秒/迭代。从这些统计数据可以看出,LDA 对于非常大规模的语料库也具有很好的可扩展性。
总结
在这一章中,我们提供了有关 Spark 中一些高级机器学习主题的理论和实践方面的内容。我们还提供了关于机器学习最佳实践的一些建议。接下来,我们展示了如何使用网格搜索、交叉验证和超参数调优来调优机器学习模型,以获得更好和更优化的性能。在后续部分,我们展示了如何使用 ALS 开发一个可扩展的推荐系统,这是一个基于模型的协同过滤方法的模型推荐系统示例。最后,我们展示了如何开发一个文本聚类技术的主题建模应用。
有兴趣了解更多机器学习最佳实践的读者,可以参考名为*《Spark 大规模机器学习》*的书籍,网址为www.packtpub.***/big-data-and-business-intelligence/large-scale-machine-learning-spark。
在下一章中,我们将进入 Spark 的更高级应用。尽管我们已经讨论并提供了二分类和多分类的对比分析,但我们将进一步了解其他 Spark 中的多项式分类算法,如朴素贝叶斯、决策树和一对多分类器(One-vs-Rest)。