参考资料:

知识图谱:

简介

Spark的官方介绍是:Apache Spark is a fast and general-purpose cluster computing system。这体现了Spark两个关键的特点:快速和通用。快速是指Spark与Hadoop相比具有可以在内存内快速处理数据的特点;通用是指Spark是一个大一统的软件栈,能使用一套API完成以前需要多个平台才能完成的任务,如:交互查询(Spark Shell),离线批量数据处理(RDD、Spark SQL),在线的流式处理(Spark Streaming),以及在大型数据集上运行迭代算法(ML任务)。

从工程角度看Spark具有下面一下特点:

Spark能完成的一些任务:

基本概念

开发环境配置

详细参考官方文档,这里主要介绍Yarn模式

配置

配置上面提到的几种模式,这里重点提一下可以设置的地方。参考配置文档。一个技巧,可以在spark-submit时加上—verbose来打印参数的来源或者在Spark Web UI上查看使用的设置。

启动

./bin/spark-submit \
  --master yarn \
  --deploy-mode client \  # 或者cluster
  --executor-memory 20G \
  --num-executors 50 \
  --queue spark-default # Yarn模式下 指定队列
  --conf <key>=<value>
  --jars PATH/utils.jar # 制定了第三方的jar包
  --class org.apache.spark.examples.SparkPi \ # 指定jar包中运行的Main Class
  /path/to/examples.jar \ #<application-jar>
  1000 #[application-arguments]

配置的介绍

基本API

RDD

RDD(Resilient Distributed Dataset)是不可变的分布式对象集合。是Spark最重要的概念,是对分布式数据的一个抽象表示。重点理解:

创建RDD

RDD的分类(重点)

核心API

参考官方API手册

技巧:由于Scala的隐式转换特性,查找API文档时需要注意,对于RDD的一下操作,除了RDD中通用操作,还有一下其他的类需要查看XXRDDFunctions相关的类,如DoubleRDDFunctionsPairRDDFunctions

核心概念(重点)

注意点

  1. 回调函数引用问题:RDD的操作函数中有大量的传入回调函数的情况(如map),由于回调函数会发送到Executor中去运行,所以需要注意回调函数的引用外层对象this的问题,会出现NotSerializableException的错误。参考文档,总结一下,推荐使用下面两种形式:
    • 使用匿名回调函数作为参数
    • 使用object(全局对象)中的函数作为参数
    • 注意:函数中如果引用类的成员变量,请先copy到层本地变量,如: val field_ = this.field
  2. map与mapValues对分区的影响
  3. 使用rdd.coalesce的一个坑:如果我们的代码类似rdd.map(xxx)....coalesce(1).write...这样的操作吧结果输出到一个文件中,活导致先合并在执行map操作的问题。这是由于spark的stage合并机制导致的,参考文章.
  4. 如同类型的RDD之间的隐式转化:在spark1.3.0之后版本,无需使用import org.apache.spark.SparkContext._,RDD会自动转换转换对应XXRDDFunctions。

I/O方式

讨论一下Spark读入文件的方式。一般我们需要的处理的数据都是位于某个文件系统上的某个(些)文件。Spark可以访问下面这些数据。

一个Q&A:哪些接口是基于原始接口(hadoopfile)构造出来的高层接口?

共享变量

累加器

广播变量

Spark SQL

理解RDD DataFrame DataSet几种API的本质区别

核心API

I/O方式

常见需求最佳实践

  1. 活用UDF:对于一些自定义要求很高的需求,我们最常见的思路是:使用UDF解决。通常配合下面的函数

    • withColumn方法:新增一列,可以使用udf对多个列进行计算,然后生成新列。df.withColumn("new_col",my_udf(col("col1"),col(col2)))
    • select方法:提取某些列,然后使用udf计算,不保留原来的表df.select(my_udf(col("col1"),col("col2")) as "my_name", col("other_col"))
    • filter方法:比较复杂的过滤。df.filter(my_udf(col("col1"),col("col2")))。这里的my_udf返回True/False
    • 注意:groupBy之后暂时还不能用UDF,应当使用UDAF。
  2. 单表group操作:应当注意的是表的GroupBy操作,返回的是GroupedData对象,不是DataFrame。后面必须接上聚合操作才能返回聚合后的DataFrame:

    • 常见操作,只计算一个值。df.groupBy("col1","col2").count/max/min/sum/mean()
    • 复杂些的,计算多个值:df.groupBy("col1","col2").agg()
      • df.groupBy("col1","col2").agg(max("age"),sum("expense"))
      • df.groupBy("col1","col2").agg(Map("age"-> "max", "expense" ->"sum"))
    • 自定义agg操作:UDAF,新功能。
  3. 多表join(filter in another table):df1.join(df2,condition_express,"type_string")

    • join的三种方式。参考Beyond traditional join with Apache Spark
      • 默认只包含公共的key:inner
      • left_outer/right_outer
      • fullouter
    • join之后的列名字
      • 先重命名withColumnRenamed,再join
      • 直接显示指定df1("col")==df2("col")
    • 空值问题==>DataFrame的na方法
    • 多列相等的join
    // 显示指定col	 
    val res = df1.join(df2,df1("uid") === df2
          ("uid") && df1("time") === df2
          ("time"),"left_outer").
          na.fill(Map(
          "click_cnt" ->0
        )).drop(df2("uid"))
      
    
  4. 寻找不在另外一个表中的行(filter not in another table)

    • df1.join( df2.select($"id".alias("id_")), $"id" === $"id_", "leftouter") .where($"id_".isNull).drop("id_")
    • 如果另外一个集合比较小,可以考虑用filter配合UDF过滤set(可能还需要broadcast)
  5. 空值处理:DataFrame的na方法

    • fill
    • drop
    • replace
  6. 使用lit方法生成常量列,方便统计一些值。

  7. 同时Group多组col,并统计行数(count操作)

    • 问题:使用groupby方法只能对一组col进行group操作,如'name'&&'age',如果要同时对'name'&&'sex'进行group,这需要再对df进行一次group。如何对df进行一次操作就可以完成多组的group并输出结果呢?

Spark Streaming

简单介绍一下,以后详细介绍

其他

性能优化的基本技巧

总结

至此,简单地介绍了Spark的配置和常用API,用来解决遇到的常见问题。其中配置部分我们很少关心,毕竟在业务开发中我们往往使用公司配置好了的Spark环境,但是调优和解决问题往往能从中入手,是理解Spark的重要一环,这个我会在了解Spark的内部原理之后再详细说明配置项的含义。API编程层面主要是RDD与DataSet/DataFrame的操作,熟悉常用的function工具与一下场景实践,同时介绍了Shuffle,Partition和Cache的概念。最后简单提了一下Spark Streaming性能优化。后期,工程方面我会在这几个方面深入研究: