Spark RDD Study Notes

1. RDD: Resilient Distributed Dataset

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging

1) RDD is an abstract class
2) It is generic (type parameter T), so an RDD can hold many element types: String, Person, User

The Spark source code describes an RDD as representing an

    immutable: it cannot be modified once created
    partitioned collection of elements: the data is split into partitions, e.g.
        Array(1,2,3,4,5,6,7,8,9,10) with 3 partitions: (1,2,3) (4,5,6) (7,8,9,10)
    that can be operated on in parallel: each partition can be computed in parallel
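A quick way to see the partitioning is glom(), which collects each partition into its own list. A minimal PySpark sketch, assuming a SparkContext named sc is already available (it is created in section 4):

rdd = sc.parallelize(range(1, 11), 3)  # 10 elements spread over 3 partitions
rdd.getNumPartitions()                 # 3
rdd.glom().collect()                   # roughly [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]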

2. RDD Properties

Internally, each RDD is characterized by five main properties:
- A list of partitions
    the dataset is divided into a series of partitions/splits

- A function for computing each split/partition
    y = f(x)
    rdd.map(_+1)  

- A list of dependencies on other RDDs
    rdd1 ==> rdd2 ==> rdd3 ==> rdd4
    dependencies: *****

    rdda = 5 partitions
    ==> map
    rddb = 5 partitions

- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
    Whenever possible, schedule the task onto the node where its data already lives: moving computation is cheaper than moving data.
    Why is "locations" plural? Because an HDFS block is replicated on several nodes, so each split usually has more than one candidate node.

Where the five properties show up in the RDD source code:

def compute(split: Partition, context: TaskContext): Iterator[T]  // property 2
def getPartitions: Array[Partition]  // property 1
def getDependencies: Seq[Dependency[_]] = deps  // property 3
def getPreferredLocations(split: Partition): Seq[String] = Nil  // property 5
val partitioner: Option[Partitioner] = None  // property 4
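Some of these properties can also be inspected from PySpark's public API. A minimal sketch, assuming the SparkContext sc from section 4 (the exact output depends on the Spark version):

pairs = sc.parallelize([('a', 1), ('b', 2), ('a', 3)]).reduceByKey(lambda a, b: a + b)
pairs.getNumPartitions()  # property 1: how many partitions the RDD has
pairs.toDebugString()     # property 3: the lineage / dependency chain (returned as bytes in PySpark)
pairs.partitioner         # property 4: should be set after the shuffle introduced by reduceByKey
# property 2 (compute) and property 5 (preferred locations) stay internal to each RDD implementation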

3. Common Basic RDD Operators

RDD Operation
    transformations: create a new dataset from an existing one
        RDDA ---transformation--> RDDB

        y = f(x)
        rddb = rdda.map(....)

        lazy(*****)

        rdda.map().filter()......collect

        map/filter/group by/distinct/.....

    actions: 
        return a value to the driver program after running a computation on the dataset
        count/reduce/collect......

    1) transformations are lazy: nothing actually happens until an action is called;
    2) an action triggers the computation;
    3) an action returns values to the driver or writes data to external storage.
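    A minimal sketch of the lazy behaviour (reusing the sc from section 4):

    nums = sc.parallelize([1, 2, 3, 4])
    doubled = nums.map(lambda x: x * 2)  # transformation: returns a new RDD immediately, nothing runs yet
    doubled.collect()                    # action: triggers the job -> [2, 4, 6, 8]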

map: 
    map(func)
    applies func to every element of the dataset and returns a new distributed dataset

    word => (word,1)
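    For example (a small sketch, reusing the sc from section 4):

    sc.parallelize(['hello', 'spark']).map(lambda word: (word, 1)).collect()
    # [('hello', 1), ('spark', 1)]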

filter:
    filter(func)
    keeps only the elements for which func returns true and returns them as a new distributed dataset
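    For example, keeping only the even numbers:

    sc.parallelize(range(10)).filter(lambda x: x % 2 == 0).collect()
    # [0, 2, 4, 6, 8]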

flatMap
    flatMap(func)
    each input item can be mapped to 0 or more output items; func should return a sequence, and the results are flattened
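    For example, splitting lines into words:

    sc.parallelize(['hello spark', 'hello world']).flatMap(lambda line: line.split(' ')).collect()
    # ['hello', 'spark', 'hello', 'world']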

groupByKey: groups all values with the same key together
    ['hello', 'spark', 'hello', 'world', 'hello', 'world']
    ('hello',1) ('spark',1)........
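    A small sketch (mapValues(list) is only used to make the grouped values visible):

    words = sc.parallelize(['hello', 'spark', 'hello', 'world', 'hello', 'world'])
    words.map(lambda w: (w, 1)).groupByKey().mapValues(list).collect()
    # e.g. [('hello', [1, 1, 1]), ('spark', [1]), ('world', [1, 1])]  (order may vary)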

reduceByKey: brings all values with the same key together and reduces them with the given function
    mapRdd.reduceByKey(lambda a,b:a+b)
    [1,1]    1+1 = 2
    [1,1,1]  1+1 = 2, 2+1 = 3
    [1]      1
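    The same word list reduced directly:

    words = sc.parallelize(['hello', 'spark', 'hello', 'world', 'hello', 'world'])
    words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b).collect()
    # e.g. [('hello', 3), ('spark', 1), ('world', 2)]  (order may vary)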

union: returns a new RDD containing all elements of both RDDs (duplicates are kept)
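    For example:

    sc.parallelize([1, 2, 3]).union(sc.parallelize([3, 4, 5])).collect()
    # [1, 2, 3, 3, 4, 5]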

join: 
    inner join
    outer join:left/right/full
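    A small sketch of the different join flavours (the keys and values here are made up for illustration; sc from section 4):

    a = sc.parallelize([('tom', 1), ('jerry', 2)])
    b = sc.parallelize([('tom', 10), ('spike', 20)])
    a.join(b).collect()           # inner join: [('tom', (1, 10))]
    a.leftOuterJoin(b).collect()  # e.g. [('tom', (1, 10)), ('jerry', (2, None))]
    a.fullOuterJoin(b).collect()  # keeps keys from both sides; missing values become None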

4. Code Practice

# I love Spark because it can process large amounts of data quickly
# Remote login setup (in Chinese): https://www.jianshu.com/p/2f84e9fddf91
import pyspark 
sc = pyspark.SparkContext('local[*]')
!cat hello.txt
hello hello spark spark
hello hello spark
java python spark
python python flask
python pipy flask
rdd = sc.textFile('hello.txt') # also accepts a directory or a glob pattern
# word count
rdd.flatMap(lambda x:x.split(' ')).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b).collect()
[('java', 1),
 ('python', 4),
 ('pipy', 1),
 ('hello', 4),
 ('spark', 4),
 ('flask', 2)]
# How to submit a job:
# ./spark-submit --master local[2] --name spark_app_name app.py argv[0] argv[1] ..
# sort by value
rdd.flatMap(lambda x:x.split(' ')).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)\
.map(lambda x:(x[1],x[0])).sortByKey().map(lambda x:(x[1],x[0])).collect()
[('java', 1),
 ('pipy', 1),
 ('flask', 2),
 ('python', 4),
 ('hello', 4),
 ('spark', 4)]
# sort by value and take the top five
rdd.flatMap(lambda x:x.split(' ')).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)\
.map(lambda x:(x[1],x[0])).sortByKey().map(lambda x:(x[1],x[0])).take(5)
[('java', 1), ('pipy', 1), ('flask', 2), ('python', 4), ('hello', 4)]
!cat id2age.txt
3 96
4 44
5 67
6 4
7 98
rdd = sc.textFile('id2age.txt')
# compute the average customer age
age_data = rdd.map(lambda x:int(x.split(" ")[1]))
sum_age = age_data.reduce(lambda a,b:a+b) # reduce is an action
people_num = rdd.count()
print(sum_age,people_num,sum_age/people_num)
309 5 61.8
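# A shorter alternative: numeric RDDs also provide a mean() action (a minimal sketch, same sc and id2age.txt as above)
age_data = sc.textFile('id2age.txt').map(lambda line: int(line.split(" ")[1]))
age_data.mean()  # 309 / 5 = 61.8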