快上网专注成都网站设计 成都网站制作 成都网站建设
成都网站建设公司服务热线:028-86922220

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

spark应用程序如何在Java项目中运行-创新互联

这篇文章将为大家详细讲解有关spark应用程序如何在Java项目中运行,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

成都创新互联是网站建设技术企业,为成都企业提供专业的成都网站设计、成都做网站,网站设计,网站制作,网站改版等技术服务。拥有10年丰富建站经验和众多成功案例,为您定制适合企业的网站。10年品质,值得信赖!

如下所示:

package org.shirdrn.spark.job;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.shirdrn.spark.job.maxmind.Country;
import org.shirdrn.spark.job.maxmind.LookupService;
import scala.Serializable;
import scala.Tuple2;
public class IPAddressStats implements Serializable {
  private static final long serialVersionUID = 8533489548835413763L;
  private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
  private static final Pattern SPACE = Pattern.compile(" ");
  private transient LookupService lookupService;
  private transient final String geoIPFile;
  public IPAddressStats(String geoIPFile) {
   this.geoIPFile = geoIPFile;
   try {
    // lookupService: get country code from a IP address
    File file = new File(this.geoIPFile);
    LOG.info("GeoIP file: " + file.getAbsolutePath());
    lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
   } catch (IOException e) {
    throw new RuntimeException(e);
   }
  }
  @SuppressWarnings("serial")
  public void stat(String[] args) {
   JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
     System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
   JavaRDD lines = ctx.textFile(args[1], 1);
   // splits and extracts ip address filed
   JavaRDD words = lines.flatMap(new FlatMapFunction() {
    @Override
    public Iterable call(String s) {
     // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
     // ip address
     return Arrays.asList(SPACE.split(s)[0]);
    }
   });
   // map
   JavaPairRDD ones = words.map(new PairFunction() {
    @Override
    public Tuple2 call(String s) {
     return new Tuple2(s, 1);
    }
   });
   // reduce
   JavaPairRDD counts = ones.reduceByKey(new Function2() {
    @Override
    public Integer call(Integer i1, Integer i2) {
     return i1 + i2;
    }
   });
   List> output = counts.collect();
   // sort statistics result by value
   Collections.sort(output, new Comparator>() {
    @Override
    public int compare(Tuple2 t1, Tuple2 t2) {
     if(t1._2 < t2._2) {
       return 1;
     } else if(t1._2 > t2._2) {
       return -1;
     }
     return 0;
    }
   });
   writeTo(args, output);
  }
  private void writeTo(String[] args, List> output) {
   for (Tuple2<?, ?> tuple : output) {
    Country country = lookupService.getCountry((String) tuple._1);
    LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2);
   }
  }
  public static void main(String[] args) {
   // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
   if (args.length < 3) {
    System.err.println("Usage: IPAddressStats   ");
    System.err.println(" Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
    System.exit(1);
   }
   String geoIPFile = args[2];
   IPAddressStats stats = new IPAddressStats(geoIPFile);
   stats.stat(args);
   System.exit(0);
  }
}

本文标题:spark应用程序如何在Java项目中运行-创新互联
本文来源:http://6mz.cn/article/ijdeo.html

其他资讯