阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Flink开发IDEA环境搭建与测试

470次阅读
没有评论

共计 9439 个字符,预计需要花费 24 分钟才能阅读完成。

一.IDEA 开发环境

1.pom 文件设置

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <Hadoop.version>2.7.6</hadoop.version>
        <flink.version>1.6.1</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.22</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!-- <arg>-make:transitive</arg> -->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.apache.spark.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2.flink 开发流程

Flink 具有特殊类 DataSetDataStream在程序中表示数据。您可以将它们视为可以包含重复项的不可变数据集合。DataSet 数据有限 的情况下,对于一个 DataStream 元素的数量可以是无界的。

这些集合在某些关键方面与常规 Java 集合不同。首先,它们是不可变的,这意味着一旦创建它们就无法添加或删除元素。你也不能简单地检查里面的元素。

集合最初通过在弗林克程序添加源创建和新的集合从这些通过将它们使用 API 方法如衍生 mapfilter 等等。

Flink 程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:

1. 获取 execution environment,

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2. 加载 / 创建初始化数据

DataStream<String> text = env.readTextFile("file:///path/to/file");

3. 指定此数据的转换

val mapped = input.map {x => x.toInt}

4. 指定放置计算结果的位置

writeAsText(String path)

print()

5. 触发程序执行

在 local 模式下执行程序

execute()

将程序达成 jar 运行在线上

./bin/flink run \

-m node21:8081 \

./examples/batch/WordCount.jar \

–input  hdfs:///user/admin/input/wc.txt \

–output  hdfs:///user/admin/output2  \

二. Wordcount 案例

1.Scala 代码

package com.xyg.streaming

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
  * Author: Mr.Deng
  * Date: 2018/10/15
  * Desc:
  */
object SocketWindowWordCountScala {def main(args: Array[String]) : Unit = {// 定义一个数据类型保存单词出现的次数
    case class WordWithCount(word: String, count: Long)
    // port 表示需要连接的端口
    val port: Int = try {ParameterTool.fromArgs(args).getInt("port")
    } catch {case e: Exception => {System.err.println("No port specified. Please run'SocketWindowWordCount --port <port>'")
        return
      }
    }
    // 获取运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 连接此 socket 获取输入数据
    val text = env.socketTextStream("node21", port, '\n')
    //需要加上这一行隐式转换 否则在调用 flatmap 方法的时候会报错
    import org.apache.flink.api.scala._
    // 解析数据, 分组, 窗口化, 并且聚合求 SUM
    val windowCounts = text
      .flatMap {w => w.split("\\s") }
      .map {w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")
    // 打印输出并设置使用一个并行度
    windowCounts.print().setParallelism(1)
    env.execute("Socket Window WordCount")
  }
}

2.Java 代码

package com.xyg.streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Author: Mr.Deng
 * Date: 2018/10/15
 * Desc: 使用 flink 对指定窗口内的数据进行实时统计,最终把结果打印出来
 *       先在 node21 机器上执行 nc -l 9000
 */
public class StreamingWindowWordCountJava {public static void main(String[] args) throws Exception {//定义 socket 的端口号
    int port;
    try{ParameterTool parameterTool = ParameterTool.fromArgs(args);
        port = parameterTool.getInt("port");
    }catch (Exception e){System.err.println("没有指定 port 参数,使用默认值 9000");
        port = 9000;
    }
    //获取运行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //连接 socket 获取输入的数据
    DataStreamSource<String> text = env.socketTextStream("node21", port, "\n");
    //计算数据
    DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {public void flatMap(String value, Collector<WordWithCount> out) throws Exception {String[] splits = value.split("\\s");
            for (String word:splits) {out.collect(new WordWithCount(word,1L));
            }
        }
    })//打平操作,把每行的单词转为 <word,count> 类型的数据
            //针对相同的 word 数据进行分组
            .keyBy("word")
            //指定计算数据的窗口大小和滑动窗口大小
            .timeWindow(Time.seconds(2),Time.seconds(1))
            .sum("count");
    //把数据打印到控制台, 使用一个并行度
    windowCount.print().setParallelism(1);
    //注意:因为 flink 是懒加载的,所以必须调用 execute 方法,上面的代码才会执行
    env.execute("streaming word count");
}

