网创优客建站品牌官网
为成都网站建设公司企业提供高品质网站建设
热线:028-86922220
成都专业网站建设公司

定制建站费用3500元

符合中小企业对网站设计、功能常规化式的企业展示型网站建设

成都品牌网站建设

品牌网站建设费用6000元

本套餐主要针对企业品牌型网站、中高端设计、前端互动体验...

成都商城网站建设

商城网站建设费用8000元

商城网站建设因基本功能的需求不同费用上面也有很大的差别...

成都微信网站建设

手机微信网站建站3000元

手机微信网站开发、微信官网、微信商城网站...

建站知识

当前位置:首页 > 建站知识

spark应用程序如何在Java项目中运行-创新互联

这篇文章将为大家详细讲解有关spark应用程序如何在Java项目中运行,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

成都创新互联是网站建设技术企业,为成都企业提供专业的成都网站设计、成都做网站,网站设计,网站制作,网站改版等技术服务。拥有10年丰富建站经验和众多成功案例,为您定制适合企业的网站。10年品质,值得信赖!

如下所示:

package org.shirdrn.spark.job;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.shirdrn.spark.job.maxmind.Country;
import org.shirdrn.spark.job.maxmind.LookupService;
import scala.Serializable;
import scala.Tuple2;
public class IPAddressStats implements Serializable {
  private static final long serialVersionUID = 8533489548835413763L;
  private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
  private static final Pattern SPACE = Pattern.compile(" ");
  private transient LookupService lookupService;
  private transient final String geoIPFile;
  public IPAddressStats(String geoIPFile) {
   this.geoIPFile = geoIPFile;
   try {
    // lookupService: get country code from a IP address
    File file = new File(this.geoIPFile);
    LOG.info("GeoIP file: " + file.getAbsolutePath());
    lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
   } catch (IOException e) {
    throw new RuntimeException(e);
   }
  }
  @SuppressWarnings("serial")
  public void stat(String[] args) {
   JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
     System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
   JavaRDD lines = ctx.textFile(args[1], 1);
   // splits and extracts ip address filed
   JavaRDD words = lines.flatMap(new FlatMapFunction() {
    @Override
    public Iterable call(String s) {
     // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
     // ip address
     return Arrays.asList(SPACE.split(s)[0]);
    }
   });
   // map
   JavaPairRDD ones = words.map(new PairFunction() {
    @Override
    public Tuple2 call(String s) {
     return new Tuple2(s, 1);
    }
   });
   // reduce
   JavaPairRDD counts = ones.reduceByKey(new Function2() {
    @Override
    public Integer call(Integer i1, Integer i2) {
     return i1 + i2;
    }
   });
   List> output = counts.collect();
   // sort statistics result by value
   Collections.sort(output, new Comparator>() {
    @Override
    public int compare(Tuple2 t1, Tuple2 t2) {
     if(t1._2 < t2._2) {
       return 1;
     } else if(t1._2 > t2._2) {
       return -1;
     }
     return 0;
    }
   });
   writeTo(args, output);
  }
  private void writeTo(String[] args, List> output) {
   for (Tuple2<?, ?> tuple : output) {
    Country country = lookupService.getCountry((String) tuple._1);
    LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2);
   }
  }
  public static void main(String[] args) {
   // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
   if (args.length < 3) {
    System.err.println("Usage: IPAddressStats   ");
    System.err.println(" Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
    System.exit(1);
   }
   String geoIPFile = args[2];
   IPAddressStats stats = new IPAddressStats(geoIPFile);
   stats.stat(args);
   System.exit(0);
  }
}

分享题目:spark应用程序如何在Java项目中运行-创新互联
分享网址:http://bjjierui.cn/article/ijdeo.html

其他资讯