阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

CentOS7.5搭建Flink1.6.1分布式集群详解

179次阅读
没有评论

共计 12055 个字符,预计需要花费 31 分钟才能阅读完成。

一. Flink 的下载

安装包下载地址:http://flink.apache.org/downloads.html,选择对应 Hadoop 的 Flink 版本下载

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

[admin@node21 software]$ wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.6.1/flink-1.6.1-bin-hadoop27-scala_2.11.tgz
[admin@node21 software]$ ll
-rw-rw-r-- 1 admin admin 301867081 Sep 15 15:47 flink-1.6.1-bin-hadoop27-scala_2.11.tgz

Flink 有三种部署模式,分别是 Local、Standalone Cluster 和 Yarn Cluster。

二.Local 模式

对于 Local 模式来说,JobManager 和 TaskManager 会公用一个 JVM 来完成 Workload。如果要验证一个简单的应用,Local 模式是最方便的。实际应用中大多使用 Standalone 或者 Yarn Cluster,而 local 模式只是将安装包解压启动(./bin/start-local.sh)即可,在这里不在演示。

三.Standalone 模式

快速入门教程地址:https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html

1.  软件要求

  • Java 1.8.x或更高版本,
  • ssh(必须运行 sshd 才能使用管理远程组件的 Flink 脚本)

集群部署规划

节点名称  master worker zookeeper
node21  master   zookeeper
node22  master  worker zookeeper
node23    worker zookeeper

2.解压

[admin@node21 software]$ tar zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz -C /opt/module/
[admin@node21 software]$ cd /opt/module/
[admin@node21 module]$ ll
drwxr-xr-x 8 admin admin 125 Sep 15 04:47 flink-1.6.1

3.修改配置文件

[admin@node21 conf]$ ls
flink-conf.yaml       log4j-console.properties  log4j-yarn-session.properties  logback.xml       masters  sql-client-defaults.yaml
log4j-cli.properties  log4j.properties          logback-console.xml            logback-yarn.xml  slaves   zoo.cfg

修改 flink/conf/masters,slaves,flink-conf.yaml

[admin@node21 conf]$ sudo vi masters
node21:8081
[admin@node21 conf]$ sudo vi slaves
node22
node23
[admin@node21 conf]$ sudo vi flink-conf.yaml 
taskmanager.numberOfTaskSlots:2
jobmanager.rpc.address: node21

可选配置:

  • 每个 JobManager(jobmanager.heap.mb的可用内存量
  • 每个 TaskManager(taskmanager.heap.mb的可用内存量
  • 每台机器的可用 CPU 数量(taskmanager.numberOfTaskSlots),
  • 集群中的 CPU 总数(parallelism.default)和
  • 临时目录(taskmanager.tmp.dirs

4.拷贝安装包到各节点

[admin@node21 module]$ scp -r flink-1.6.1/ admin@node22:`pwd`
[admin@node21 module]$ scp -r flink-1.6.1/ admin@node23:`pwd`

5.配置环境变量

配置所有节点 Flink 的环境变量

[admin@node21 flink-1.6.1]$ sudo vi /etc/profile
export FLINK_HOME=/opt/module/flink-1.6.1
export PATH=$PATH:$FLINK_HOME/bin
[admin@node21 flink-1.6.1]$ source /etc/profile

6.启动 flink

[admin@node21 flink-1.6.1]$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node21.
Starting taskexecutor daemon on host node22.
Starting taskexecutor daemon on host node23.

jps 查看进程

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

7.  WebUI 查看

http://node21:8081

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

8.Flink 的 HA

首先,我们需要知道 Flink 有两种部署的模式,分别是 Standalone 以及 Yarn Cluster 模式。对于 Standalone 来说,Flink 必须依赖于 Zookeeper 来实现 JobManager 的 HA(Zookeeper 已经成为了大部分开源框架 HA 必不可少的模块)。在 Zookeeper 的帮助下,一个 Standalone 的 Flink 集群会同时有多个活着的 JobManager,其中只有一个处于工作状态,其他处于 Standby 状态。当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选举新的 JobManager 来接管 Flink 集群。

对于 Yarn Cluaster 模式来说,Flink 就要依靠 Yarn 本身来对 JobManager 做 HA 了。其实这里完全是 Yarn 的机制。对于 Yarn Cluster 模式来说,JobManager 和 TaskManager 都是被 Yarn 启动在 Yarn 的 Container 中。此时的 JobManager,其实应该称之为 Flink Application Master。也就说它的故障恢复,就完全依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一样)。由于完全依赖了 Yarn,因此不同版本的 Yarn 可能会有细微的差异。这里不再做深究。

1)修改配置文件

修改 flink-conf.yaml,HA 模式下,jobmanager 不需要指定,在 master file 中配置,由 zookeeper 选出 leader 与 standby。

#jobmanager.rpc.address: node21
high-availability:zookeeper                             #指定高可用模式(必须)high-availability.zookeeper.quorum:node21:2181,node22:2181,node23:2181  #ZooKeeper 仲裁是 ZooKeeper 服务器的复制组,它提供分布式协调服务(必须)high-availability.storageDir:hdfs:///flink/ha/       #JobManager 元数据保存在文件系统 storageDir 中,只有指向此状态的指针存储在 ZooKeeper 中(必须)
high-availability.zookeeper.path.root:/flink         #根 ZooKeeper 节点,在该节点下放置所有集群节点(推荐)high-availability.cluster-id:/flinkCluster           #自定义集群(推荐)state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints

修改 conf/zoo.cfg

server.1=node21:2888:3888
server.2=node22:2888:3888
server.3=node23:2888:3888

修改 conf/masters

node21:8081
node22:8081

修改 slaves

node22
node23

同步配置文件 conf 到各节点

2)启动 HA

