[Flink]wordcount

news/2024/9/8 5:19:19/

一、有界流

1、代码

package wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class BoundedStreamWordCount {public static void main(String[] args) throws Exception {//TODO 1.创建流式的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//TODO 2.读取文件DataStreamSource<String> lineDS = env.readTextFile("input/words.txt");//TODO 3.处理数据:切分、转换、分组、求和//flatMap方法的参数是一个接口,该接口需要重写flatMap方法//这里使用的是匿名实现类//value为读入的每条数据的,数据类型//out为采集器,用来返回数据SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {Tuple2<String, Integer> wordAndOne = Tuple2.of(word, 1); //将每个单词转换成2元组out.collect(wordAndOne);//使用Collector向下游发送数据}}});//TODO 4.按照word分组//new KeySelector<Tuple2<String, Integer>, String> 第一个类型指的是传入的数据的类型,第二个类型指的是key的数据类型KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});//TODO 5.聚合SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);//TODO 6.打印sumDS.print();//TODO 7.执行env.execute(); //默认核数为电脑的所有核数}
}

2、说明

假如接口A,里面有一个方法a()
1)正常写法:定义一个class B,去实现接口A,并且实现它的方法a()
B b=new B()
2)匿名实现类写法

new A(){
  实现a(){ }
}

二、无界流

1、代码

package wc;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StreamWordCount {public static void main(String[] args) throws Exception {//TODO 1.创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//TODO 2.读取数据:socketDataStreamSource<String> lineDataStream = env.socketTextStream("hadoop1",7777);//TODO 3.处理数据SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDataStream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split("\\s+");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)) //存在泛型擦除的问题,需要指定flatmap.keyBy((value) -> value.f0).sum(1);       //value:只有一个参数的时候,类型可以不写//TODO 4.打印sum.print();//TODO 5.启动执行env.execute(); //默认核数为电脑的所有核数}
}

2、在hadoop上启动

nc -lk 7777

3、报错

 1)报错原因:泛型擦除

没有指定Collector的类型

2)解决方法:增加returns方法,指定Collector的类型


http://www.ppmy.cn/news/636163.html

相关文章

sunpinyin uint.h

Ubuntu下Ibus输入平台安装SunPinYin步骤及出现的系列问题解决方案 http://www.pczgc.com 编辑&#xff1a;chinabet 时间&#xff1a; 2011-10-14 阅读&#xff1a; 10 Ubuntu默认自带的是ibus输入平台&#xff0c;提供一个叫pinyin的输入法。本人不是很喜欢用&#xff0c;听说…

shell脚本中如何获取命令的参数(2) ----处理命令参数

1 找出选项 1.1 处理简单选项 主要可以通过shfit工具对获取的到$1变量对比程序允许的变量值判断&#xff1b; 1.2 从参数中分离选项 一般参数可能在后面跟上适当的参数值&#xff0c;例如 sed -f script data 所有如何把选项的参数与选项有效的分隔开也是一种很重要的技…

liunx下的组管理

首先了解一下 ***************** /etc/group文件当中每行的具体含义&#xff1a; eg: sudo:x:27:jiangjian 1:组用户名 2:密码用x替代,真正的密码在/etc/gshadow 3:用户组ID 4:组成员列表 ***************** /etc/gshadow文件每行的具体含义 rootjiangjian-K42JZ:/e…

sed基本用法介绍1

sed常用语对文本流进行处理&#xff0c;以行为单位进行筛选&#xff0c; 格式&#xff1a; sed options script file 常用参数&#xff1a; -e 表示多个命令&#xff0c;命令之间用分号分隔 rootjiangjian-K42JZ:/home/jiangjian/sh# cat data this is a test of…

linux组管理

首先了解一下 ***************** /etc/group文件当中每行的具体含义&#xff1a; eg: sudo:x:27:jiangjian 1:组用户名 2:密码用x替代,真正的密码在/etc/gshadow 3:用户组ID 4:组成员列表 ***************** /etc/gshadow文件每行的具体含义 rootjiangjian-K42JZ:/etc# cat…

shell-for,sed,bc,expr,(())

转载 linux系统计算从1加到100之和思路风暴_老男孩linux培训的技术博客_51CTO博客 ①.C语言型 for循环结构及(())计算式shell脚本 [oldboystudent ~]$ cat for1.sh #!/bin/sh j0 for((i0; i<100; i)) do ((jji)) done echo $j 写成一行的命令行写法&#xff1a; fo…

搭建Hadoop高可用框架分布式集群

搭建Hadoop高可用框架分布式集群 一.基础配置 1.创建虚拟机&#xff0c;修改虚拟机的主机名 2.修改网络配置 master:192.168.6.200 slave1:192.168.6.201 slave2:192.168.6.202 3.互ping测试 4.sudo授权 5.安装vim编辑器 6.配置网络映射 master配置映射 master向slave1传递映…

拓扑排序与关键路径

