大数据实战项目——Flink案例遇到的坑
admin
2023-09-24 16:02:16
0

最近在用Flink和Spark的Structured Streaming基于同一个需求场景,做了一个对比测试,目的在于看这两种不同技术实现的流处理手段,在我们的实际开发场景中,哪个更加便捷和好用?


其实在我经历的这么多大大小小的大数据项目中,生产环境中我一直用的都是Spark,基于Spark生态,你能叫出名字的我基本上都用了个遍,而且很多功能都是深度使用的。


所以对于分布式计算引擎来说,个人认为,能够把Spark玩转,一定能满足你各种不同场景下的计算需求,如果不能,那只能说明你还不够熟。


当然,这里的玩转,需要你对其各个模块的功能、原理,底层实现都非常熟悉才可以,只有这样,当你面对棘手问题时,才能有足够的信心和底气去搞定它们。


熟悉我的小伙伴都知道,我一向对那些比较新的、广告打的特别多的技术有种本能的警惕,因为这一般就意味着该技术不够成熟、不够稳定、生态不够健全等等诸多弊病。


而Flink则是这其中的一位,它以更先进和更合理的流处理思想而闻名,那么今天咱就以一个非常简单、实用的场景来聊聊Flink的解决方案。


场景如下:

将kafka中的流式数据通过Flink,经过简单的清洗转换,最后写到HDFS文件系统,并将其保存为CSV(普通文本)格式。

够简单对不对,为了你更加清晰这个流程,咱简单画个图:





下面我将完成这件事情的具体步骤,记录如下:


1. 部署Flink on yarn

这个过程其实特别简单,只需要将flink官网提供的压缩包下载下来,然后放到一台你想提交flink任务的机器上就可以,也叫flink客户端机器。


为什么这么做,原因也很简单,因为原本我们有现成的大数据集群,而且我们当前的集群分布式任务都是通过yarn来管理的,目前已经管理了MR、Tez和Spark任务。


那突然加入一个Flink,当然也不会去搞特殊,也是为了避免增加系统的管理复杂度,同样想到用YARN来管理Flink任务。


跟Spark性质一样,但凡计算引擎,本质上就是一个不需要启动任何服务的SDK,你只需要找到一个能提交该任务的机器作为客户端就可以了,然后在提交Flink任务的时候,让它知道Hadoop的环境变量就可以,其他所有配置都不需要做任何更改。

因为你一旦设置了Hadoop的环境变量,那么就相当于把Hadoop相关的配置都告诉了当前用户,这里当然就包括yarn的集群地址。因此当该用户向yarn提交flink任务时,自然而然就提交成功。

事实证明确实如此,你只需要把在官网下载的压缩包,放到合适的目录下进行解压(我下载的次新版1.15.3),然后添加对应的用户,这里我用flink用户。



下载的flink版本

接着,在flink用户的环境配置文件里添加Hadoop的环境变量,这个flink部署就已经完成了。



添加Hadoop环境变量

是不是so easy?


部署完成后,那为了证明我们的部署是OK的,一定要跑个demo试试看,正好官网也给我们提供好了案例。


在当前机器的flink家目录下用flink用户运行如下命令:



flink on yarn的提交命令

当你看到如下输出的时候,证明你的flink on yarn环境已经部署好了:



flinkon yarn提交任务后的输出


2. Flink table API编码部分


部署flink环境确实很简单,但是接下来就是坑比较多的地方了。


首先是flink读取kafka,我最开始是想着用flink的Table API来处理,于是根据官方文档的要求,该引入的jar包也都引入了,也创建了kafka的source table,同时又创建了HDFS的Sink table,如下面的过程:



创建kafka source table 写hdfs文件


但是,运行报错了:



大概查了下资料,没有很快找到解决办法,算了,放弃,证明当前方式可能不好用,后面再找个专门时间去折腾。


3. FlinkDataStream API编码部分


果断选择用DataStream API来实现,虽然也有很多坑,但是好在最终调试通过,目前任务提交到yarn上运行了几个小时,一切正常。


具体代码代码如下,写的比较粗糙,仅提供你参考:

