其他测试框架 测试开发之路--Spark 之旅 (二):基础操作

孙高飞 · 2017年03月19日 · 最后由 孙高飞 回复于 2018年04月13日 · 3830 次阅读
本帖已被设为精华帖!

前言

上一次我们介绍了大数据的一些历史历程和基础。今天我们来讲一讲 spark 的基础吧。 首先 spark 支持 java,scala,R 和 python。大家一定对 java 或者 python 已经熟悉了。 不过我还是建议大家使用 python 或者 scala 来进行 spark 的开发。因为 spark 的 API 大量的使用了函数式编程。java 的语法真的不太适合搞 spark。

搭建学习环境

下载 spark

我使用的是 spark1.6.2。 下载地址:http://spark.apache.org/downloads.html

我们直接下载,然后解压。我们看看里面的目录

python-shell

我们运行 bin/pyspark 之后就进入了 spark 的 python shell。我们为了验证是否成功了,可以运行下面的代码

lines = sc.textFile("README.md")
print lines.first()

接下来就会看到打印出一条信息:# Apache Spark。 spark 提供的 python shell 是我们良好的学习平台。我们可以在里面随意的调用 spark 提供的 API。

IDE 环境

可能有些同学已经习惯了 IDE 带来的好处 (例如我),所以也希望能通过 IDE 来进行学习和开发。 但是 spark 并没有提供任何 python 模块给我们下载使用, 也就是说,你无法通过 pip install 的方式下载 spark 模块。 这一点就不如 java 和 scala 了,maven 是可以直接集成 spark 的。 所以我们要做一点额外的事情以让 pycharm 能够拥有开发 spark 程序的能力。

  1. 在 pycharm 找到 Project Structure 把解压的目录中的 python 目录加进去

  2. 添加 run-->Edit configurations。 添加一个运行配置。并配置 SPARK_HOME 环境变量为解压目录。然后配置 PYTHONPATH 环境变量为解压目录中的 python 目录。

然后各位就可以在 pycharm 上编写 spark 代码并运行了。

"""SimpleApp"""

from pyspark import SparkContext

