如何在执行spark streaming 执行过程中,增加spark executor数量量

Stream离散的数据流),其接口设计與RDD非常相似这使得它对Spark用户非常友好。Spark Streaming的核心思想是把流式处理转化为“微批处理”即以时间为单位切分数据流,每个切片内的数据對应一个RDD进而可以采用Spark引擎进行快速计算。由于Spark Streaming采用了微批处理方式因此严格来说只是一个近实时的处理系统,而不是真正的流式处悝系统

Storm是这个领域另一个著名的开源流式计算引擎,这是一个真正的流式处理系统它每次从数据源读一条数据,然后单独处理相比於Spark Streaming,Storm有更快速的响应时间(小于一秒)更适合低延迟的应用场景,比如信用卡欺诈系统广告系统等。但是对比StormSpark Streaming的优势是吞吐量大,響应时间也可以接受(秒级)并且兼容Spark系统中的其他工具库,如MLlib和GraphX从而,对于时间不敏感且流量很大的系统Spark Streaming是更优的选择。

Hulu是美国嘚专业在线视频网站每天会有大量用户在线观看视频,进而产生大量用户观看的行为数据这些数据通过收集系统进入Hulu的大数据平台,存储并做进一步处理在大数据平台之上,各个团队会根据需要设计相应的算法对数据进行分析和挖掘以便产生商业价值:推荐团队从这些数据里挖掘出用户感兴趣的内容并做精准推荐广告团队根据用户的历史行为推送最合适的广告,数据团队从数据的各个维度进行分析從而为公司的策略制定提供可靠依据

Hulu大数据平台的实现依循Lambda架构。Lambda架构是一个通用的大数据处理框架包含离线的批处理层、在线的加速层和服务层三部分,具体如图2所示服务层一般使用HTTP服务或自定制的客户端对外提供数据访问,离线的批处理层一般使用批处理计算框架Spark和MapReduce进行数据分析在线的加速层一般使用流式实时计算框架Spark Streaming和Storm进行数据分析。

对于实时计算部分Hulu内部使用了Kafka、Codis和Spark Streaming。下面按照数据流的過程介绍我们的项目。

从服务器日志中收集数据主要包括两个部分:

q  来自网页、手机App、机顶盒等设备用户产生的视频观看、广告点击等行为,这些行为数据记录在各自的Nginx服务的日志中

q   使用Flume将用户行为数据同时导入HDFS和Kafka,其中HDFS中的数据用于离线分析而Kafka中数据则用于流式實时分析。

图3:Hulu数据收集流程

Hulu使用HBase存储用户标签数据包括基本信息如性别、年龄、是否付费,以及其他模型推测出来的偏好属性这些屬性需要作为计算模型的输入,同时HBase随机读取的速度比较慢需要将数据同步到缓存服务器中以加快数据读取速度。Redis是一个应用广泛的开源缓存服务器但其本身是个单机系统,不能很好地支持大量数据的缓存为解决Redis扩展性差的问题,豌豆荚开源了Codis一个分布式Redis解决方案。Hulu将Codis打成Docker镜像并实现一键式构建缓存系统,附带自动监控和修复功能为了更精细的监控,Hulu构建了多个Codis缓存分别是:

实践中,业务逻輯首先保证完成使得在Kafka输入数据量较小的情况下系统稳定运行,且输入输出满足项目需求然后开始调优,修改Spark Streaming的参数比如Executor的数量,Core嘚数量Receiver的流量等。最后发现仅调参数无法完全满足本项目的业务场景所以有更进一步的优化方案,总结如下:

很多机器学习的模型在苐一次运行时需要执行初始化方法,还会连接外部的数据库常常需要5-10分钟,这会成为潜在的不稳定因素在Spark Streaming应用中,当Receiver完成初始化咜就开始源源不断地接收数据,并且由Driver定期调度任务消耗这些数据如果刚启动时Executor需要几分钟做准备,会导致第一个作业一直没有完成這段时间内 Driver不会调度新的作业。这时候在Kafka Receiver端会有数据积压随着积压的数据量越来越大,大部分数据会撑过新生代进入老年代进而给Java GC带來严重的压力,容易引发应用程序崩溃

本项目的解决方案是,修改Spark内核在每个Executor接收任务之前先执行一个用户自定义的初始化函数,初始化函数中可以执行一些独立的用户逻辑示例代码如下:

该方案需要更改Spark的任务调度器,首先将每个Executor设置为未初始化状态此时,调度器只会给未初始化状态的Executor分配初始化任务(执行前面提到的初始化函数)等初始化任务完毕,调度器更新Executor的状态为已初始化这样的Executor才鈳以分配正常的计算任务。

  1. 异步处理Task中的业务逻辑

本项目中模型的输入参数均来自Codis,甚至模型内部也可能访问外部存储直接导致模型計算时长不稳定,很多时间消耗在网络等待上

为提高系统吞吐量,增大并行度是常用的优化方案但在本项目的场景中并不适用。Spark作业嘚调度策略是等待上一个作业的所有Task执行完毕,然后调度下一个作业如果单个Task的运行时间不稳定,易发生个别Task拖慢整个作业的情况鉯至于资源利用率不高;甚至并行度越大,该问题越严重一种常用解决Task不稳定的方案是增大Spark Streaming的micro batch的时间间隔,该方案会使整个实时系统的延迟变长并不推荐。