packagecom.anrygimportjava.time.Durationimportorg.apache.flink.api.common.eventtime.WatermarkStrategyimportorg.apache.flink.api.common.serialization.{SimpleStringEncoder,SimpleStringSchema}importorg.apache.flink.configuration.MemorySizeimportorg.apache.flink.connector.kafka.source.KafkaSourceimportorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerimportorg.apache.flink.core.fs.Pathimportorg.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkimportorg.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicyimportorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment/***@DESC:读取kafka数据,从DataStream到HDFS*@Auther:Anryg*@Date:2022/8/1419:08*/objectFlinkDSFromKafka2HDFS{defmain(args:Array[String]):Unit={valenv=StreamExecutionEnvironment.getExecutionEnvironment//获取流任务的环境变量valkafkaSource=KafkaSource.builder()//获取kafka数据源.setBootstrapServers("${kafka_node}:6667").setTopics("your_topic").setGroupId("FlinkDSFromKafka2HDFS2").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(newSimpleStringSchema()).build()importorg.apache.flink.streaming.api.scala._//引入隐私转换函数valkafkaDS=env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafka-data")//读取数据源生成DataStream对象valtargetDS=kafkaDS.map(line=>{//对数据源做简单的ETL处理line.split("\\|")}).filter(_.length==9).map(array=>(array(0),array(1),array(2),array(3),array(4),array(5),array(6),array(7),array(8)))/**基于flink1.14之后新的,文件系统的sink策略,跟官网提供的不一致,有坑*/valhdfsSink2=StreamingFileSink.forRowFormat(newPath("hdfs://${namenode}:8020/tmp/flink_sink2"),newSimpleStringEncoder[(String,String,String,String,String,String,String,String,String)]("UTF-8"))//.withBucketAssigner(newDateTimeBucketAssigner)/**默认基于时间分配器*/.withRollingPolicy(//设置文件的滚动策略,也就是分文件策略,也可以同时设置文件的命名规则,这里暂时用默认DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(300))//文件滚动间隔,设为5分钟,即每5分钟生成一个新文件.withInactivityInterval(Duration.ofSeconds(20))//空闲间隔时间,也就是当前文件有多久没有写入数据,则进行滚动.withMaxPartSize(MemorySize.ofMebiBytes(500))//单个文件的最大文件大小,设置为500MB.build()).build()targetDS.addSink(hdfsSink2)//目标DataStream添加sink策略env.execute("FlinkDSFromKafka2HDFS")//启动任务}}

具体的代码已经上传到GitHub,有需要的小伙伴可自行下载。


GitHub地址:https://github.com/Anryg/internet_behavior_project


虽然是一个非常简单的需求,但是如果你是第一次玩,一定会遇到非常多的坑,下面就来给你细数一下,我现在还能想起来的坑有哪些:


  1. flink的新版本(具体版本不记得了),已经不再跟Hadoop绑定(目前我用的1.15.3就是),这样就导致当你把API写好,往HDFS写数据时,运行时发现,缺少了Hadoop的相关依赖,于是必须给加上,但是错误提示并不告诉你缺少了哪些,官网也没有说,你只能一个个来试;

  2. 按理说讲flink处理好的数据写入到HDFS,这个非常常用的文件系统应该是非常简单的事情,可是从官方文档来看,情况并不是这样的,我如果告诉你官方文档提供的API是错的,你信吗?


    官方文档提供的API是这样的:




    但是,1.15.2这个版本根本就没有这个类,然后我去maven中央仓库去查了下,发现这个类对应的jar包最高的版本也只到1.14.6。





    所以,只能去各种查资料,最后才知道,原来新版本的flink对于文件系统的Sink方式改成了用StreamingFileSink这个新类,坑爹的玩意,可是它在官网压根就没有提及,你只能各种找,然后凭经验去试错,才能最终把问题解决。

  3. 还有在查相关资料的时候,你会发现Flink的API特别的乱,同样都是写数据到HDFS,flink不同版本之间的实现方式都不一样,关键变化还不是一点点,如果之前的老版本,比如1.10左右的,想升级到比较新的版本,你会发现之前写的api完全都不能用,必须全部重新换。


4.跟Structured streaming的对比


因为之前这个需求就是用spark的structured streaming来实现的,记得第一次实现的时候,就很顺利,每一个步骤官方文档都有示例代码,几乎没什么坑。


因为spark跟Hadoop是天然集成的,因此在spark代码中你只要指定一个文件路径,它就默认是HDFS,因此这个需求用spark来完成基本上是分分钟就写出来了。


但是我用flink,花了至少半天时间,倒不是因为它难,而是在找资料的过程中,会有太多干扰你的东西,这个不对,那个也不行,你都不知道最后能相信谁,只能挨个去试错。


而这,就是用新技术必须要付出的代价,而这些坑,我几年前可没少趟...


Flink虽然有很多吐槽的地方,但是优点,咱还是要来夸一夸的:


  1. 默认情况下,任务运行时,控制台的输出非常的干净,除了用户代码的显示打印外,几乎没有任何多余的输出信息,给人的感觉非常的清爽,当然,它带来的一个坏处就是你不能清楚直观的看到任务的执行过程,以及每个过程都经历了什么;

  2. 数据经过shuffle操作后,会自动对任务的分区根据key进行调整,比如原来有10个分区,聚合后变成了3个key,那么partition数也自动变成了3个,而spark是不变的,除非人工去调整;

  3. 任务界面:各有利弊,Flink的显示界面默认每3秒刷新一次,而且可以非常清楚的看到运行的JVM的内存变化,可以及时的预判是否会发生OOM,这一点比spark做的好。

    但是Flink只能显示当前累计处理的数量大小,不能显示条数,以及不能显示数据的读取速度和数据处理速度,而这些spark的structured streaming是都可以的:





    Flink只能显示累计处理的数据量大小



    Spark能显示数据源读取的速率以及处理的速率


此外,spark还拥有比较丰富、直观的对数据处理的图表分析功能:




而这是Flink目前还不具备的,至少在flink on yarn里面没有这个功能。


最后

技术没有绝对的好坏,只有具体场景下基于成本考量下的取舍。


以上,就是我针对这个小小应用场景下对这两个技术的一次简单对比,后续我会用更多生产案例来对flink进行测试。


如果你有任何相关技术问题,欢迎找我交流...



你可以添加我的私人微信,拉你入技术讨论群,跟一群热爱技术的小伙伴一起长大...

相关内容