当前位置: 首页 > news >正文

网站数据丢失网址搜索引擎

网站数据丢失,网址搜索引擎,百度广告点击一次多少钱,node 做的大型网站一、创建Spark Streaming 环境 二、读取数据(监听端口) 三、任务处理 四、启动程序 我这里写的是简单的单词数量统计 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf, SparkConte…

一、创建Spark Streaming 环境

二、读取数据(监听端口)

三、任务处理

四、启动程序

我这里写的是简单的单词数量统计

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}object Demo1WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val countDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}

UpdateStateByKey(有状态算子)能统计之前的单词数量,可做实时更新 

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object Demo2UpdateStateByKey {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//设置checkpoint路径//用于保存状态ssc.checkpoint("data/checkpoint")//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val kvDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))//updateStateByKey(有状态算子): 每一次计算更新每一个key的状态(单词的数量)val countDS: DStream[(String, Int)] = kvDS.updateStateByKey {/*** seq: 当前批次一个key所有value* state: 之前的结果(状态:之前的单词的数量)*/case (seq: Seq[Int], state: Option[Int]) =>println(seq)println(state)//计算当前批次单词的数量val sum: Int = seq.sum//获取之前单词的数量val count: Int = state match {case Some(count) => countcase None => 0}//计算新的单词的数量并返回Option(sum + count)}countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}

http://www.fp688.cn/news/145871.html

相关文章:

  • 青岛网站建设 上流上海搜索引擎优化1
  • 一个超链接 多个网站广告策划
  • b站推广网站2024年不用下载网络游戏推广员
  • 网站即时客服系统网站建设方案范文
  • 优化网站速度的要点seo是什么工作
  • 网站建设哪家公司靠谱手机搜索引擎排行榜
  • 上海外贸网站开发企业qq怎么申请
  • 制作网站中英文字体不能为seo推广公司
  • 用手机怎么做网站国内优秀个人网站欣赏
  • 雄安网站建设公司百度企业查询
  • Linux哪个版本做网站好微信公众号小程序怎么做
  • 广东的网站建设买转发链接
  • 魏县做网站的建个网站费用多少
  • 全国拿货最便宜的网站软文推广策划方案
  • 专做特产的网站上海职业技能培训机构一览表
  • 淄博那里有做网站的模板网站如何建站
  • 商城在线描述优化方法
  • 山东省建设执业师之家官方网站企业网站推广优化公司
  • 太原市手机网站建设2021年搜索引擎排名
  • 做网站的好框架如何自己制作一个网站
  • 网站建设理由和目的腾讯朋友圈广告怎么投放
  • 保险平台有哪些高州网站seo
  • b2b模式的网站老司机们用的关键词有哪些
  • 西山区城市建设局网站做网站找哪个公司好
  • 安卓4.3网站开发兼容怎么做好网络销售
  • 跨境电商网站搭建数据分析师培训
  • 一个人做网站建设需掌握关键词排名优化公司
  • 做外贸网站价格各大免费推广网站
  • 如何建立自己网站平台郑州网站建设哪家好
  • 特色的南昌网站建设廊坊首页霸屏优化