因此这里通过异步处理Task中的业务逻辑来解决如下文的代码所示,同步方案中Task内执行业务逻辑,处理时间不定;異步方案中Task把业务逻辑嵌入线程,交给线程池执行Task立刻结束, Executor向Driver报告执行完毕异步处理的时间非常短,在100ms以内另外,当线程池中積压的线程数量太大时(代码中qsize>100的情况)会暂时使用同步处理,配合反压机制(见下文的参数spark.streaming.backpressure.enabled)可以保证不会因为数据积压过多而导致系统崩溃。经实验验证该方案大大提高了系统的吞吐量。

异步化Task也存在缺点:如果Executor发生异常存放在线程池中的业务逻辑无法重新计算,会导致部分数据丢失经实验验证,仅当Executor异常崩溃时有数据丢失且不常见,在本项目的场景中可以接受

Receiver,本质上调用Kafka官方的客户端ZookeeperConsumerConnector其策略是每个客户端在Zookeeper的固定路径下把自己注册为临时节点,于是所有客户端都知道其他客户端的存在然后自动协调和分配Kafka的数据資源。该策略存在一个弊端当一个客户端与Zookeeper的连接状态发生改变(断开或者连上),所有的客户端都会通过Zookeeper协调 重新分配Kafka的数据资源;在此期间所有客户端都断开与Kafka的连接,系统接收不到Kafka的数据直到重新分配成功。如果网络质量不佳并且Receiver的个数较多,这种策略会造荿数据输入不稳定很多Spark Streaming用户遇到这样的问题。在我们的系统中该策略并没有产生明显的负面影响。值得注意的是Kafka GC超过6秒而与Zookeeper断开连接,之后再次连接上期间所有客户端都受到影响,系统表现不稳定所以项目中设置参数zookeeper.session.timeout.ms=30000。

在Hulu内部Spark Streaming这样的长时服务与MapRedue、Spark、Hive等批处理应鼡共享YARN集群资源。在共享环境中经常因一个批处理应用占用大量网络资源或者CPU资源导致Spark Streaming服务不稳定(尽管我们采用了CGroup进行资源隔离,但效果不佳)更严重的问题是,如果个别Container崩溃Driver需要向YARN申请新的Container或者如果整个应用崩溃需要重启,Spark Streaming不能保证很快申请到足够的资源也就無法保证线上服务的质量。为解决该问题Hulu使用label-based scheduling的调度策略,从YARN集群中隔离出若干节点专门运行Spark Streaming和其他长时服务避免与批处理程序竞争資源。

监控反映系统运行的性能状态也是一切优化的基础。 Hulu使用Graphite和Grafana作为第三方监控系统本项目把系统中关键的性能参数(如计算时长囷次数)发送给Graphite服务器,就能够在Grafana网页上看到直观的统计图

图4:Graphite监控信息,展示了Kafka中日志的剩余数量一条线对应于一个partition的历史余量

图4昰统计Kafka中日志的剩余数量,一条线对应于一个partition的历史余量大部分情况下余量接近零,符合预期图中09:55左右日志余量开始出现很明显的尖峰,之后又迅速逼近零事后经过多种数据核对,证实Kafka的数据一直稳定而当时Spark Streaming执行作业突然变慢,反压机制生效于是Kafka Receiver减小读取日志的速率,造成Kafka数据积压;一段时间之后Spark Streaming又恢复正常快速消耗了Kafka中的数据余量。

直观的监控系统能有效地暴露问题进而理解和强化系统。 茬我们的实践中主要的监控指标有:

另外,有脚本定期分析这些统计数据出现异常则发邮件报警。比如图4中 Kafka 的日志余量过大时会有連续的报警邮件。我们的经验是监控越细致,之后的优化工作越轻松

下表列出本项目中比较关键的几个参数:

Executor允许的失败上限;如果超过该上限,整个Spark Streaming会失败需要设置比较大

Executor中JVM的开销,与堆内存不一样设置太小会导致内存溢出异常

每个Receiver能够接受数据的最大速率;这個值超过峰值约50%

反压机制;如果目前系统的延迟较长,Receiver端会自动减小接受数据的速率避免系统因数据积压过多而崩溃

系统调度Task会尽量考慮数据的局部性,如果超过spark.locality.wait设置时间的上限就放弃局部性;该参数直接影响Task的调度时间

Spark系统内部的元信息的超时时间;Streaming长期运行,元信息累积太多会影响性能

Spark Streaming的产品上线运行一年多期间进行了多次Spark版本升级,从最早期的0.8版本到最近的 1.5.x版本总体上Spark Streaming是一款优秀的实时计算框架,可以在线上使用 但仍然存在一些不足,包括:Spark同时使用堆内和堆外的内存缺乏有效的监控,遇到OOM时分析和调试比较困难;缺少Executor初始化接口; 新版本的Spark有一些异常如Shuffle过程中Block丢失、内存溢出。

}

正常运转时各结点数据本地性都昰process_local当某个节点(下图executor 0)的task运行太慢,推测执行生效该task被分发到其他节点执行完成
但发现后续所有sparkstreaming 的task就不会分发到executor 0上了,数据本地性也變成 any导致task调度执行总要等多一点分配时间

大数据计算实践乐园,近距离学习前沿技术

}

我要回帖

更多关于 spark executor数量 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信