其他测试框架 测试开发之路--Spark 之旅 (二):基础操作

ycwdaaaa · 2017年03月19日 · 最后由 ycwdaaaa 回复于 2018年04月13日 · 1009 次阅读
本帖已被设为精华帖!

前言

上一次我们介绍了大数据的一些历史历程和基础。今天我们来讲一讲spark的基础吧。 首先spark支持java,scala,R和python。大家一定对java或者python已经熟悉了。 不过我还是建议大家使用python或者scala来进行spark的开发。因为spark的API大量的使用了函数式编程。java的语法真的不太适合搞spark。

搭建学习环境

下载spark

我使用的是spark1.6.2。 下载地址:http://spark.apache.org/downloads.html

我们直接下载,然后解压。我们看看里面的目录

python-shell

我们运行bin/pyspark之后就进入了spark的python shell。我们为了验证是否成功了,可以运行下面的代码

lines = sc.textFile("README.md")
print lines.first()

接下来就会看到打印出一条信息:# Apache Spark。 spark提供的python shell是我们良好的学习平台。我们可以在里面随意的调用spark提供的API。

IDE环境

可能有些同学已经习惯了IDE带来的好处(例如我),所以也希望能通过IDE来进行学习和开发。 但是spark并没有提供任何python 模块给我们下载使用, 也就是说,你无法通过pip install的方式下载spark模块。 这一点就不如java和scala了,maven是可以直接集成spark的。 所以我们要做一点额外的事情以让pycharm能够拥有开发spark程序的能力。

  1. 在pycharm找到Project Structure 把解压的目录中的python目录加进去

  2. 添加run-->Edit configurations。 添加一个运行配置。并配置SPARK_HOME环境变量为解压目录。然后配置PYTHONPATH环境变量为解压目录中的python目录。

然后各位就可以在pycharm上编写spark代码并运行了。

"""SimpleApp"""

from pyspark import SparkContext

