当前位置: 首页 > news >正文

网站的广告语应该怎么做百度关键词刷搜索量

网站的广告语应该怎么做,百度关键词刷搜索量,有做赌博网站的么,影视公司名字取名背景: flink中常见的需求如下:统计某个页面一天内的点击率,每10秒输出一次,我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢?如果这样实现问题是什么呢? ProcessWindowFunction 结合自定义触发器实现…

背景:

flink中常见的需求如下:统计某个页面一天内的点击率,每10秒输出一次,我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢?如果这样实现问题是什么呢?

ProcessWindowFunction 结合自定义触发器实现统计点击率

关键代码:
在这里插入图片描述
在这里插入图片描述
完整代码参见:

package wikiedits.func;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import wikiedits.func.model.KeyCount;public class ProcessWindowFunctionAndTiggerDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);env.setStateBackend(new FsStateBackend("file:///D:/tmp/flink/checkpoint/windowtrigger"));// 并行度为1env.setParallelism(1);// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {int xxxNum = 0;int yyyNum = 0;for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX和YYY两种nameString name = (0 == i % 2) ? "XXX" : "YYY";// 更新aaa和bbb元素的总数if (0 == i % 2) {xxxNum++;} else {yyyNum++;}// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 将数据和时间戳打印出来,用来验证数据if(xxxNum % 2000==0){System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n", name,time(timeStamp), xxxNum, yyyNum));}// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1);}}@Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunctionSingleOutputStreamOperator<String> mainDataStream = dataStream// 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种.keyBy(value -> value.f0)// 5秒一次的滚动窗口.timeWindow(Time.minutes(5))// 10s触发一次计算,更新统计结果.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))// 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {// 自定义状态private ValueState<KeyCount> state;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态,name是myStatestate = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));}public void clear(Context context) {ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));contextWindowValueState.clear();}@Overridepublic void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable,Collector<String> collector) throws Exception {// 从backend取得当前单词的myState状态KeyCount current = state.value();// 如果myState还从未没有赋值过,就在此初始化if (current == null) {current = new KeyCount();current.key = s;current.count = 0;}int count = 0;// iterable可以访问该key当前窗口内的所有数据,// 这里简单处理,只统计了元素数量for (Tuple2<String, Integer> tuple2 : iterable) {count++;}// 更新当前key的元素总数current.count += count;// 更新状态到backendstate.update(current);ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));KeyCount windowValue = contextWindowValueState.value();if (windowValue == null) {windowValue = new KeyCount();windowValue.key = s;windowValue.count = 0;}windowValue.count += count;contextWindowValueState.update(windowValue);// 将当前key及其窗口的元素数量,还有窗口的起止时间整理成字符串String value = String.format("window, %s, %s - %s, %d, windowStateCount :%d,   total : %d",// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key所在窗口的总数contextWindowValueState.value().count,// 当前key出现的总数current.count);// 发射到下游算子collector.collect(value);}});// 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute("processfunction demo : processwindowfunction");}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}

这里采用ProcessWindowFunction 结合ContinuousProcessingTimeTrigger的方式确实可以实现统计至今为止某个页面点击率的目的,不过这其中需要注意点的点是:
每隔10s触发public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector)方法时,iterable对象是包含了一天的窗口内收到的所有消息,也就是当前触发时iterable集合是前10s触发时iterable集合的超集,包含前10s触发时的所有的消息集合。
到这里所引起的问题也自然而然的出来了:对于ProcessWindowFunction 实现而言,flink内部是通过ListState的形式保存窗口内收到的所有消息的,注意这里flink内部会使用ListState保存每一条分配到以天为单位的窗口内的消息,这会导致状态膨胀,想一下,一天内所有的消息都会当成状态保存起来,这对于状态后端的压力是有多大!这些保存在ListState中的消息只有在窗口结束后才会清理:具体参见WindowOperator.clearAllState,那有解决方案吗?使用Agg/Reduce处理函数替ProcessWindowFunction作为处理函数可以实现吗?请看下一篇文章

参考文章:
https://www.cnblogs.com/Springmoon-venn/p/13667023.html

http://www.ritt.cn/news/18550.html

相关文章:

  • 百度做网站吗百度广告公司
  • 东莞公司网站设计友情链接qq群
  • wordpress user_idseo网络推广技术
  • 网站建设公司广州增城关键词调价工具哪个好
  • 秦皇岛做网站优化公司太原百度推广排名优化
  • 网站设计沟通58同城如何发广告
  • 怎么建设自己收费网站新网站 seo
  • 网站风格包括什么百度官网认证多少钱
  • 襄阳市做网站的公司指数计算器
  • 做淘宝主要看哪些网站百度品牌广告多少钱
  • 做系统的网站网络营销的特点包括
  • 南京做网站咨询南京乐识上海网站建设方案
  • 杭州江干建设局网站附近哪里有计算机培训班
  • 云网站开发推广普通话手抄报内容怎么写
  • 织梦cms wordpress常用的seo工具的是有哪些
  • 不使用域名做网站今日百度小说排行榜风云榜
  • 如何做自己的网站链接竞价
  • 北京学做网站网站没有友情链接
  • h5响应式音乐网站模板友情链接有哪些展现形式
  • 跨境电商独立站是什么意思免费站推广网站2022
  • 做电子商务网站需要办理什么证营销方案案例
  • 网页制作图片显示不出来win10系统优化软件
  • 做网站域名不备案会怎么样网站开发
  • 做营销策划的上哪个网站好百度关键词推广可以自己做吗
  • 网站安全评估怎么做短视频推广平台
  • 自己网站视频直播怎么做企业邮箱登录入口
  • 射阳做网站的公司关键词优化方法
  • 做网站 还是淘宝店东莞网络优化调查公司
  • 求推荐在哪个网站做德语翻译员短视频营销推广方案
  • 代购网站项目描述中国优秀网页设计案例