先启动 zookeeper 集群各节点(测试环境中也可以用 Flink 自带的 start-zookeeper-quorum.sh),启动 dfs , 再启动 flink

[admin@node21 flink-1.6.1]$ start-cluster.sh

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

WebUI 查看,这是会自动产生一个主 Master,如下

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

3)验证 HA

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

手动杀死 node22 上的 master,此时,node21 上的备用 master 转为主 mater。

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

4)手动将 JobManager / TaskManager 实例添加到群集

您可以使用 bin/jobmanager.shbin/taskmanager.sh脚本 将 JobManager 和 TaskManager 实例添加到正在运行的集群中

添加 JobManager

bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all

添加 TaskManager

bin/taskmanager.sh start|start-foreground|stop|stop-all

[admin@node22 flink-1.6.1]$ jobmanager.sh start node22

新添加的为从 master。

9.运行测试任务

[admin@node21 flink-1.6.1]$ flink run -m node21:8081 ./examples/batch/WordCount.jar --input /opt/wcinput/wc.txt --output /opt/wcoutput/
[admin@node21 flink-1.6.1]$ flink run -m node21:8081 ./examples/batch/WordCount.jar --input hdfs:///user/admin/input/wc.txt --output hdfs:///user/admin/output2

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

四.Yarn Cluster 模式

1.引入

在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的 Workload。因此 Flink 也支持在 Yarn 上面运行。首先,让我们通过下图了解下 Yarn 和 Flink 的关系。

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

在图中可以看出,Flink 与 Yarn 的关系与 MapReduce 和 Yarn 的关系是一样的。Flink 通过 Yarn 的接口实现了自己的 App Master。当在 Yarn 中部署了 Flink,Yarn 就会用自己的 Container 来启动 Flink 的 JobManager(也就是 App Master)和 TaskManager。

启动新的 Flink YARN 会话时,客户端首先检查所请求的资源(容器和内存)是否可用。之后,它将包含 Flink 和配置的 jar 上传到 HDFS(步骤 1)。

客户端的下一步是请求(步骤 2)YARN 容器以启动 ApplicationMaster(步骤 3)。 由于客户端将配置和 jar 文件注册为容器的资源,因此在该特定机器上运行的 YARN 的 NodeManager 将负责准备容器(例如,下载文件)。完成后,将 启动ApplicationMaster(AM)。

JobManager 和 AM 在同一容器中运行。一旦它们成功启动,AM 就知道 JobManager(它自己的主机)的地址。它正在为 TaskManagers 生成一个新的 Flink 配置文件(以便它们可以连接到 JobManager)。该文件也上传到 HDFS。此外,AM容器还提供 Flink 的 Web 界面。YARN 代码分配的所有端口都是 临时端口 这允许用户并行执行多个 Flink YARN 会话。

之后,AM 开始为 Flink 的 TaskManagers 分配容器,这将从 HDFS 下载 jar 文件和修改后的配置。完成这些步骤后,即可建立 Flink 并准备接受作业。

2.修改环境变量

export  HADOOP_CONF_DIR= /opt/module/hadoop-2.7.6/etc/hadoop

3.部署启动 