logFile = "/Users/sungaofei/Documents/spark/README.md"
sc = SparkContext("local","Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
temp = logData.first()

print temp

print("Lines with a: %i, lines with b: %i"%(numAs, numBs))

从demo中学习

"""SimpleApp"""

from pyspark import SparkContext

logFile = "/Users/sungaofei/Documents/spark/README.md"
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
temp = logData.first()

print temp

print("Lines with a: %i, lines with b: %i"%(numAs, numBs))

在全局的level,spark的应用是由一个驱动程序在集群中发起并行的多个操作。这个驱动程序包含了你的应用的main函数并定义了你的分布式数据集合。在上面的例子中,我们可以理解SparkContext对象就是spark的驱动。它负责连接集群,我们用local模式告诉spark我们使用本地集群模式以方便我们学习和调试。在你有了这个驱动之后,我们就可以随意的创建RDD了,RDD是Spark的分布式数据集合的定义,我们暂时只需要知道它是存储数据的地方,之后会详细说明一下。 sc.textFile()是从一个文本中读取数据并转换为RDD的方法。当我们有了RDD之后,就可以随意调用spark提供给我们的方法。例如上面例子中的filter(熟悉python的朋友一定觉得这个方法很熟悉)以及count方法。 在我们上面的操作中。 驱动程序会在集群中管理一定数量的executor。 例如当我们调用count()方法的时候。集群中不同的机器会各自读取这个文件中的一部分并分别计算各自的行数,当然了现在我们使用的是local模式。所以我们的应用是运行在单机上的。整个过程差不多是下面这个样子的。

在这个demo中,我们是可以看到spark是支持函数式编程的,大部分的方法都要求传递一个函数进去。例如上面的filter方法。这是一个过滤函数,上面的demo中我们分别取包含字母a和b的行。熟悉python的小伙伴一定对lambda表达式不陌生了。

RDD基础

上面提到过RDD,它是spark定义的固定不变的分布式数据集合

  1. 说它固定不变是因为它一经创建后你就无法改变它的内容了。你只能通过当前的RDD调用一些方法来生成新的RDD,但是你永远都无法真正改变一个RDD的数据。例如刚才的demo,我们调用filter方法过滤掉一些数据,但我们并没有改变原有RDD的数据,你在其他地方调用原RDD的时候仍然是全量的未经过滤的数据。 filter方法返回的是一个新的RDD
  2. 说它是分布式数据集合是因为每一个RDD都由多个partitions(分片)组成。上一篇我们讲到HDFS,所以首先数据是分布在不同的机器上的。在spark读取数据的时候会根据一定的规则(可以是默认64M一个partition,也可以指定partition数量)。 这些分布在不同partition也就是数据分片组成了RDD。spark在运行的时候,每个partition都会生成一个task。他们会跑在不同的计算资源上。我们知道java中万物皆对象,在spark中所有数据皆RDD。可以说RDD就是spark的一切,就如MapReduce就是Haddop的一切一样。
我们可以使用两种方式创建RDD
  1. 通过sc.textFile()从外部文件中读取。就如我们的demo一样
  2. 通过从一个集合中初始化一个RDD。如下:
lines = sc.parallelize(["pandas", "i like pandas"])

transformations and actions

RDD支持两种操作,transformation和action。ransformation的操会返回一个新的RDD,就如我们在demo中看到的filter()方法,是一种组织和准备数据的方式。为之后的action执行计算提供数据基础。action的操作是真正产生一个计算操作的过程。例如demo中的count()。 action不会返回一个RDD,它会返回实际的操作结果或者将数据保存到外部文件中。spark提供了很多函数,如果你分不清哪些是transformation哪些是action。 只要看它的返回值就好了。返回一个新RDD的就是transformation,不返回的就是action。 区分一个函数是哪种操作很重要,因为spark处理这两种操作的方式很不一样。

transformation

transformation是一种返回一个新的RDD的方法。它遵循延迟计算的规则。也就是说spark在运行的时候遇到transformation的时候并不会真正的执行它,直到碰到一个action的时候才会真正的执行。我们稍后会专门讨论延迟计算的规则。这里我们知道有这个概念就好。大部分transformation都是按行元素处理,就是说他们同一时间只处理一行数据(有少数transformation不是的)。就像上面说的,spark大部分的函数都是函数式编程,要求我们传递一个函数作为参数。那么所有transformation都是需要传递至少一个函数作为参数的, 这个参数就是我们指定的如何处理数据的逻辑。spark会将数据拆成一行一行的并作为参数调用我们指定的函数。就如demo中的filter,spark会将RDD的每一行作为参数传递给我们自定义的函数。

action

像之前说的RDD可以使用很多的transformation来组织和准备数据,但是光准备数据还是不行得,我们终究要用数据计算一些东西,这时候就需要我们的action,就如我们demo中的count()用来计算数据的行数. 我们还可以使用frist()取出第一条数据,用take(n)来取出前n条数据,saveAsTextFile()用来把数据存储到外部文件。也就是说action是我们真正使用数据来进行计算的方式,真正实现数据的价值的方式。

延迟计算

之前提到过,transformation的操作是延迟计算的。意思是说spark在运行的时候,运行到transformation的时候实际上并不会真正的执行transformations。直到碰到了这个RDD的action的时候,才会一股脑的执行之前所有的操作。也许这对刚接触大数据处理的同学来说有点难以理解,但如果我们仔细的想一想就会发现其实这样的设计相当的合理。 因为我们在实际情况中面对的是非常庞大的数据。如果我们在一开始就执行所有的数据操作并将数据载入内存中那将是一种很大的浪费。例如在demo中,如果我们使用的不是count这种操作全部数据的方式而是使用了first()或者take(n)这种只取了一部分数据的操作。那么事先就执行transformation的操作并将所有数据载入内存的话,那将是极大的浪费。所以取而代之的,spark在每次遇到transformation的时候并不会立刻执行,而是通过一些元数据记录RDD的操作轨迹,在遇到action的时候再推断出最优的解决方案。

常见的transformation

map

除了我们在demo中看到的filter()方法来过滤数据,我们还可以使用map()这种MapReduce时代保留下来的函数。看下面的demo

nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect() for num in squared:
print "%i " % (num)

刚才我们说大部分spark的transformation是单行处理的。所以当我们把lambda定义的匿名函数传递给map的时候。 map()会把数据中的每一行取出来作为参数进行调用。它和filter的区别可以用下图来表示。

flatMap()

与map()很相似的一个方法是flatMap()。map的操作是处理每一行的同时,返回的也是一行数据。 flatMap不一样,它返回的是一个可迭代的对象。也就是说map是一行数据转换成一行数据,flatMap是一行数据转换成多行数据。例如下面的demo

lines = sc.parallelize(["hello world", "hi"]) 
words = lines.flatMap(lambda line: line.split(" ")) 
words.first() # returns "hello"
words.count() # returns 3

下图可以表示map和flatMap的区别

集合操作


上图表示了4种集合操作。

action

reduce

最常用的action操作是我们在MapReduce时期就熟悉reduce操作,此操作是一个聚合方法。demo如下:

rdd = sc.parallelize([1,2,3,4,5])
sum = rdd.reduce(lambda x, y: x + y)

reduce接受一个函数当做参数,而这个函数也接受两个参数x和y。 这俩个参数代表着RDD中的两行,reduce是聚合函数。 它会不断的将之前计算出的两行传递给函数进行聚合计算。上面demo中的sum为15.因为reduce做了一个累加的操作。

其他

此外还有我们早就见过的count(),以及一些其他的例如:

  • collect():返回RDD中所有的数据
  • countByValue():统计每一个value出现的次数
  • take(n):取出前N行数据
  • foreach:循环RDD中的每一行数据并执行一个操作

persist

我们上面说过从性能上考虑RDD是延迟计算的,每遇到一个action都会从头开始执行。这样是不够的,因为有的时候我们需要重复使用一个RDD很多次。如果这个RDD的每一个action都要重新载入那么多的数据,那也是很蛋疼的。 所以spark提供了persist函数来让我们缓存RDD。

lines = sc.parallelize(["hello world", "hi"]) 
a = lines.flatMap(lambda line: line.split(" ")).persist() 
a.count()
a.take(10)

上面我们使用persist函数缓存了RDD。所以再调用count()和take()的时候,spark并没有重新执行一次RDD的transformation。spark有很多缓存的级别。可以参考下面的图表

可以使用persist(storageLevel='MEMORY_AND_DISK'),像这样的方式指定缓存级别。 默认是MEMORY_ONLY。

总结

spark的一些基础就讲的差不多了。 知道这些基本的知识基本上就可以写写demo了。 之后的帖子我会讲讲spark sql,键值对的RDD,shuffle等方面的东西。 可能有的同学觉得学这些没多大用,我们测试用不着这些。 恩,一般的测试类型确实用不着。 我讲这些也是给数据测试和人工智能测试做个铺垫。 因为大数据是这些测试类型的基础。

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
共收到 15 条回复 时间 点赞

看的速度都跟不上你研发的速度了。

Lihuazhang 回复

额,最近在家比较闲

😂 天哪噜,测试开发这么难啊😂 ,spark都玩上了

843633513 回复

没有。。。。测试类型不一样。。。我这边是做数据测试。所以才用这玩意。你不做这种测试就随便看看了解一下就行了。。。

汗颜,学习文章的速度跟不上LZ更新的速度。还得fighting啊!

seveniruby 将本帖设为了精华贴 03月21日 00:54

好文,收藏

—— 来自TesterHome官方 安卓客户端

高产 tester啊

😀 学习了

ycwdaaaa 回复

数据测试行内积累太少了,基本只能跟着开发走自己摸索,唉

这里有几个坑要踩:

  1. pip 的源,这里要设置下,否则好慢 https://blog.csdn.net/furzoom/article/details/53897318
  2. 如果你使用的是java9,降级到java8吧
  3. 如果你用pycharm,那么还得再配置个java8的JAVA_HOME

所以总的来说,如果你在mac上的下载了,spark 2.3,然后你也用pycharm的话,那么:

  1. 先创建个项目
  2. 在这个项目的project interpreter里找到pip,给他换个源:https://pypi.tuna.tsinghua.edu.cn/simple/
  3. 然后安装 pyspark
  4. 然后再安装java8,装好之后,给pycharm加环境变量,只要加个JAVA_HOME 指向java8就可以了。
  5. 然后就可以用了。

否则你会遇到下面的异常:

/Users/lihuazhang/code/spark-learn/one/venv/bin/python /Users/lihuazhang/code/spark-learn/one/SimpleApp.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/Users/lihuazhang/code/spark-learn/one/venv/lib/python2.7/site-packages/pyspark/jars/hadoop-auth-2.7.3.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2018-03-25 00:44:51 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-03-25 00:44:51 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Traceback (most recent call last):
  File "/Users/lihuazhang/code/spark-learn/one/SimpleApp.py", line 9, in <module>
    numAs = logData.filter(logData.value.contains('a')).count()
  File "/Users/lihuazhang/code/spark-learn/one/venv/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 455, in count
    return int(self._jdf.count())
  File "/Users/lihuazhang/code/spark-learn/one/venv/lib/python2.7/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/lihuazhang/code/spark-learn/one/venv/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/Users/lihuazhang/code/spark-learn/one/venv/lib/python2.7/site-packages/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o35.count.
: java.lang.IllegalArgumentException
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:449)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:432)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:432)
    at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:262)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:261)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:261)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2066)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)


Process finished with exit code 1


Lihuazhang 回复

赞,补充的很全面

ycwdaaaa 回复

飞哥,不知道有没有兴趣加个微信(我的微信:wz63650406)。 关注你也比较久了,我之前一直做大数据组件方面(azkaban、spark等)的测试,可能马上要去阿里那边做数据算法方面的测试了,有机会想跟你多交流交流。感觉很多关于测试的观点、看法跟你都非常相似。

zni.feng 回复

加你了~

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册