Spark 学习日记(初探)

# 历史

# hadoop 1.x

HDFS:用 NameNode 管理 DataNode。 Map-Reduce:用 JobTracker 管理(调度) TaskTracker

# hadoop 1.x Map-Reduce 的缺点

Map-Reduce 是基于数据集的计算,是面向数据的。

  1. 基本的运算规则从介质中获取,计算后再存储到介质中。所以主要应用于一次性计算。在当前的大数据环境中是无法接受的。不适合数据挖掘和机器学习这样的迭代计算。
  2. 基于文件,IO 效率慢。
  3. 和 hadoop 紧密耦合,无法替换。

# hadoop 2.x Yarn 的出现

RM(ResourceManager) NM(NodeManager) AM(ApplicationMaster)

RM -> NM -> Container -> Task RM -> AM -> Driver -> Task

UTOOLS1592321344280.png

# Spark 的出现是为了改善 hadoop 1.x 的不足

基于内存,使用 Scala 语言,适合迭代计算

UTOOLS1592321615415.png

HDFS + Yarn + Spark

# Spark 模块

  • Spark Core
  • Spark SQL
  • Spark Streaming
  • Spark MLib
  • Spark GraphX

# SBT 管理 jar 包依赖

可以使用下面的方式来定义一个依赖,其中 groupId,artifactId 和 revision 都是字符串:

libraryDependencies += groupId % artifactId % revision
1

使用国内镜像加速:

创建修改 ~/.sbt/repositories 加入阿里的镜像

[repositories]
local
aliyun: https://maven.aliyun.com/repository/public
1
2
3

# Spark 统计单词数量

新建一个 sbt 项目

sbt:

name := "spark"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.6"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.6"
1
2
3
4
5
6
7
8

input:

Hello World
Hello Scala
Hello Spark
1
2
3

SparkWordCount:

import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]) {
    val logFile = "input" // Should be some file on your system
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount") // 设置应用名称
    conf.setMaster("local[1]") // 设置本地模式,1个进程
    val sc = new SparkContext(conf)
    val data = sc.textFile(logFile, 2).cache()
    val words = data.flatMap(_.split(" "))
    val wordOnce = words.map((_, 1))
    val wordWithCountList = wordOnce.reduceByKey((a, b) => a + b)
    wordWithCountList.foreach(println)
    sc.stop()
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

输出:

(Hello,3)
(World,1)
(Scala,1)
(Spark,1)
1
2
3
4

# Spark 和 Yarn 联合使用

在 spark 目录中修改 spark-env.sh

添加 YARN_CONF_DIR=/xxx/hadoop/etc/hadoop