windowns使用PySpark环境配置和基本操作

下载依赖

首先需要下载hadoop和spark,解压,然后设置环境变量。
hadoop清华源下载
spark清华源下载

HADOOP_HOME => /path/hadoop
SPARK_HOME => /path/spark

安装pyspark。

pip install pyspark

基本使用

可以在shell终端,输入pyspark,有如下回显:

windowns使用PySpark环境配置和基本操作

输入以下指令进行测试,并创建SparkContext,SparkContext是任何spark功能的入口点。

>>> from pyspark import SparkContext
>>> sc = SparkContext(\"local\", \"First App\")

如果以上不会报错,恭喜可以开始使用pyspark编写代码了。
不过,我这里使用IDE来编写代码,首先我们先在终端执行以下代码关闭SparkContext。

>>> sc.stop()

下面使用pycharm编写代码,如果修改了环境变量需要先重启pycharm。
在pycharm运行如下程序,程序会起本地模式的spark计算引擎,通过spark统计abc.txt文件中a和b出现行的数量,文件路径需要自己指定。

from pyspark import SparkContext

sc = SparkContext(\"local\", \"First App\")
logFile = \"abc.txt\"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: \'a\' in s).count()
numBs = logData.filter(lambda s: \'b\' in s).count()
print(\"Line with a:%i,line with b:%i\" % (numAs, numBs))

运行结果如下:

20/03/11 16:15:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
Using Spark\’s default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to \”WARN\”.
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/11 16:15:58 WARN Utils: Service \’SparkUI\’ could not bind on port 4040. Attempting port 4041.
Line with a:3,line with b:1

这里说一下,同样的工作使用python可以做,spark也可以做,使用spark主要是为了高效的进行分布式计算。
戳pyspark教程
戳spark教程

RDD

RDD代表Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素,RDD是spark计算的操作对象。
一般,我们先使用数据创建RDD,然后对RDD进行操作。
对RDD操作有两种方法:
Transformation(转换) – 这些操作应用于RDD以创建新的RDD。例如filter,groupBy和map。
Action(操作) – 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序,例如count,collect等。

创建RDD

parallelize是从列表创建RDD,先看一个例子:

from pyspark import SparkContext


sc = SparkContext(\"local\", \"count app\")
words = sc.parallelize(
    [\"scala\",
     \"java\",
     \"hadoop\",
     \"spark\",
     \"akka\",
     \"spark vs hadoop\",
     \"pyspark\",
     \"pyspark and spark\"
     ])
print(words)

结果中我们得到一个对象,就是我们列表数据的RDD对象,spark之后可以对他进行操作。

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

Count

count方法返回RDD中的元素个数。

from pyspark import SparkContext


sc = SparkContext(\"local\", \"count app\")
words = sc.parallelize(
    [\"scala\",
     \"java\",
     \"hadoop\",
     \"spark\",
     \"akka\",
     \"spark vs hadoop\",
     \"pyspark\",
     \"pyspark and spark\"
     ])
print(words)

counts = words.count()
print(\"Number of elements in RDD -> %i\" % counts)

返回结果:

Number of elements in RDD -> 8

Collect

collect返回RDD中的所有元素。

from pyspark import SparkContext


sc = SparkContext(\"local\", \"collect app\")
words = sc.parallelize(
    [\"scala\",
     \"java\",
     \"hadoop\",
     \"spark\",
     \"akka\",
     \"spark vs hadoop\",
     \"pyspark\",
     \"pyspark and spark\"
     ])
coll = words.collect()
print(\"Elements in RDD -> %s\" % coll)

返回结果:

Elements in RDD -> [\’scala\’, \’java\’, \’hadoop\’, \’spark\’, \’akka\’, \’spark vs hadoop\’, \’pyspark\’, \’pyspark and spark\’]

foreach

每个元素会使用foreach内的函数进行处理,但是不会返回任何对象。
下面的程序中,我们定义的一个累加器accumulator,用于储存在foreach执行过程中的值。

from pyspark import SparkContext
sc = SparkContext(\"local\", \"ForEach app\")

accum = sc.accumulator(0)
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)


def increment_counter(x):
    print(x)
    accum.add(x)
 return 0

s = rdd.foreach(increment_counter)
print(s)  # None
print(\"Counter value: \", accum)

返回结果:

None
Counter value:  15

filter

返回一个包含元素的新RDD,满足过滤器的条件。

from pyspark import SparkContext
sc = SparkContext(\"local\", \"Filter app\")
words = sc.parallelize(
    [\"scala\",
     \"java\",
     \"hadoop\",
     \"spark\",
     \"akka\",
     \"spark vs hadoop\",
     \"pyspark\",
     \"pyspark and spark\"]
)
words_filter = words.filter(lambda x: \'spark\' in x)
filtered = words_filter.collect()
print(\"Fitered RDD -> %s\" % (filtered))

 

Fitered RDD -> [\'spark\', \'spark vs hadoop\', \'pyspark\', \'pyspark and spark\']

也可以改写成这样:

from pyspark import SparkContext
sc = SparkContext(\"local\", \"Filter app\")
words = sc.parallelize(
    [\"scala\",
     \"java\",
     \"hadoop\",
     \"spark\",
     \"akka\",
     \"spark vs hadoop\",
     \"pyspark\",
     \"pyspark and spark\"]
)


def g(x):
    for i in x:
        if \"spark\" in x:
            return i

words_filter = words.filter(g)
filtered = words_filter.collect()
print(\"Fitered RDD -> %s\" % (filtered))

map

将函数应用于RDD中的每个元素并返回新的RDD。

from pyspark import SparkContext
sc = SparkContext(\"local\", \"Map app\")
words = sc.parallelize(
    [\"scala\",
     \"java\",
     \"hadoop\",
     \"spark\",
     \"akka\",
     \"spark vs hadoop\",
     \"pyspark\",
     \"pyspark and spark\"]
)
words_map = words.map(lambda x: (x, 1, \"_{}\".format(x)))
mapping = words_map.collect()
print(\"Key value pair -> %s\" % (mapping))

返回结果:

Key value pair -> [(\’scala\’, 1, \’_scala\’), (\’java\’, 1, \’_java\’), (\’hadoop\’, 1, \’_hadoop\’), (\’spark\’, 1, \’_spark\’), (\’akka\’, 1, \’_akka\’), (\’spark vs hadoop\’, 1, \’_spark vs hadoop\’), (\’pyspark\’, 1, \’_pyspark\’), (\’pyspark and spark\’, 1, \’_pyspark and spark\’)]

Reduce

执行指定的可交换和关联二元操作后,然后返回RDD中的元素。

from pyspark import SparkContext
from operator import add


sc = SparkContext(\"local\", \"Reduce app\")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print(\"Adding all the elements -> %i\" % (adding))

 这里的add是python内置的函数,可以使用ide查看:

def add(a, b):
    \"Same as a + b.\"
    return a + b

reduce会依次对元素相加,相加后的结果加上其他元素,最后返回结果(RDD中的元素)。

Adding all the elements -> 15

Join

返回RDD,包含两者同时匹配的键,键包含对应的所有元素。

from pyspark import SparkContext


sc = SparkContext(\"local\", \"Join app\")
x = sc.parallelize([(\"spark\", 1), (\"hadoop\", 4), (\"python\", 4)])
y = sc.parallelize([(\"spark\", 2), (\"hadoop\", 5)])
print(\"x =>\", x.collect())
print(\"y =>\", y.collect())
joined = x.join(y)
final = joined.collect()
print( \"Join RDD -> %s\" % (final))

返回结果:

x => [(\’spark\’, 1), (\’hadoop\’, 4), (\’python\’, 4)]
y => [(\’spark\’, 2), (\’hadoop\’, 5)]
Join RDD -> [(\’hadoop\’, (4, 5)), (\’spark\’, (1, 2))]

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容