Spark 与 QingStor 集成
更新时间:2025-01-17 11:03:13
PDF
有两种方式可以启动 Spark job: 通过 spark-shell 交互式运行和通过 spark-submit 提交 job 到 Spark 集群运行,这两种方式都需要通过选项 "--jars $SPARK_S3" 来指定使用 S3 API相关的 jar 包。
假设您在 QingStor 上的 bucket 为 my-bucket
, 下面以 spark-shell 为例, 列出常见的 Spark 与 QingStor 集成场景。
存储到 QingStor
在 Spark 中读取到 HDFS 上的文件后将其存储到 QingStor。
-
将本地的一个测试文件上传到 QingMR 集群的 HDFS 存储节点上。
$ cd /opt/hadoop $ bin/hdfs dfs -mkdir /input $ bin/hdfs dfs -put /opt/spark/README.md /input/
-
启动 spark-shell,并执行如下命令,将会读取 HDFS 上的 README.md 文件,将其存为 QingStor 中 "my-bucket" 下的 test 文件。
$ cd /opt/spark $ bin/spark-shell --master spark://< 主节点IP >:7077 --jars $SPARK_S3 val qs_file = sc.textFile("hdfs://<hdfs-master-ip>:9000/input/README.md") qs_file.saveAsTextFile("s3a://my-bucket/test")
存储到 HDFS 文件系统
在 Spark 中读取 QingStor 上的文件,处理后再存储到 HDFS 文件系统中。
val qs_file = sc.textFile("s3a://my-bucket/test")
qs_file.count()
qs_file.saveAsTextFile("hdfs://<hdfs-master-ip>:9000/output/")
文件处理后,存储回 QingStor
在 Spark 中读取 QingStor 上的文件, 经过处理后将结果存回 QingStor。
如下代码将会读取 QingStor 中 my-bucket 下的 test 文件, 从中选出包含字符串 "Spark" 的行, 最后将结果存储到 my-bucket 下的 qingstor-output 文件中。
val qs_file = sc.textFile("s3a://my-bucket/test").filter(line => line.contains("Spark"))
qs_file.saveAsTextFile("s3a://my-bucket/output1")
在 Spark 中创建元素值为 1 到 1000 的数组, 找出其中的奇数并对其求平方, 最后将结果存储到 QingStor 上的文件中。
val data = for (i <- 1 to 1000) yield i
sc.parallelize(data).filter(_%2 != 0).map(x=>x*x).saveAsTextFile("s3a://my-bucket/output2")