一.有向无环图描述表达式 有向无环图&#xff1a;若一个有向图中不存在环&#xff0c;则称为有向无环图&#xff0c;简称DAG图。 有向无环图是描述含有公共子式的表达式的有效工具。例如表达式((ab)*(b*(cd))(cd)*e)*((cd)*e) 可以用之前描述的二叉树来表示&#xff0c;仔细…

视频怎么转音频mp3?

视频怎么转音频mp3&#xff1f;大家可以回忆一下&#xff0c;在我们日常的生活中是不是经常遇到这种情况&#xff0c;在刷短视频或者是看一些MV的时候&#xff0c;我们是不是有将视频中的音频提取出来单独使用的需求呢&#xff0c;这个时候相信很多小伙伴会一边播放视频一边进行…

怎么将视频转为音频mp3格式?

怎么将视频转为音频mp3格式&#xff1f;这个问题是前几天有个师兄问我的&#xff0c;因为他特别热爱mv&#xff0c;他跟我说他觉得mv里的音质更贴近生活&#xff0c;更能把自己拉进那个场景中&#xff0c;随着观看mv次数的增多&#xff0c;视频场景已然印在心中&#xff0c;久而…

怎么把视频转成mp3音频?

怎么把视频转成mp3音频&#xff1f;现在很多人都在从事自媒体相关的工作&#xff0c;如果你也是&#xff0c;那么肯定会使用很多的视频或者音频素材&#xff0c;而且还经常需要视频文件转换成音频文件后再使用&#xff0c;相信很多人都遇到过&#xff0c;不过很多人还不知道如何…

视频里的声音怎么转换成音频

视频里的声音怎么转换成音频&#xff1f;很多人为了方便创作&#xff0c;就需要收集很多的素材&#xff0c;例如自媒体从业者就非常注重对音频素材的收集&#xff0c;一段好的音频可以让创作出来的作品更有生命力&#xff0c;非常的重要。而收集音频素材的方法有很多种&#xf…

如何将视频转换成音频MP3格式

平常我们看电视如果想要把喜欢的演员声音&#xff08;对白&#xff09;保存下来&#xff0c;但是视频文件就太大比较占内存&#xff0c;而音频文件就比较小方便保存、不占内存还可以设置成手机铃声。这样一来&#xff0c;大家知道为什么那么多人会把视频文件转换成音频文件了吗…

视频怎么转成音频,视频转音频快速完成

视频怎么转成音频&#xff1f;对于一些经常制作影音文件的小伙伴来说&#xff0c;平时对素材的收集是非常重要的&#xff0c;丰富的素材才能帮助我们制作出优秀的作品。素材中就包含音频素材&#xff0c;有些音频素材可以直接在网上下载得到&#xff0c;还有一些音频素材是从视…

视频怎么转换成音频?

短视频已经成为我们生活中不可或缺的一部分&#xff0c;大多数小伙伴在空余时间都会打开抖音&#xff0c;快手等媒体平台看一些自己感兴趣的视频内容。在评论区也可以经常的看到来自不同伙伴的提问&#xff1a;“这个背景音乐的名称叫什么”&#xff0c;然后会根据音乐的名称去…

怎么把视频中的声音提取成音频文件

对于一个初次接触提取视频音频的人来说&#xff0c;将视频中的音频提取出来是一件很复杂的事情&#xff0c;这个时候我们就可以使用一款工具来完成&#xff0c;其实提取视频音频也没有什么难&#xff0c;只要大家掌握了方法&#xff0c;都是非常简单的&#xff0c;那么接下来就…

怎样把视频中的音频提取成mp3?

视频由由音频和图像组成&#xff0c;有时我们在观看一些视频时&#xff0c;经常会听到一些非常好听的背景音乐&#xff0c;想要保存成mp3 时&#xff0c;发现很多音乐平台要么是付费下载的&#xff0c;要么就是没有合适的版本&#xff0c;那么如何把视频里的背景音乐提取出来呢…

如何迅速的转换音视频格式?

视频的格式都是可以转换的&#xff0c;可以转换成mp4格式&#xff0c;也可以转换成其它视频格式&#xff0c;视频格式的转换由专门的软件进行&#xff0c;迅捷视频转换器就能够迅速的转换视频格式。ogg格式完全开源&#xff0c;完全免费&#xff0c; 和mp3不相上下的新格式。 与…

视频怎么转换成音频mp3?教你几种转换方法

视频怎么转换成音频mp3&#xff1f;MP3是一种有损压缩音频格式&#xff0c;全称为MPEG-1 Audio Layer 3。MP3格式可以在保证高质量的同时&#xff0c;采用比WAV更高效的压缩方式&#xff0c;降低文件大小。MP3格式广泛应用于数字音乐播放器、音频流媒体、网络广播等方面。虽然M…

如何把视频转换成mp3格式

如何把视频转换成mp3格式&#xff0c;为什么会有这样的想法出现&#xff1f;其实很多工作和生活中会遇到这样的情况&#xff1a;你对于视频的背景原音有需求剪辑或是单独音频播放的&#xff0c;但是无法直接下载到音频文件。在于很多会议记录或是访谈&#xff0c;我们都会后期需…