基于 Spark 的资料分析实践
基于 Spark 的资料分析实践 转载本文需注明出处:微信公众号EAWorld,违者必究。 本文主要分析了 Spark RDD 以及 RDD 作为开发的不足之处,介绍了 SparkSQL 对已有的常见资料系统的操作方法,以及重点介绍了普元在众多资料开发专案中总结的基于 SparkSQL Flow 开发框架。 二、基于Spark RDD资料开发的不足 三、SparkSQL 四、SparkSQL Flow RDD具有资料流模型的特点:自动容错、位置感知性排程和可伸缩性。 //Scala 在内存中使用列表建立 val lines = List(“A”, “B”, “C”, “D” …) val rdd:RDD = sc.parallelize(lines); //以文字档案建立 val rdd:RDD[String] = sc.textFile(“hdfs://path/filename”) Spark RDD Partition 分区划分 新版本的 Hadoop 已经把 BlockSize 改为 128M,也就是说每个分割槽处理的资料量更大。 Spark 读取档案分割槽的核心原理 本质上,Spark 是利用了 Hadoop 的底层对资料进行分割槽的 API(InputFormat): public abstract class InputFormat{ public abstract List getSplits(JobContextcontext ) throwsIOException,InterruptedException; public abstract RecordReader createRecordReader(InputSplitsplit, TaskAttemptContextcontext )throwsIOException,InterruptedException; } Spark 任务提交后通过对输入进行 Split,在 RDD 构造阶段,只是判断是否可 Split(如果引数异常一定在此阶段报出异常),并且 Split 后每个 InputSplit 都是一个分割槽。只有在Action 算子提交后,才真正用 getSplits 返回的 InputSplit 通过 createRecordReader 获得每个 Partition 的连线。 然后通过 RecordReader 的 next() 遍历分割槽内的资料。 Spark RDD 转换函式和提交函式 Spark RDD 的众多函式可分为两大类Transformation 与 Action。Transformation 与 Action 的区别在于,对 RDD 进行 Transformation 并不会触发计算:Transformation 方法所产生的 RDD 物件只会记录住该 RDD 所依赖的 RDD 以及计算产生该 RDD 的资料的方式;只有在使用者进行 Action 操作时,Spark 才会排程 RDD 计算任务,依次为各个 RDD 计算资料。这就是 Spark RDD 内函式的“懒载入”特性。 即使 SparkRDD 相对于 MapReduce 提高很大的便利性,但在使用上仍然有许多问题。体现在一下几个方面: RDD 函式众多,开发者不容易掌握,部分函式使用不当 shuffle时造成资料倾斜影响效能;RDD 关注点仍然是Spark太底层的 API,基于 Spark RDD的开发是基于特定语言(Scala,Python,Java)的函式开发,无法以资料的视界来开发资料;对 RDD 转换算子函式内部分常量、变数、广播变数使用不当,会造成不可控的异常;对多种资料开发,需各自开发RDD的转换,样板程式码较多,无法有效重利用;在执行期可能发生的异常。如:物件无法序列化等执行期才能发现的异常。 一般的资料处理步骤:读入资料 -> 对资料进行处理 -> 分析结果 -> 写入结果 SparkSQL 结构化资料 处理结构化资料(如 CSV,JSON,Parquet 等);把已经结构化资料抽象成 DataFrame (HiveTable);非结构化资料通过 RDD.map.filter 转换成结构化进行处理;按照列式数据库,只加载非结构化中可结构化的部分列(Hbase,MongoDB);处理非结构化资料,不能简单的用 DataFrame 装载。而是要用 SparkRDD 把资料读入,在通过一系列的 Transformer Method 把非结构化的资料加工为结构化,或者过滤到不合法的资料。 SparkSQL DataFrame引言:
Spark是在借鉴了MapReduce之上发展而来的,继承了其分散式平行计算的优点并改进了MapReduce明显的缺陷。Spark主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等元件。目录:
一、Spark RDD一、Spark RDD
RDD(Resilient Distributed Dataset)叫做弹性分散式资料集,是Spark中最基本的资料抽象,它代表一个不可变、可分割槽、元素可平行计算的集合。二、基于Spark RDD资料开发的不足
由于MapReduce的shuffle过程需写磁盘,比较影响效能;而Spark利用RDD技术,计算在内存中流式进行。另外 MapReduce计算框架(API)比较局限, 使用需要关注的引数众多,而Spark则是中间结果自动推断,通过对资料集上炼式执行函式具备一定的灵活性。三、SparkSQL
Spark 从 1.3 版本开始原有 SchemaRDD 的基础上提供了类似Pandas DataFrame API。新的DataFrame API不仅可以大幅度降低普通开发者的学习门槛,同时还支援Scala、Java与Python三种语言。更重要的是,由于脱胎自SchemaRDD,DataFrame天然适用于分散式大资料场景。
SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一种以RDD为基础的分散式资料集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元资讯,即DataFrame所表示的二维表资料集的每一列都带有名称和型别。如果熟悉 Python Pandas 库中的 DataFrame 结构,则会对 SparkSQL DataFrame 概念非常熟悉。
TextFile DataFrame
import.org.apache.spark.sql._
//定义资料的列名称和型别
valdt=StructType(List(id:String,name:String,gender:String,age:Int))
//汇入user_info.csv档案并指定分隔符
vallines = sc.textFile(/path/user_info.csv).map(_.split(,))
//将表结构和资料关联起来,把读入的资料user.csv对映成行,构成资料集
valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt))
//通过SparkSession.createDataFrame()建立表,并且资料表表头
val df= spark.createDataFrame(rowRDD, dt)
读取规则资料档案作为DataFrame
SparkSession.Builder builder = SparkSession.builder()
Builder.setMaster(local).setAppName(TestSparkSQLApp)
SparkSession spark = builder.getOrCreate();
SQLContext sqlContext = spark.sqlContext();
# 读取 JSON 资料,path 可为档案或者目录
valdf=sqlContext.read().json(path);
# 读取 HadoopParquet 档案
vardf=sqlContext.read().parquet(path);
# 读取 HadoopORC 档案
vardf=sqlContext.read().orc(path);
JSON 档案为每行一个 JSON 物件的档案型别,行尾无须逗号。档案头也无须[]指定为阵列;SparkSQL 读取是只是按照每行一条 JSON Record序列化;
Parquet档案
Configurationconfig = new Configuration();
ParquetFileReaderreader = ParquetFileReader.open(
HadoopInputFile.fromPath(new Path(hdfs:///path/file.parquet),conf));
Mapschema = reader.getFileMetaData().getKeyValueMetaData();
String allFields= schema.get(org.apache.spark.sql.parquet.row.metadata);
allFiedls 的值就是各字段的名称和具体的型别,整体是一个json格式进行展示。
读取 Hive 表作为 DataFrame
Spark2 API 推荐通过 SparkSession.Builder 的 Builder 模式建立 SparkContext。 Builder.getOrCreate() 用于建立 SparkSession,SparkSession 是 SparkContext 的封装。
在Spark1.6中有两个核心元件SQLcontext和HiveContext。SQLContext 用于处理在 SparkSQL 中动态注册的表,HiveContext 用于处理 Hive 中的表。
从Spark2.0以上的版本开始,spark是使用全新的SparkSession界面代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可执行 Hive 中的表,也可执行内部注册的表;
在需要执行 Hive 表时,只需要在 SparkSession.Builder 中开启 Hive 支援即可(enableHiveSupport())。
SparkSession.Builder builder = SparkSession.builder().enableHiveSupport();
SparkSession spark = builder.getOrCreate();
SQLContext sqlContext = spark.sqlContext();
// db 指 Hive 库中的数据库名,如果不写预设为 default
// tableName 指 hive 库的资料表名
sqlContext.sql(“select * from db.tableName”)
SparkSQL ThriftServer
//首先开启 Hive 的 Metastore服务
hive$bin/hive –-service metastore –p 8093
//把 Spark 的相关 jar 上传到hadoophdfs指定目录,用于指定sparkonyarn的依赖 jar
spark$hadoop fs –put jars/*.jar /lib/spark2
// 启动 spark thriftserver 服务
spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf
spark.yarn.jars=hdfs:///lib/spark2/*.jar
当hdfs 上传了spark 依赖 jar 时,通过spark.yarn.jars 可看到日志 spark 无须每个job 都上传jar,可节省启动时间
19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar
19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar
//通过 spark bin 下的 beeline 工具,可以连线到 spark ThriftServer(SparkOnHive)
bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop
-u 是指定 beeline 的执行驱动地址;
-n 是指定登陆到 spark Session 上的使用者名称称;
Beeline 还支援传入-e 可传入一行 SQL,
-e query that should be executed
也可通过 –f 指定一个 SQL File,内部可用逗号分隔的多个 SQL(储存过程)
-f script file that should be executed
SparkSQL Beeline 的执行效果展示
SparkSQL ThriftServer
对于 SparkSQL ThriftServer 服务,每个登陆的使用者都有建立的 SparkSession,并且执行的对个 SQL 会通过时间顺序列表展示。
SparkSQL ThriftServer 服务可用于其他支援的数据库工具建立查询,也用于第三方的 BI 工具,如 tableau。
四、SparkSQL Flow
SparkSQL Flow 是以 SparkSQL 为基础,开发的统一的基于 XML 配置化的可执行一连串的 SQL 操作,这一连串的 SQL 操作定义为一个 Flow。下文开始 SparkSQL Flow 的介绍:SparkSQL Flow 是基于 SparkSQL 开发的一种基于 XML 配置化的 SQL 资料流转处理模型。该模型简化了 SparkSQL 、Spark RDD的开发,并且降低开发了难度,适合了解资料业务但无法驾驭大资料以及 Spark 技术的开发者。
一个由普元技术部提供的基于 SparkSQL 的开发模型;一个可二次定制开发的大资料开发框架,提供了灵活的可扩充套件 API;一个提供了 对档案,数据库,NoSQL 等统一的资料开发视界语义;基于 SQL 的开发语言和 XML 的模板配置,支援 Spark UDF 的扩充套件管理;支援基于 Spark Standlone,Yarn,Mesos 资源管理平台;支援开源、华为、星环等平台统一认证。SparkSQL Flow 适合的场景:
批量 ETL;非实时分析服务;SparkSQL Flow XML 概览
Properties 内定义一组变数,可用于宏替换;Methods 内可注册 udf 和 udaf 两种函式;Prepare 内可定义前置 SQL,用于执行 source 前的 sql 操作;Sources 内定义一个到多个数据表检视;Transformer 内可定义 0 到多个基于 SQL 的资料转换操作(支援 join);Targets 用于定义 1 到多个数据输出;After 可定义 0到多个任务日志;如你所见,source 的 type 引数用于区分 source 的型别,source 支援的种类直接决定SparkSQL Flow 的资料来源载入广度;并且,根据 type 不同,source 也需要配置不同的引数,如数据库还需要 driver,url,user和 password 引数。
Transformer 是基于 source 定的资料检视可执行的一组转换 SQL,该 SQL 符合 SparkSQL 的语法(SQL99)。Transform 的 SQL 的执行结果被作为中间表命名为 table_name 指定的值。
Targets 为定义输出,table_name 的值需在 source 或者 Transformer 中定义。
SparkSQL Flow 支援的Sourse
支援从 Hive 获得资料;支援档案:JSON,TextFile(CSV),ParquetFile,AvroFile支援RDBMS数据库:PostgreSQL, MySQL,Oracle支援 NOSQL 数据库:Hbase,MongoDBSparkSQL Flow TextFile Source
textfile 为读取文字档案,把文字档案每行按照 delimiter 指定的字元进行切分,切分不够的列使用 null 填充。
fields=cust_id,name1,gender1,age1:int
delimiter=,
path=file:///Users/zhenqin/software/hive/user.txt/>
Tablename 为该档案对映的资料表名,可理解为资料的检视;Fields 为切分后的字段,使用逗号分隔,字段后可紧跟该字段的型别,使用冒号分隔;Delimiter 为每行的分隔符;Path 用于指定档案地址,可以是档案,也可是资料夹;Path 指定地址需要使用协议,如:file:// 、 hdfs://,否则跟 core-site.xml 配置密切相关;SparkSQL Flow DB Source
table=user
url=jdbc:mysql://localhost:3306/tdb?characterEncoding=UTF-8
driver=com.mysql.jdbc.Driver
user=root password=123456/>
RDBMS 是从数据库使用 JDBC读取 资料集。支援 type 为:db、mysql、oracle、postgres、mssql;
tablename 为该资料表的抽象 table 名称(检视);url、driver、user,password 为数据库 JDBC 驱动资讯,为必须字段;SparkSQL 会载入该表的全表资料,无法使用 where 条件。SparkSQL Flow Transformer
SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids
FROM user_concat_testx
group by c_phone,c_type,c_num
Transform 支援 cached 属性,预设为 false;如果设定为 true,相当于把该结果快取到内存中,快取到内存中的资料在后续 Transform 中使用能提高计算效率。但是需使用大量内存,开发者需要评估该资料集能否放到内存中,防止出现 OutofMemory 的异常。
SparkSQL Flow Targets
SparkSQL Flow Targets 支援输出资料到一个或者多个目标。这些目标,基本覆盖了 Source 包含的外部系统。下面以 Hive 举例说明:
table_name=cust_id_agmt_id_t
savemode=”append”
target_table_name=cust_id_agmt_id_h/>
table_name 为 source 或者 Transform 定义的表名称;target_table_name 为 hive 中的表结果,Hive 表可不存在也可存在,sparksql 会根据 DataFrame 的资料型别自动建立表;savemode 预设为 overwrite 覆盖写入,当写入目标已存在时删除源表再写入;支援 append 模式, 可增量写入。Target 有一个特殊的 show 型别的 target。用于直接在控制台输出一个 DataFrame 的结果到控制台(print),该 target 用于开发和测试。
Rows 用于控制输出多少行资料。
SparkSQL Around
After 用于 Flow 在执行结束后执行的一个环绕,用于记录日志和写入状态。类似 Java 的 try {} finally{ round.execute() }
多个 round 一定会执行,round 异常不会导致任务失败。
sql=insert into cpic_task_history(id, task_type, catalog_model, start_time, retry_count, final_status, created_at)
values(${uuid}, ${task.type}, ${catalog.model}, ${starttime}, 0, ${status}, now())
url=${jdbc.url} .../>
sql=update cpic_task_history set
end_time = ${endtime}, final_status = ${status}, error_text = ${error} where id = ${uuid}
url=${jdbc.url}”…/>
Prepare round 和 after round 配合使用可用于记录 SparkSQL Flow 任务的执行日志。
SparkSQL Around可使用的变数
SparkSQL Around的执行效果
Prepare round 可做插入(insert)动作,after round 可做更新 (update)动作,相当于在数据库表中从执行开始到结束有了完整的日志记录。SparkSQL Flow 会保证round 一定能被执行,而且 round 的执行不影响任务的状态。
SparkSQL Flow 提交
bin/spark-submit --master yarn-client --driver-memory 1G
--num-executors 10 --executor-memory 2G
--jars /lib/jsoup-1.11.3.jarlib/jsqlparser-0.9.6.jar,/lib/mysql-connector-java-5.1.46.jar
--conf spark.yarn.jars=hdfs:///lib/spark2/*.jar
--queue default --name FlowTest
etl-flow-0.2.0.jar -f hive-flow-test.xml
接收必须的引数 –f,可选的引数为支援 Kerberos 认证的租户名称principal,和其认证需要的金钥档案。
usage: spark-submit --jars etl-flow.jar --class
com.yiidata.etl.flow.source.FlowRunner
-f,--xml-file Flow XML File Path
--keytabFile keytab File Path(Huawei)
--krb5File krb5 File Path(Huawei)
--principal principal for hadoop(Huawei)
SparkSQL Execution Plan
每个Spark Flow 任务本质上是一连串的 SparkSQL 操作,在 SparkUI SQL tab 里可以看到 flow 中重要的资料表操作。
regiserDataFrameAsTable 是每个 source 和 Transform 的资料在 SparkSQL 中的资料检视,每个检视都会在 SparkContex 中注册一次。
对RegisterDataFrameAsTable的分析
通过单个 regiserDataFrameAsTable 项进行分析,SparkSQL 并不是把source 的资料立即计算把资料放到内存,而是每次执行 source 时只是生成了一个 Logical Plan,只有遇到需要提交的算子(Action),SparkSQL 才会触发前面所依赖的的 plan 执行。
总结
这是一个开发框架,不是一个成熟的产品,也不是一种架构。他只是基于 SparkSQL 整合了大多数的外部系统,能通过 XML 的模板配置完成资料开发。面向的是理解资料业务但不了解 Spark 的资料开发人员。整个框架完成了大多数的外部系统对接,开发者只需要使用 type 获得资料,完成资料开发后通过 target 回写到目标系统中。整个过程基本无须程式开发,除非当前的 SQL 函式无法满足使用的情况下,需要自行开发一下特定的 UDF。因此本框架在对 SparkSQL 做了二次开发基础上,大大简化了 Spark 的开发,可降低了开发者使用难度。关于作者:震秦,普元资深开发工程师,专注于大资料开发 8 年,擅长 Hadoop 生态内各工具的使用和优化。参与某公关广告(上市)公司DMP 建设,负责资料分层设计和批处理,排程实现,完成交付使用;参与国内多省市公安社交网络专案部署,负责产品开发(Spark 分析应用);参与资料清洗加工为我方主题库并部署上层应用。
关于EAWorld:微服务,DevOps,资料治理,移动架构原创技术分享。