[admin@node21 flink-1.6.1]$ yarn-session.sh -d -s 2 -tm 800 -n 2
-n : TaskManager 的数量,相当于 executor 的数量
-s : 每个 JobManager 的 core 的数量,executor-cores。建议将 slot 的数量设置每台机器的处理器数量
-tm : 每个 TaskManager 的内存大小,executor-memory
-jm : JobManager 的内存大小,driver-memory

上面的命令的意思是,同时向 Yarn 申请 3 个 container,其中 2 个 Container 启动 TaskManager(-n 2),每个 TaskManager 拥有两个 Task Slot(-s 2),并且向每个 TaskManager 的 Container 申请 800M 的内存,以及一个 ApplicationMaster(Job Manager)。

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

Flink 部署到 Yarn Cluster 后,会显示 Job Manager 的连接细节信息。

Flink on Yarn 会覆盖下面几个参数,如果不希望改变配置文件中的参数,可以动态的通过 - D 选项指定,如 -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368

jobmanager.rpc.address:因为 JobManager 会经常分配到不同的机器上

taskmanager.tmp.dirs:使用 Yarn 提供的 tmp 目录

parallelism.default:如果有指定 slot 个数的情况下

yarn-session.sh 会挂起进程,所以可以通过在终端使用 CTRL+ C 或输入 stop 停止 yarn-session。

如果不希望 Flink Yarn client 长期运行,Flink 提供了一种 detached YARN session,启动时候加上参数 - d 或—detached

在上面的命令成功后,我们就可以在 Yarn Application 页面看到 Flink 的纪录。如下图。

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

如果在虚拟机中测试,可能会遇到错误。这里需要注意内存的大小,Flink 向 Yarn 会申请多个 Container,但是 Yarn 的配置可能限制了 Container 所能申请的内存大小,甚至 Yarn 本身所管理的内存就很小。这样很可能无法正常启动 TaskManager,尤其当指定多个 TaskManager 的时候。因此,在启动 Flink 之后,需要去 Flink 的页面中检查下 Flink 的状态。这里可以从 RM 的页面中,直接跳转(点击 Tracking UI)。这时候 Flink 的页面如图

CentOS7.5 搭建 Flink1.6.1 分布式集群详解

yarn-session.sh 启动命令参数如下:

[admin@node21 flink-1.6.1]$ yarn-session.sh --help
Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified i
n the configuration.     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -st,--streaming                 Start Flink in streaming mode
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

4.提交任务

之后,我们可以通过这种方式提交我们的任务

[admin@node21 flink-1.6.1]$ ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar --input /opt/wcinput/wc.txt --output /opt/wcoutput/

以上命令在参数前加上 y 前缀,-yn 表示 TaskManager 个数。

在这个模式下,同样可以使用 -m yarn-cluster 提交一个 ” 运行后即焚 ” 的 detached yarn(-yd)作业到 yarn cluster。

5.停止 yarn cluster

yarn application -kill application_1539058959130_0001

6.Yarn 模式的 HA

应用最大尝试次数(yarn-site.xml),您必须配置为尝试应用的最大数量的设置yarn-site.xml,当前 YARN 版本的默认值为 2(表示允许单个 JobManager 失败)。

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>The maximum number of application master execution attempts</description>
</property>

申请尝试(flink-conf.yaml),您还必须配置最大尝试次数conf/flink-conf.yaml:yarn.application-attempts:10

示例:高度可用的 YARN 会话

  1. 配置 HA 模式和 zookeeper 法定人数 conf/flink-conf.yaml

    high-availability: zookeeper
    high-availability.zookeeper.quorum: node21:2181,node22:2181,node23:2181
    high-availability.storageDir: hdfs:///flink/recovery
    high-availability.zookeeper.path.root: /flink
    yarn.application-attempts: 10
  2. 配置 ZooKeeper 的服务器 conf/zoo.cfg(目前它只是可以运行每台机器的单一的 ZooKeeper 服务器):

    server.1=node21:2888:3888
    server.2=node22:2888:3888
    server.3=node23:2888:3888
  3. 启动 ZooKeeper 仲裁

    $ bin / start-zookeeper-quorum.sh
  4. 启动 HA 群集

    $ bin / yarn-session.sh -n 2

五. 错误异常

1. 身份认证失败

[root@node21 flink-1.6.1]# flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Starting execution of program

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: b7a99ac5db242290413dbebe32ba52b0)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

通过查看日志,发现有如下报错

2018-10-20 02:32:19,668 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed

解决法案:添加定时任务认证kerberos

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计12055字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中