    /**
     * 主要为了存储单词以及单词出现的次数
     */
    public static class WordWithCount{public String word;
        public long count;
        public WordWithCount(){}
        public WordWithCount(String word, long count) {this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }

}

3.运行测试

首先,使用 nc 命令启动一个本地监听,命令是:

[admin@node21 ~]$ nc -l 9000

通过 netstat 命令观察 9000 端口。netstat -anlp | grep 9000,启动监听 如果报错:-bash: nc: command not found,请先安装 nc,在线安装命令:yum -y install nc

然后,IDEA 上运行 flink 官方案例程序

node21 上输入

Flink 开发 IDEA 环境搭建与测试

IDEA 控制台输出如下

Flink 开发 IDEA 环境搭建与测试

4.集群测试

这里单机测试官方案例

[admin@node21 flink-1.6.1]$ pwd
/opt/flink-1.6.1
[admin@node21 flink-1.6.1]$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node21.
Starting taskexecutor daemon on host node21.
[admin@node21 flink-1.6.1]$ jps
2100 StandaloneSessionClusterEntrypoint
2518 TaskManagerRunner
2584 Jps
[admin@node21 flink-1.6.1]$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

程序连接到套接字并等待输入。您可以检查 Web 界面以验证作业是否按预期运行:

Flink 开发 IDEA 环境搭建与测试

Flink 开发 IDEA 环境搭建与测试

单词在 5 秒的时间窗口(处理时间,翻滚窗口)中计算并打印到 stdout 监视 TaskManager 的输出文件并写入一些文本nc(输入在点击后逐行发送到 Flink):

Flink 开发 IDEA 环境搭建与测试

Flink 开发 IDEA 环境搭建与测试

三. 使用 IDEA 开发离线程序

Dataset 是 flink 的常用程序,数据集通过 source 进行初始化,例如读取文件或者序列化集合,然后通过 transformation(filtering、mapping、joining、grouping)将数据集转成,然后通过 sink 进行存储,既可以写入 hdfs 这种分布式文件系统,也可以打印控制台,flink 可以有很多种运行方式,如 local、flink 集群、yarn 等.

1. scala程序

package com.xyg.batch

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/**
  * Author: Mr.Deng
  * Date: 2018/10/19
  * Desc:
  */
object WordCountScala{def main(args: Array[String]) {//初始化环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //从字符串中加载数据
    val text = env.fromElements("Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    //分割字符串、汇总 tuple、按照 key 进行分组、统计分组后 word 个数
    val counts = text.flatMap {_.toLowerCase.split("\\W+") filter {_.nonEmpty} }
      .map {(_, 1) }
      .groupBy(0)
      .sum(1)
    //打印
    counts.print()}
}

2. java程序

package com.xyg.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Author: Mr.Deng
 * Date: 2018/10/19
 * Desc:
 */
public class WordCountJava {public static void main(String[] args) throws Exception {//构建环境
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //通过字符串构建数据集
        DataSet<String> text = env.fromElements("Who's there?",
                "I think I hear them. Stand, ho! Who's there?");
        //分割字符串、按照 key 进行分组、统计相同的 key 个数
        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);
        //打印
        wordCounts.print();}
    //分割字符串的方法
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

3.运行

Flink 开发 IDEA 环境搭建与测试

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计9439字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19350
评论数
4
阅读量
7960680
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
300元就能买到的”小钢炮”?惠普7L四盘位小主机解析

300元就能买到的”小钢炮”?惠普7L四盘位小主机解析

  300 元就能买到的 ” 小钢炮 ”?惠普 7L 四盘位小主机解析 最近...
星哥带你玩飞牛NAS-16:不再错过公众号更新,飞牛NAS搭建RSS

星哥带你玩飞牛NAS-16:不再错过公众号更新,飞牛NAS搭建RSS

  星哥带你玩飞牛 NAS-16:不再错过公众号更新,飞牛 NAS 搭建 RSS 对于经常关注多个微...
星哥带你玩飞牛NAS-2:飞牛配置RAID磁盘阵列

星哥带你玩飞牛NAS-2:飞牛配置RAID磁盘阵列

星哥带你玩飞牛 NAS-2:飞牛配置 RAID 磁盘阵列 前言 大家好,我是星哥之前星哥写了《星哥带你玩飞牛 ...
星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛 NAS 硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话? 前言 在选择 NAS 用预...
免费无广告!这款跨平台AI RSS阅读器,拯救你的信息焦虑

免费无广告!这款跨平台AI RSS阅读器,拯救你的信息焦虑

  免费无广告!这款跨平台 AI RSS 阅读器,拯救你的信息焦虑 在算法推荐主导信息流的时代,我们...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
多服务器管理神器 Nexterm 横空出世!NAS/Win/Linux 通吃,SSH/VNC/RDP 一站式搞定

多服务器管理神器 Nexterm 横空出世!NAS/Win/Linux 通吃,SSH/VNC/RDP 一站式搞定

多服务器管理神器 Nexterm 横空出世!NAS/Win/Linux 通吃,SSH/VNC/RDP 一站式搞...
星哥带你玩飞牛NAS-16:不再错过公众号更新,飞牛NAS搭建RSS

星哥带你玩飞牛NAS-16:不再错过公众号更新,飞牛NAS搭建RSS

  星哥带你玩飞牛 NAS-16:不再错过公众号更新,飞牛 NAS 搭建 RSS 对于经常关注多个微...
小白也能看懂:什么是云服务器?腾讯云 vs 阿里云对比

小白也能看懂:什么是云服务器?腾讯云 vs 阿里云对比

小白也能看懂:什么是云服务器?腾讯云 vs 阿里云对比 星哥玩云,带你从小白到上云高手。今天咱们就来聊聊——什...
星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛NAS硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话?

星哥带你玩飞牛 NAS 硬件 01:捡垃圾的最爱双盘,暴风二期矿渣为何成不老神话? 前言 在选择 NAS 用预...
星哥带你玩飞牛NAS-16:飞牛云NAS换桌面,fndesk图标管理神器上线!

星哥带你玩飞牛NAS-16:飞牛云NAS换桌面,fndesk图标管理神器上线!

  星哥带你玩飞牛 NAS-16:飞牛云 NAS 换桌面,fndesk 图标管理神器上线! 引言 哈...