Skip to content

Spark 学习日记(初探)

历史

hadoop 1.x

HDFS:用 NameNode 管理 DataNodeMap-Reduce:用 JobTracker 管理(调度) TaskTracker

hadoop 1.x Map-Reduce 的缺点

Map-Reduce 是基于数据集的计算,是面向数据的。

  1. 基本的运算规则从介质中获取,计算后再存储到介质中。所以主要应用于一次性计算。在当前的大数据环境中是无法接受的。不适合数据挖掘和机器学习这样的迭代计算。
  2. 基于文件,IO 效率慢。
  3. hadoop 紧密耦合,无法替换。

hadoop 2.x Yarn 的出现

  • RM(ResourceManager)

  • NM(NodeManager)

  • AM(ApplicationMaster)

  • RM -> NM -> Container -> Task

  • RM -> AM -> Driver -> Task

UTOOLS1592321344280.png

Spark 的出现是为了改善 hadoop 1.x 的不足

基于内存,使用 Scala 语言,适合迭代计算

UTOOLS1592321615415.png

HDFS + Yarn + Spark

Spark 模块

  • Spark Core
  • Spark SQL
  • Spark Streaming
  • Spark MLib
  • Spark GraphX

SBT 管理 jar 包依赖

可以使用下面的方式来定义一个依赖,其中 groupId,artifactId 和 revision 都是字符串:

ini
libraryDependencies += groupId % artifactId % revision
libraryDependencies += groupId % artifactId % revision

使用国内镜像加速:

创建修改 ~/.sbt/repositories 加入阿里的镜像

ini
[repositories]
local
aliyun: https://maven.aliyun.com/repository/public
[repositories]
local
aliyun: https://maven.aliyun.com/repository/public

Spark 统计单词数量

新建一个 sbt 项目

sbt:

ini
name := "spark"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.6"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.6"
name := "spark"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.6"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.6"

input:

ini
Hello World
Hello Scala
Hello Spark
Hello World
Hello Scala
Hello Spark

SparkWordCount:

scala
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]) {
    val logFile = "input" // Should be some file on your system
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount") // 设置应用名称
    conf.setMaster("local[1]") // 设置本地模式,1个进程
    val sc = new SparkContext(conf)
    val data = sc.textFile(logFile, 2).cache()
    val words = data.flatMap(_.split(" "))
    val wordOnce = words.map((_, 1))
    val wordWithCountList = wordOnce.reduceByKey((a, b) => a + b)
    wordWithCountList.foreach(println)
    sc.stop()
  }
}
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]) {
    val logFile = "input" // Should be some file on your system
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount") // 设置应用名称
    conf.setMaster("local[1]") // 设置本地模式,1个进程
    val sc = new SparkContext(conf)
    val data = sc.textFile(logFile, 2).cache()
    val words = data.flatMap(_.split(" "))
    val wordOnce = words.map((_, 1))
    val wordWithCountList = wordOnce.reduceByKey((a, b) => a + b)
    wordWithCountList.foreach(println)
    sc.stop()
  }
}

输出:

ini
(Hello,3)
(World,1)
(Scala,1)
(Spark,1)
(Hello,3)
(World,1)
(Scala,1)
(Spark,1)

Spark 和 Yarn 联合使用

在 spark 目录中修改 spark-env.sh

添加 YARN_CONF_DIR=/xxx/hadoop/etc/hadoop

最后编辑时间:

Version 4.0 (framework-1.0.0-rc.20)