快上网专注成都网站设计 成都网站制作 成都网站建设
成都网站建设公司服务热线:028-86922220

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

SparkShell和IDEA中如何编写Spark程序-创新互联

这篇文章主要讲解了“SparkShell和IDEA中如何编写Spark程序”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“SparkShell和IDEA中如何编写Spark程序”吧!

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名注册网站空间、营销软件、网站建设、武清网站维护、网站推广。

spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用Scala编写Spark程序。spark-shell程序一般用作Spark程序测试练习来用。spark-shell属于Spark的特殊应用程序,我们可以在这个特殊的应用程序中提交应用程序

spark-shell启动有两种模式,local模式和cluster模式,分别为

local模式:

spark-shell

local模式仅在本机启动一个SparkSubmit进程,没有与集群建立联系,虽然进程中有SparkSubmit但是不会被提交到集群红

SparkShell和IDEA中如何编写Spark程序

Cluster模式(集群模式):

spark-shell \
--master spark://hadoop01:7077 \
--executor-memory 512m \
--total-executor-cores 1

后两个命令不是必须的 --master这条命令是必须的(除非在jar包中已经指可以不指定,不然就必须指定)

退出shell

千万不要ctrl+c spark-shell 正确退出 :quit 千万不要ctrl+c退出 这样是错误的 若使用了ctrl+c退出 使用命令查看监听端口 netstat - apn | grep 4040 在使用kill -9 端口号 杀死即可

3.25.11 spark2.2shell和spark1.6shell对比

SparkShell和IDEA中如何编写Spark程序

ps:启动spark-shell若是集群模式,在webUI会有一个一直执行的任务

通过IDEA创建Spark工程

ps:工程创建之前步骤省略,在scala中已经讲解,直接默认是创建好工程的

对工程中的pom.xml文件配置

 

        1.8
        1.8
        UTF-8
        2.11.8
        2.2.0
        2.7.1
        2.11
    

    
        
            org.scala-lang
            scala-library
            ${scala.version}
        
        
        org.apache.spark
        spark-core_2.11
        ${spark.version}
    
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        
     

Spark实现WordCount程序

Scala版本
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("dri/wordcount").setMaster("local[*]")
    //创建sparkContext对象
    val sc =  new SparkContext(conf)
    //通过sparkcontext对象就可以处理数据
    //读取文件 参数是一个String类型的字符串 传入的是路径
    val lines: RDD[String] = sc.textFile(“dir/wordcount”)
    //切分数据
    val words: RDD[String] = lines.flatMap(_.split(" "))
    //将每一个单词生成元组 (单词,1)
    val tuples: RDD[(String, Int)] = words.map((_,1))
    //spark中提供一个算子 reduceByKey 相同key 为一组进行求和 计算value
    val sumed: RDD[(String, Int)] = tuples.reduceByKey(_+_)
    //对当前这个结果进行排序 sortBy 和scala中sotrBy是不一样的 多了一个参数
    //默认是升序  false就是降序
    val sorted: RDD[(String, Int)] = sumed.sortBy(_._2,false)
    //将数据提交到集群存储 无法返回值
     sorted.foreach(println)
    //回收资源停止sc,结束任务
    sc.stop()
  }
}

Java版本

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List; 
public class JavaWordCount {
    public static void main(String[] args) {
//1.先创建conf对象进行配置主要是设置名称,为了设置运行模式
        SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
//2.创建context对象
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD lines = jsc.textFile("dir/file");
//进行切分数据 flatMapFunction是具体实现类
        JavaRDD words = lines.flatMap(new FlatMapFunction() {
            @Override
            public Iterator call(String s) throws Exception {
                List splited = Arrays.asList(s.split(" "));
                return splited.iterator();
            }

        });
//将数据生成元组
//第一个泛型是输入的数据类型 后两个参数是输出参数元组的数据
        JavaPairRDD tuples = words.mapToPair(new PairFunction                Integer>() {
            @Override
            public Tuple2 call(String s) throws Exception {
                return new Tuple2(s, 1);
            }
        });
//聚合
        JavaPairRDD sumed = tuples.reduceByKey(new Function2                Integer>() {
            @Override
//第一个Integer是相同key对应的value
//第二个Integer是相同key 对应的value
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
//因为Java api没有提供sortBy算子,此时需要将元组中的数据进行位置调换,然后在排序,排完序在换回
//第一次交换是为了排序
        JavaPairRDD swaped = sumed.mapToPair(new PairFunction                Integer>, Integer, String>() {
            @Override
            public Tuple2 call(Tuple2 tup) throws Exception {
                return tup.swap();
            }
        });
//排序
        JavaPairRDD sorted = swaped.sortByKey(false);
//第二次交换是为了最终结果 <单词,数量>
        JavaPairRDD res = sorted.mapToPair(new PairFunction                String>, String, Integer>() {
            @Override
            public Tuple2 call(Tuple2 tuple2) throws Exception
            {
                return tuple2.swap();
            }
        });
        System.out.println(res.collect());
        res.saveAsTextFile("out1");
        jsc.stop();
    }
}

感谢各位的阅读,以上就是“SparkShell和IDEA中如何编写Spark程序”的内容了,经过本文的学习后,相信大家对SparkShell和IDEA中如何编写Spark程序这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!


本文题目:SparkShell和IDEA中如何编写Spark程序-创新互联
网站URL:http://6mz.cn/article/ddgjdh.html

其他资讯