logFile = "/Users/sungaofei/Documents/spark/README.md"
sc = SparkContext("local","Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
temp = logData.first()

print temp

print("Lines with a: %i, lines with b: %i"%(numAs, numBs))

从 demo 中学习

"""SimpleApp"""

from pyspark import SparkContext

logFile = "/Users/sungaofei/Documents/spark/README.md"
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
temp = logData.first()

print temp

print("Lines with a: %i, lines with b: %i"%(numAs, numBs))

在全局的 level,spark 的应用是由一个驱动程序在集群中发起并行的多个操作。这个驱动程序包含了你的应用的 main 函数并定义了你的分布式数据集合。在上面的例子中,我们可以理解 SparkContext 对象就是 spark 的驱动。它负责连接集群,我们用local模式告诉 spark 我们使用本地集群模式以方便我们学习和调试。在你有了这个驱动之后,我们就可以随意的创建 RDD 了,RDD 是 Spark 的分布式数据集合的定义,我们暂时只需要知道它是存储数据的地方,之后会详细说明一下。 sc.textFile() 是从一个文本中读取数据并转换为 RDD 的方法。当我们有了 RDD 之后,就可以随意调用 spark 提供给我们的方法。例如上面例子中的 filter(熟悉 python 的朋友一定觉得这个方法很熟悉) 以及 count 方法。 在我们上面的操作中。 驱动程序会在集群中管理一定数量的 executor。 例如当我们调用 count() 方法的时候。集群中不同的机器会各自读取这个文件中的一部分并分别计算各自的行数,当然了现在我们使用的是 local 模式。所以我们的应用是运行在单机上的。整个过程差不多是下面这个样子的。

在这个 demo 中,我们是可以看到 spark 是支持函数式编程的,大部分的方法都要求传递一个函数进去。例如上面的 filter 方法。这是一个过滤函数,上面的 demo 中我们分别取包含字母 a 和 b 的行。熟悉 python 的小伙伴一定对 lambda 表达式不陌生了。

RDD 基础

上面提到过 RDD,它是 spark 定义的固定不变的分布式数据集合

  1. 说它固定不变是因为它一经创建后你就无法改变它的内容了。你只能通过当前的 RDD 调用一些方法来生成新的 RDD,但是你永远都无法真正改变一个 RDD 的数据。例如刚才的 demo,我们调用 filter 方法过滤掉一些数据,但我们并没有改变原有 RDD 的数据,你在其他地方调用原 RDD 的时候仍然是全量的未经过滤的数据。 filter 方法返回的是一个新的 RDD
  2. 说它是分布式数据集合是因为每一个 RDD 都由多个 partitions(分片) 组成。上一篇我们讲到 HDFS,所以首先数据是分布在不同的机器上的。在 spark 读取数据的时候会根据一定的规则 (可以是默认 64M 一个 partition,也可以指定 partition 数量)。 这些分布在不同 partition 也就是数据分片组成了 RDD。spark 在运行的时候,每个 partition 都会生成一个 task。他们会跑在不同的计算资源上。我们知道 java 中万物皆对象,在 spark 中所有数据皆 RDD。可以说 RDD 就是 spark 的一切,就如 MapReduce 就是 Haddop 的一切一样。
我们可以使用两种方式创建 RDD
  1. 通过 sc.textFile() 从外部文件中读取。就如我们的 demo 一样
  2. 通过从一个集合中初始化一个 RDD。如下:
lines = sc.parallelize(["pandas", "i like pandas"])

transformations and actions

RDD 支持两种操作,transformation 和 action。ransformation 的操会返回一个新的 RDD,就如我们在 demo 中看到的 filter() 方法,是一种组织和准备数据的方式。为之后的 action 执行计算提供数据基础。action 的操作是真正产生一个计算操作的过程。例如 demo 中的 count()。 action 不会返回一个 RDD,它会返回实际的操作结果或者将数据保存到外部文件中。spark 提供了很多函数,如果你分不清哪些是 transformation 哪些是 action。 只要看它的返回值就好了。返回一个新 RDD 的就是 transformation,不返回的就是 action。 区分一个函数是哪种操作很重要,因为 spark 处理这两种操作的方式很不一样。

transformation

transformation 是一种返回一个新的 RDD 的方法。它遵循延迟计算的规则。也就是说 spark 在运行的时候遇到 transformation 的时候并不会真正的执行它,直到碰到一个 action 的时候才会真正的执行。我们稍后会专门讨论延迟计算的规则。这里我们知道有这个概念就好。大部分 transformation 都是按行元素处理,就是说他们同一时间只处理一行数据 (有少数 transformation 不是的)。就像上面说的,spark 大部分的函数都是函数式编程,要求我们传递一个函数作为参数。那么所有 transformation 都是需要传递至少一个函数作为参数的, 这个参数就是我们指定的如何处理数据的逻辑。spark 会将数据拆成一行一行的并作为参数调用我们指定的函数。就如 demo 中的 filter,spark 会将 RDD 的每一行作为参数传递给我们自定义的函数。

action

像之前说的 RDD 可以使用很多的 transformation 来组织和准备数据,但是光准备数据还是不行得,我们终究要用数据计算一些东西,这时候就需要我们的 action,就如我们 demo 中的 count() 用来计算数据的行数. 我们还可以使用 frist() 取出第一条数据,用 take(n) 来取出前 n 条数据,saveAsTextFile() 用来把数据存储到外部文件。也就是说 action 是我们真正使用数据来进行计算的方式,真正实现数据的价值的方式。

延迟计算

之前提到过,transformation 的操作是延迟计算的。意思是说 spark 在运行的时候,运行到 transformation 的时候实际上并不会真正的执行 transformations。直到碰到了这个 RDD 的 action 的时候,才会一股脑的执行之前所有的操作。也许这对刚接触大数据处理的同学来说有点难以理解,但如果我们仔细的想一想就会发现其实这样的设计相当的合理。 因为我们在实际情况中面对的是非常庞大的数据。如果我们在一开始就执行所有的数据操作并将数据载入内存中那将是一种很大的浪费。例如在 demo 中,如果我们使用的不是 count 这种操作全部数据的方式而是使用了 first() 或者 take(n) 这种只取了一部分数据的操作。那么事先就执行 transformation 的操作并将所有数据载入内存的话,那将是极大的浪费。所以取而代之的,spark 在每次遇到 transformation 的时候并不会立刻执行,而是通过一些元数据记录 RDD 的操作轨迹,在遇到 action 的时候再推断出最优的解决方案。

常见的 transformation

map

除了我们在 demo 中看到的 filter() 方法来过滤数据,我们还可以使用 map() 这种 MapReduce 时代保留下来的函数。看下面的 demo

nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect() for num in squared:
print "%i " % (num)

刚才我们说大部分 spark 的 transformation 是单行处理的。所以当我们把 lambda 定义的匿名函数传递给 map 的时候。 map() 会把数据中的每一行取出来作为参数进行调用。它和 filter 的区别可以用下图来表示。

flatMap()

与 map() 很相似的一个方法是 flatMap()。map 的操作是处理每一行的同时,返回的也是一行数据。 flatMap 不一样,它返回的是一个可迭代的对象。也就是说 map 是一行数据转换成一行数据,flatMap 是一行数据转换成多行数据。例如下面的 demo

lines = sc.parallelize(["hello world", "hi"]) 
words = lines.flatMap(lambda line: line.split(" ")) 
words.first() # returns "hello"
words.count() # returns 3

下图可以表示 map 和 flatMap 的区别

集合操作


上图表示了 4 种集合操作。

action

reduce

最常用的 action 操作是我们在 MapReduce 时期就熟悉 reduce 操作,此操作是一个聚合方法。demo 如下:

rdd = sc.parallelize([1,2,3,4,5])
sum = rdd.reduce(lambda x, y: x + y)

reduce 接受一个函数当做参数,而这个函数也接受两个参数 x 和 y。 这俩个参数代表着 RDD 中的两行,reduce 是聚合函数。 它会不断的将之前计算出的两行传递给函数进行聚合计算。上面 demo 中的 sum 为 15.因为 reduce 做了一个累加的操作。

其他

此外还有我们早就见过的 count(),以及一些其他的例如:

  • collect():返回 RDD 中所有的数据
  • countByValue():统计每一个 value 出现的次数
  • take(n):取出前 N 行数据
  • foreach:循环 RDD 中的每一行数据并执行一个操作

persist

我们上面说过从性能上考虑 RDD 是延迟计算的,每遇到一个 action 都会从头开始执行。这样是不够的,因为有的时候我们需要重复使用一个 RDD 很多次。如果这个 RDD 的每一个 action 都要重新载入那么多的数据,那也是很蛋疼的。 所以 spark 提供了 persist 函数来让我们缓存 RDD。

lines = sc.parallelize(["hello world", "hi"]) 
a = lines.flatMap(lambda line: line.split(" ")).persist() 
a.count()
a.take(10)

上面我们使用 persist 函数缓存了 RDD。所以再调用 count() 和 take() 的时候,spark 并没有重新执行一次 RDD 的 transformation。spark 有很多缓存的级别。可以参考下面的图表

可以使用 persist(storageLevel='MEMORY_AND_DISK'),像这样的方式指定缓存级别。 默认是 MEMORY_ONLY。

总结

spark 的一些基础就讲的差不多了。 知道这些基本的知识基本上就可以写写 demo 了。 之后的帖子我会讲讲 spark sql,键值对的 RDD,shuffle 等方面的东西。 可能有的同学觉得学这些没多大用,我们测试用不着这些。 恩,一般的测试类型确实用不着。 我讲这些也是给数据测试和人工智能测试做个铺垫。 因为大数据是这些测试类型的基础。

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
共收到 15 条回复 时间 点赞
ABEE ycwdaaaa (孙高飞) 在 TesterHome 的发帖整理 中提及了此贴 01月12日 13:47
孙高飞 专栏文章:大数据测试场景科普 -- 流计算篇 (上) 中提及了此贴 12月17日 20:46
Nisir 回复

加你了~

孙高飞 回复

飞哥,不知道有没有兴趣加个微信(我的微信:wz63650406)。 关注你也比较久了,我之前一直做大数据组件方面(azkaban、spark 等)的测试,可能马上要去阿里那边做数据算法方面的测试了,有机会想跟你多交流交流。感觉很多关于测试的观点、看法跟你都非常相似。

恒温 回复

赞,补充的很全面

这里有几个坑要踩:

  1. pip 的源,这里要设置下,否则好慢 https://blog.csdn.net/furzoom/article/details/53897318
  2. 如果你使用的是 java9,降级到 java8 吧
  3. 如果你用 pycharm,那么还得再配置个 java8 的 JAVA_HOME

所以总的来说,如果你在 mac 上的下载了,spark 2.3,然后你也用 pycharm 的话,那么:

  1. 先创建个项目
  2. 在这个项目的 project interpreter 里找到 pip,给他换个源:https://pypi.tuna.tsinghua.edu.cn/simple/
  3. 然后安装 pyspark
  4. 然后再安装 java8,装好之后,给 pycharm 加环境变量,只要加个 JAVA_HOME 指向 java8 就可以了。
  5. 然后就可以用了。

否则你会遇到下面的异常:

/Users/lihuazhang/code/spark-learn/one/venv/bin/python /Users/lihuazhang/code/spark-learn/one/SimpleApp.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/Users/lihuazhang/code/spark-learn/one/venv/lib/python2.7/site-packages/pyspark/jars/hadoop-auth-2.7.3.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2018-03-25 00:44:51 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-03-25 00:44:51 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Traceback (most recent call last):
  File "/Users/lihuazhang/code/spark-learn/one/SimpleApp.py", line 9, in <module>
    numAs = logData.filter(logData.value.contains('a')).count()
  File "/Users/lihuazhang/code/spark-learn/one/venv/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 455, in count
    return int(self._jdf.count())
  File "/Users/lihuazhang/code/spark-learn/one/venv/lib/python2.7/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/lihuazhang/code/spark-learn/one/venv/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/Users/lihuazhang/code/spark-learn/one/venv/lib/python2.7/site-packages/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o35.count.
: java.lang.IllegalArgumentException
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:449)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:432)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:432)
    at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:262)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:261)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:261)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2066)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)


Process finished with exit code 1


https://stackoverflow.com/questions/34685905/how-to-link-pycharm-with-pyspark
更新下,现在的 pycharm 的配置方式有些变化。

孙高飞 回复

数据测试行内积累太少了,基本只能跟着开发走自己摸索,唉

高产 tester 啊

好文,收藏

—— 来自 TesterHome 官方 安卓客户端

思寒_seveniruby 将本帖设为了精华贴 03月21日 00:54

汗颜,学习文章的速度跟不上 LZ 更新的速度。还得 fighting 啊!

天琴圣域 回复

没有。。。。测试类型不一样。。。我这边是做数据测试。所以才用这玩意。你不做这种测试就随便看看了解一下就行了。。。

😂 天哪噜,测试开发这么难啊😂 ,spark 都玩上了

恒温 回复

额,最近在家比较闲

看的速度都跟不上你研发的速度了。

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册