博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用Spark进行搜狗日志分析实例——map join的使用
阅读量:5082 次
发布时间:2019-06-13

本文共 2501 字,大约阅读时间需要 8 分钟。

map join相对reduce join来说,可以减少在shuff阶段的网络传输,从而提高效率,所以大表与小表关联时,尽量将小表数据先用广播变量导入内存,后面各个executor都可以直接使用

package sogologimport org.apache.hadoop.io.{LongWritable, Text}import org.apache.hadoop.mapred.TextInputFormatimport org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}class RddFile {  def readFileToRdd(path: String): RDD[String] = {    val conf = new SparkConf().setMaster("local").setAppName("sougoDemo")    val sc = new SparkContext(conf);    //使用这种方法能够避免中文乱码   readFileToRdd(path,sc)  }  def readFileToRdd(path: String,sc :SparkContext): RDD[String] = {    //使用这种方法能够避免中文乱码    sc.hadoopFile(path,classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).map{      pair =>  new String(pair._2.getBytes, 0, pair._2.getLength, "GBK")}  }}

 

package sogologimport org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.rdd.RDDimport scala.collection.mutable.ArrayBufferobject MapSideJoin {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local").setAppName("sougoDemo")    val sc = new SparkContext(conf);    val userRdd = new RddFile().readFileToRdd("J:\\scala\\workspace\\first-spark-demo\\sougofile\\user",sc)    //解析用户信息    val userMapRDD:RDD[(String,String)] = userRdd.map(line=>(line.split("\t")(0),line.split("\t")(1)))    //将用户信息设置为广播变量,方便各个任务引用    val userMapBroadCast =sc.broadcast(userMapRDD.collectAsMap())    val searchLogRdd = new RddFile().readFileToRdd("J:\\scala\\workspace\\first-spark-demo\\sougofile\\SogouQ.reduced",sc)    val joinResult = searchLogRdd.mapPartitionsWithIndex((index,f)=>{      val userMap = userMapBroadCast.value      var result = ArrayBuffer[String]()      var count = 0      //搜索日志表join用户表      //原来日志列为:时间 用户ID 关键词 排名 URL      //新的日志列为:时间 用户ID 用户名 关键词 排名 URL      f.foreach( log=>{        count=count+1;        val lineArrs = log.split("\t")        val uid = lineArrs(1)        val newLine:StringBuilder = new StringBuilder()        if(userMap.contains(uid)){          newLine.append(lineArrs(0)).append("\t")          newLine.append(lineArrs(1)).append("\t")          newLine.append(userMap.get(uid).get).append("\t") //从广播变量中根据用户ID获取用户名          for (i<- 2 to lineArrs.length-1){            newLine.append(lineArrs(i)).append("\t")          }          result .+= (newLine.toString())        }      })      println("partition_"+index+"处理的行数为:"+count)      result.iterator    })    //打印结果    joinResult.collect().foreach(println)  }}

 

结果展示:

 

转载于:https://www.cnblogs.com/wbh1000/p/9827344.html

你可能感兴趣的文章
HDU 5510 Bazinga KMP
查看>>
[13年迁移]Firefox下margin-top问题
查看>>
Zookeeper常用命令 (转)
查看>>
Java程序IP v6与IP v4的设置
查看>>
RUP(Rational Unified Process),统一软件开发过程
查看>>
数据库链路创建方法
查看>>
Enterprise Library - Data Access Application Block 6.0.1304
查看>>
重构代码 —— 函数即变量(Replace temp with Query)
查看>>
Bootstrap栅格学习
查看>>
程序员的数学
查看>>
聚合与组合
查看>>
jQuery如何获得select选中的值?input单选radio选中的值
查看>>
设计模式 之 享元模式
查看>>
如何理解汉诺塔
查看>>
洛谷 P2089 烤鸡【DFS递归/10重枚举】
查看>>
15 FFT及其框图实现
查看>>
Linux基本操作
查看>>
osg ifc ifccolumn
查看>>
C++ STL partial_sort
查看>>
3.0.35 platform 设备资源和数据
查看>>