小记--------sparkstreaming常驻yarn调度程序调优

 
#!/bin/bash
#hadoop
export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH
 
 
realtime_queue=root
receiver_max_rate=100
receiver_initial_rate=30
my_job_name="reservation"
main_class="com.df.RealTime_Increment_reservation"
 
 
/opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0/bin/spark-submit --master yarn --deploy-mode cluster \
--name ${my_job_name} \
--class ${main_class} \
--queue ${realtime_queue} \
--driver-memory 2g     \
--executor-memory 4g     \
--executor-cores 2     \
 
#driver端配置log4j
--conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties \
 
#executor端配置log4j
--conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties \
 
#使用kyro序列化
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer  \
 
#数据本地化等待时长10s,默认3s  (尽可能减少网络传输)
--conf spark.locality.wait=10  \
 
#程序失败最大次数为8
--conf spark.task.maxFailures=8  \
 
#允许在 webUI将stage和相应的job杀死
--conf spark.ui.killEnabled=false  \
 
#SparkContext启动时,记录有效sparkconf信息
--conf spark.logConf=true  \
 
#sparkstreaming接收器将接受数据合并成数据块并存储在spark的时间间隔(毫秒)默认为200
--conf spark.streaming.blockInterval=200  \
 
#数据在写入配置的checkpoint目录中的预写日志
    #这种机制可以让driver在恢复的时候,避免数据丢失,确保整个实时计算过程中不丢失数据
--conf spark.streaming.receiver.writeAheadLog.enable=true  \
 
#spark streaming 反压机制 默认false
  #反压机制能够通过动态收集系统的一些数据来自动地适配集群数据处理能力
--conf spark.streaming.backpressure.enabled=true \
 
#估算最低费率默认值为100
--conf spark.streaming.backpressure.pid.minRate=10  \
#------------------------------------------------------------
    #spark1.5之前;限制每个receiver每秒最大可以接收的记录的数据
--conf spark.streaming.receiver.maxRate=${receiver_max_rate} \
    #spark1.5之前;限制每次作业中每个kafka分区最多读取的记录条数
--conf spark.streaming.kafka.maxRatePerPartition=${receiver_max_rate}  \
#InputDStreams内部的RateController里面会存下计算好的最大速率,将速率推送到ReceiverSupervisorImpl。
#最后到底接受多少数据取决于三者的最小值。也就是每个接收器或者每个kafka分区每秒处理的数据不会超过maxRate、maxRatePerPartition的值
#------------------------------------------------------------
#启用反压机制时每个接收器接收第一批数据的初始最大速率
--conf spark.streaming.backpressure.initialRate=${receiver_initial_rate}  \
--conf spark.yarn.driver.memoryOverhead=512  \
--conf spark.yarn.executor.memoryOverhead=1024 \
 
#配置重新运行应用程序的最大尝试次数为4
--conf spark.yarn.maxAppAttempts=4 \
 
#配置重新运行程序的次数计数器重置时间为:1小时
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
 
#配置程序发生故障之前executor失败数量的计数器重置时间为:1小时
--conf spark.yarn.executor.failuresValidityInterval=1h \
 
#关闭推测执行任务,防止资源被全部占用,不能及时释放
  #推测任务是指针对一个stage里面拖后腿的Task,会在其他节点的executor上再次启动这个task,如果其中一个Task实例运行成功则将这个最先完成的Task的计算结果作为最终结果,同时会干掉其他executor上运行的实例,spark推测式执行默认是关闭的。。。
--conf spark.speculation=false \
 
#广播等待超时时间默认300s(单位:s);用于大小表进行join时,将小表广播到所有worker节点提升性能
--conf spark.sql.broadcastTimeout=600 \
 
#shuffle分区数默认200;根据数据量大小,资源大小进行调整或大或小
--conf spark.sql.shuffle.partitions=800 \
 
#spark执行任务并行度设置只对RDD有效,对SQL无效;spark.sql.shuffle.partitions(spark SQL 有效)
--conf spark.default.parallelism=600 \
 
#JVM堆内存中M的百分比(默认为0.6)
--conf spark.memory.fraction=0.8 \
 
#阻塞队列大小默认为10000
--conf spark.scheduler.listenerbus.eventqueue.size=100000 \
/opt/cdh/submit/reservation/Thermodynamic-1.0-SNAPSHOT.jar 100 df1:9092,df2:9092,df3:9092 reservation df_reservation_3 df2:2181,df3:2181,df4:2181,df5:2181,df6:2181
 
 
 

相关推荐