
Setting Up and Testing a Flink Development Environment in IDEA


I. IDEA Development Environment

1. POM file configuration

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.7.6</hadoop.version>
        <flink.version>1.6.1</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.22</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!-- <arg>-make:transitive</arg> -->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.xyg.streaming.SocketWindowWordCountScala</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
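
With this build section in place, mvn clean package compiles the Scala sources and produces a shaded jar whose entry point is the mainClass configured above. Note that the _${scala.binary.version} suffix on the Flink artifacts must match the Scala library version on the classpath (2.11 here); mixing Scala binary versions will fail at runtime.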

2. Flink development workflow

Flink has the special classes DataSet and DataStream to represent data in a program. You can think of them as immutable collections of data that may contain duplicates. A DataSet holds a finite amount of data, whereas the number of elements in a DataStream can be unbounded.

These collections differ from regular Java collections in some key ways. First, they are immutable, which means that once they are created you cannot add or remove elements. Nor can you simply inspect the elements inside them.

A collection is initially created by adding a source to a Flink program, and new collections are derived from it by applying API methods such as map, filter, and so on.

A Flink program looks like a regular program that transforms collections of data. Every program consists of the same basic parts (a minimal end-to-end sketch follows the list):

1. Obtain an execution environment,

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2. Load/create the initial data

DataStream<String> text = env.readTextFile("file:///path/to/file");

3. Specify transformations on this data

val mapped = input.map {x => x.toInt}

4. Specify where to put the results of the computation

writeAsText(String path)

print()

5. Trigger the program execution

To execute the program in local mode, simply call:

execute()

To package the program as a jar and run it on a cluster:

./bin/flink run \
-m node21:8081 \
./examples/batch/WordCount.jar \
--input hdfs:///user/admin/input/wc.txt \
--output hdfs:///user/admin/output2
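
Putting the five parts together, here is a minimal streaming sketch that mirrors the snippets above (the object name and file path are placeholders):

import org.apache.flink.streaming.api.scala._

object MinimalPipeline {
  def main(args: Array[String]): Unit = {
    // 1. Obtain the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Load the initial data (placeholder path)
    val text: DataStream[String] = env.readTextFile("file:///path/to/file")
    // 3. Specify transformations on the data
    val mapped: DataStream[Int] = text.map(_.trim.toInt)
    // 4. Specify where to put the results
    mapped.print()
    // 5. Trigger execution
    env.execute("Minimal Pipeline")
  }
}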

II. WordCount Example

1. Scala code

package com.xyg.streaming

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
  * Author: Mr.Deng
  * Date: 2018/10/15
  * Desc:
  */
object SocketWindowWordCountScala {
  def main(args: Array[String]): Unit = {
    // Data type that stores a word and the number of times it occurred
    case class WordWithCount(word: String, count: Long)
    // port is the socket port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
    }
    // Obtain the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Connect to the socket to obtain the input data
    val text = env.socketTextStream("node21", port, '\n')
    // This implicit-conversion import is required; without it the flatMap call below will not compile
    import org.apache.flink.api.scala._
    // Parse the data, group it, window it, and aggregate the counts with sum
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")
    // Print the output with a parallelism of one
    windowCounts.print().setParallelism(1)
    env.execute("Socket Window WordCount")
  }
}
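
To try this from IDEA, first start a listener on node21 with nc -l 9000 (or any free port), then run the program with the argument --port 9000 and type words into the nc session; the windowed counts appear in the IDEA console.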

2. Java code

package com.xyg.streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Author: Mr.Deng
 * Date: 2018/10/15
 * Desc: Use Flink to compute real-time statistics over windows of data and print the results.
 *       First run nc -l 9000 on the node21 machine.
 */
public class StreamingWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        // Port of the socket to connect to
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port argument specified, using the default of 9000");
            port = 9000;
        }
        // Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Connect to the socket to obtain the input data
        DataStreamSource<String> text = env.socketTextStream("node21", port, "\n");
        // Compute the data: flatten each line into <word, count> records
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })
                // Group records with the same word
                .keyBy("word")
                // Specify the window size and the slide interval
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");
        // Print to the console with a parallelism of one
        windowCount.print().setParallelism(1);
        // Note: Flink is lazily evaluated, so execute() must be called for the code above to run
        env.execute("streaming word count");
    }

    /**
     * Stores a word and the number of times it occurred
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

3. Running the test

First, start a local listener with the nc command:

[admin@node21 ~]$ nc -l 9000

Watch port 9000 with netstat to confirm the listener is up: netstat -anlp | grep 9000. If nc fails with -bash: nc: command not found, install it first: yum -y install nc.

Then, run the Flink example program from IDEA.

Type some input on node21:

(screenshot: input typed into nc on node21)

The IDEA console output looks like this:

(screenshot: IDEA console output)

4. Cluster test

Here we test the official example on a single machine:

[admin@node21 flink-1.6.1]$ pwd
/opt/flink-1.6.1
[admin@node21 flink-1.6.1]$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node21.
Starting taskexecutor daemon on host node21.
[admin@node21 flink-1.6.1]$ jps
2100 StandaloneSessionClusterEntrypoint
2518 TaskManagerRunner
2584 Jps
[admin@node21 flink-1.6.1]$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

The program connects to the socket and waits for input. You can check the web interface to verify that the job is running as expected:

(screenshots: Flink web dashboard showing the running job)

Words are counted in time windows of 5 seconds (processing time, tumbling windows) and printed to stdout. Monitor the TaskManager's output file and write some text in nc (input is sent to Flink line by line after hitting enter):

(screenshots: nc input and the TaskManager stdout output)

III. Developing Batch Programs in IDEA

DataSet programs are Flink's batch programs. A DataSet is initialized from a source, for example by reading a file or from a collection, transformed via transformations (filtering, mapping, joining, grouping), and finally stored through a sink, which can write to a distributed file system such as HDFS or print to the console. Flink programs can run in several modes: local, on a Flink cluster, on YARN, and so on.
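
As a minimal sketch of this source -> transformation -> sink flow (the object name and file paths are placeholders):

import org.apache.flink.api.scala._

object DataSetSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Source: read lines from a file (placeholder path)
    val lines: DataSet[String] = env.readTextFile("file:///path/to/input")
    // Transformation: keep only non-empty lines
    val nonEmpty = lines.filter(_.trim.nonEmpty)
    // Sink: write the result out; an hdfs:// path works the same way
    nonEmpty.writeAsText("file:///path/to/output")
    // Unlike print(), a file sink needs an explicit execute() call
    env.execute("DataSet Sketch")
  }
}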

1. Scala program

package com.xyg.batch

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/**
  * Author: Mr.Deng
  * Date: 2018/10/19
  * Desc:
  */
object WordCountScala {
  def main(args: Array[String]): Unit = {
    // Set up the environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Load data from strings
    val text = env.fromElements("Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    // Split the strings, map to tuples, group by key, and count the words per group
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    // Print the result
    counts.print()
  }
}
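
Because the split is on \W+, the apostrophe also acts as a separator, so running this prints tuples such as (who,2), (s,2), (there,2), (i,2), (think,1), (hear,1), (them,1), (stand,1), (ho,1); the order may vary.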

2. Java program

package com.xyg.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Author: Mr.Deng
 * Date: 2018/10/19
 * Desc:
 */
public class WordCountJava {
    public static void main(String[] args) throws Exception {
        // Set up the environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Build a data set from strings
        DataSet<String> text = env.fromElements("Who's there?",
                "I think I hear them. Stand, ho! Who's there?");
        // Split the strings, group by key, and count occurrences of each key
        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);
        // Print the result
        wordCounts.print();
    }

    // Splits a line into words
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
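
Note that, unlike the Scala version, this splitter only splits on single spaces and does not lowercase or strip punctuation, so tokens such as Who's and there? are counted as-is; the two programs therefore produce slightly different results on the same input.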

3. Run

(screenshot: program output in the IDEA console)
