Spark Streaming基于Spark处理流式数据的框架,在MapReduce中,由于其分布式特性——所有数据需要读写磁盘、启动job耗时较大,难以满足时效性要求。而Streaming能够在Spark上生根发芽的原因是因为其内存特性、低延时的执行引擎和高速的执行效率。
Streaming的原理是将Stream数据分成小的时间间隔(比如几秒),即将其离散化(Discretized)并转换成一个一个数据集(RDD),然后分批处理处理这小的RDD。所以Streaming很容易很mlib,Spark SQL等进行结合,做到实时的数据分析处理。此外,Streaming也继承了RDD的容错特性。如果RDD 的某些 partition 丢失了 , 可以通过 lineage 信息重新计算恢复。
Streaming的数据源主要分下面两类:
· 外部文件系统 , 如 HDFS,Streaming可以监控一个目录中新产生的数据,并及时处理。如果出现fail,可以通过重新读取数据来恢复 , 绝对不会有数据丢失。
· 网络系统:如MQ系统(Kafka、ZeroMQ、Flume等)。Streaming会默认会在两个不同节点加载数据到内存 , 一个节点 fail 了 , 系统可以通过另一个节点的数据重算。假设正在运行 InputReceiver 的节点 fail 了 , 可能会丢失一部分数据。
Continue reading
Category Archives: BigData
Spark Streaming
Spark中的编程模型
1. Spark中的基本概念
在Spark中,有下面的基本概念。
Application:基于Spark的用户程序,包含了一个driver program和集群中多个executor
Driver Program:运行Application的main()函数并创建SparkContext。通常SparkContext代表driver program
Executor:为某Application运行在worker node上的饿一个进程。该进程负责运行Task,并负责将数据存在内存或者磁盘上。每个Application都有自己独立的executors
Cluster Manager: 在集群上获得资源的外部服务(例如 Spark Standalon,Mesos、Yarn)
Worker Node: 集群中任何可运行Application代码的节点
Task:被送到executor上执行的工作单元。
Job:可以被拆分成Task并行计算的工作单元,一般由Spark Action触发的一次执行作业。
Stage:每个Job会被拆分成很多组Task,每组任务被称为stage,也可称TaskSet。该术语可以经常在日志中看打。
RDD:Spark的基本计算单元,通过Scala集合转化、读取数据集生成或者由其他RDD经过算子操作得到。
2. Spark应用框架
客户Spark程序(Driver Program)来操作Spark集群是通过SparkContext对象来进行,SparkContext作为一个操作和调度的总入口,在初始化过程中集群管理器会创建DAGScheduler作业调度和TaskScheduler任务调度。
DAGScheduler作业调度模块是基于Stage的高层调度模块(参考:Spark分析之DAGScheduler),DAG全称 Directed Acyclic Graph,有向无环图。简单的来说,就是一个由顶点和有方向性的边构成的图中,从任意一个顶点出发,没有任何一条路径会将其带回到出发的顶点。它为每个Spark Job计算具有依赖关系的多个Stage任务阶段(通常根据Shuffle来划分Stage,如groupByKey, reduceByKey等涉及到shuffle的transformation就会产生新的stage),然后将每个Stage划分为具体的一组任务,以TaskSets的形式提交给底层的任务调度模块来具体执行。其中,不同stage之前的RDD为宽依赖关系。 TaskScheduler任务调度模块负责具体启动任务,监控和汇报任务运行情况。
创建SparkContext一般要经过下面几个步骤:
a). 导入Spark的类和隐式转换
1 2 |
import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ |
使用IDEA开发Spark应用
IDEA 全称IntelliJ IDEA,是java语言开发的集成环境,IntelliJ在业界被公认为最好的java开发工具之一,尤其在智能代码助手、代码自动提示、重构、J2EE支持、Ant、JUnit、CVS整合、代码审查、 创新的GUI设计等方面的功能都非常棒,而且IDEA是目前Scala支持最好的IDE。IDEA分ultimate和free edition版,ultimate提供了J2EE等很多非常强力的功能,free edition我觉得已经对于我这样的初学者已经够用了。前面写过一篇配置IntelliJ IDEA 13的SBT和Scala开发环境,本文在这个基础上使用IDEA进行Spark应用的配置和开发。 Continue reading
Cloudera Manager 5离线安装参考
0. 主机规划
Cloudera Manager需要一台单独的主机
debugo01 Cloudera Manager, yum repo
debugo02 NameNode, DataNode, Yarn RM, Spark Master, Spark Worker …
debugo03 DataNode, Spark Worker, Secondery NameNode …
Continue reading
编译spark-1.0.1源代码
截至到当前为止,spark最新版本为1.0.1。本文测试环境的操作系统是CentOS 7.0 x86_64,jdk为1.7.0_51。编译spark和编译hadoop类似,是一个很简单的过程。
1. 下载spark源代码
1 |
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.0.1.tgz |
2. 安装maven或sbt
(1). java-devel是必须要安装的包
1 |
yum -y install java-devel |
(2). 安装maven
1 |
yum -y install maven |
检查maven
1 2 3 4 5 6 7 |
# mvn -v Apache Maven 3.0.5 (Red Hat 3.0.5-16) Maven home: /usr/share/maven Java version: 1.7.0_55, vendor: Oracle Corporation Java home: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.55-2.4.7.2.el7_0.x86_64/jre Default locale: en_US, platform encoding: UTF-8 OS name: "linux", version: "3.10.0-123.4.2.el7.x86_64", arch: "amd64", family: "unix" |
使用createrepo配置本地ambari/HDP源
企业内部的hadoop平台一般是不可能连接外部网络。所以需要搭建一个本地的yum repository。下面是简单的配置流程,不足之处欢迎拍砖。
1. 配置hdp的源
wget -nv http://public-repo-1.hortonworks.com/HDP/centos6/2.x/GA/2.1-latest/hdp.repo -O /etc/yum.repos.d/hdp.repo
配置ambari源:
wget -nv http://public-repo-1.hortonworks.com/ambari/centos6/1.x/updates/1.6.0/ambari.repo -O /etc/yum.repos.d/hdp.repo
Continue reading
Git In IDEA
1. 使用命令行来clone branch
安装git之后,使用下面方式来初始化git本地目录
1 2 3 4 |
git config --global user.name 'debugo' git config --global user.email 'xiaoluffy@gmail.com' cd <your project home> git init |
clone spark源代码
1 |
git clone git://github.com/apache/spark.git |
2. IntelliJ配置git
File – Setting – Version Control – Github,配置github account
分享项目Github上:
选择菜单”VCS — Import into Version Control — Share project on Github”
不必输出library,等到分享结束。
Successfully shared project on Github: sbtFirst..
3. 通过IntelliJ来checkout源代码
输入git URL:
git://github.com/apache/spark.git
完成后会提示你是否打开。
^^
Hive点滴 – 查询练习
本文学习一下Hive中的一些查询技巧。
初始化数据
首先创建我们所需的数据库:
1 2 |
CREATE DATABASE Sales; use Sales; |
第一个表定义了一张标识日期的纬度表。通过日期的ID可以找到该日期的年月,年,月,日,星期几,第几周,第几季度,旬、半月等信息
1 2 3 4 5 6 7 8 9 10 11 12 13 |
CREATE TABLE DateList ( DateID string, theyearmonth string, theyear string, themonth string, thedate string, theweek string, theweeks string, thequot string, thetenday string, thehalfmonth string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' ; |
第二个表文件定义了订单的相关信息,主要字段有订单序号,交易地点ID,交易日期ID。
1 2 3 4 5 6 |
CREATE TABLE OrderList( ordernumber STRING, locationid STRING, dateID string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' ; |
第三章表定义了订单详细信息,其内容包括:订单号,行号,货品,数量,金额。
1 2 3 4 5 6 7 8 9 |
CREATE TABLE OrderDetails( ordernumber STRING, rownum int, itemid STRING, qty INT, price int, amount int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' ; |
下面将数据加载到三个表中:
1 2 3 |
LOAD DATA LOCAL INPATH '/var/lib/hive/DateList.txt' INTO TABLE DateList; LOAD DATA LOCAL INPATH '/var/lib/hive/OrderList.txt' INTO TABLE OrderList; LOAD DATA LOCAL INPATH '/var/lib/hive/OrderDetails.txt' INTO TABLE OrderDetails; |
检查数据:
1 2 |
0: jdbc:hive2://debugo02:10000> select count(*) from OrderDetails; | 287950 | |
通过HQL完成数据校验
在ETL中可能出现一些不正确的数据,比如OrderDetails和OrderList中信息不匹配。我们可以通过HQL语句来找到这些数据。
1 2 3 4 5 6 |
select count(*) from sales.OrderList a,sales.OrderDetails b where a.ordernumber=b.ordernumber; ...... 287950 select count(*) from sales.OrderList a, sales.OrderDetails b,DateList c where a.ordernumber=b.ordernumber and a.dateid=c.dateid; ...... 287942 |
这可能出现没有正确的dateid字段的订单。我们下面通过一个not in语句来找到这些订单!
select a.* from sales.OrderList a where a.dateid not in (select dateid from sales.DateList);
通过HQL完成报表统计
统计所有订单中每年的销售单数、销售增额。
1 2 3 4 5 |
select c.theyear,count(distinct a.ordernumber),sum(b.amount) from sales.orderlist a, sales.orderdetails b, sales.datelist c where a.ordernumber=b.ordernumber and a.dateid=c.dateid group by c.theyear order by c.theyear; |
1 2 3 4 5 6 7 |
select c.theyear,c.thequot,sum(b.amount) as sumofamount from sales.orderlist a,sales.orderdetails b,sales.datelist c where a.ordernumber=b.ordernumber and a.dateid=c.dateid group by c.theyear,c.thequot order by sumofamount desc limit 3; |
1 2 3 4 5 |
select a.ordernumber,sum(b.amount) as amount from sales.orderlist a,sales.orderdetails b where a.ordernumber=b.ordernumber group by a.ordernumber having amount>100000; |
找出订单中每年最畅销的商品
第一步:找到按年、商品id进行的统计汇总
1 2 3 4 5 |
CREATE VIEW IF NOT EXISTS v_yearItemSummary as select c.theyear,b.itemid,sum(b.amount) as amount from sales.orderlist a,sales.orderdetails b,sales.datelist c where a.ordernumber=b.ordernumber and a.dateid=c.dateid group by c.theyear,b.itemid; |
第二步: 找到每年最大的商品销量
1 2 3 4 5 |
CREATE VIEW IF NOT EXISTS v_yearItemSummary as select c.theyear,b.itemid,sum(b.amount) as amount from sales.orderlist a,sales.orderdetails b,sales.datelist c where a.ordernumber=b.ordernumber and a.dateid=c.dateid group by c.theyear,b.itemid; |
1 2 3 4 |
select distinct v.theyear,v.itemid,f.maxamount from v_yearItemSummary v , (select theyear, max(amount) as maxamount from v_yearItemSummary group by theyear) f where v.theyear=f.theyear and v.amount=f.maxamount order by v.theyear; |
^^
通过Squrirel连接Hive
SQuirrel SQL Client是一个用Java写的数据库客户端,用JDBC统一数据库访问接口以后,可以通过一个统一的用户界面来操作MySQL,MSSQL,Greenplum,Hive等等任何支持JDBC访问的数据库。使用起来非常方便。
1. 安装
下载地址: http://squirrel-sql.sourceforge.net/ 最新安装包:squirrel-sql-3.5.3-standard.jar及驱动jar包,此外我们还需要$HIVE_HOME/lib下的相关jar驱动包。安装需要依赖于JRE环境,直接双击squirrel-sql-3.5.3-standard.jar文件来执行安装程序。安装过程可以根据自己需要选择插件。
2. 添加JDBC Driver
安装完成后,运行squirrel-sql.bat,进入图形界面。此时我们还需要添加Hive的driver连接驱动程序。在Driver标签中添加新的driver。在external标签中添加了jar包后,通过list driver来选择jdbc.HiveDriver。
3. 登录Hive
完成后,我们在alias标签中建立一个hive连接的快捷别名:
完成后,我们可以查看Hive的schema以及通过SQL标签页来执行查询,非常方便。
^^
Hive点滴 – 单行转多行(split)
很多数据标签都是有在一个字段中有多条信息,而这些信息如果能直接在HQL中解析多行,无疑是极好滴。一个简单的栗子:
1 2 3 4 |
create table test_split( string tag; string value; ); |
我们插入一条数据
tag1 value1,value2,value3
普通模式查询:
1 2 3 4 5 6 |
select * from test_tag; +---------------+-----------------------+--+ | test_tag.tag | test_tag.value | +---------------+-----------------------+--+ | tag1 | value1,value2,value3 | +---------------+-----------------------+--+ |
通过split来拆分为多行,非常的方便。
1 2 3 4 5 6 7 8 |
select tag, v from test_tag lateral view explode(split(value,',')) adtable as v; +-------+---------+--+ | tag | v | +-------+---------+--+ | tag1 | value1 | | tag1 | value2 | | tag1 | value3 | +-------+---------+--+ |
^^