Hadoop大数据技术基础
大数据介绍
- 定义:大数据是从各种类型的海量数据中快速获得有价值信息的技术。
- 特点:
- 大量化 Volume :数据体量大
- 多样化 Vaiety:数据类型多
- 快速化 Velocity:处理速度快
- 价值 Value:价值密度低
Hadoop介绍
Google的三驾马车和Hadoop的开源实现
Google三驾马车 | Hadoop的实现 |
---|---|
GFS | HDFS |
MapReduce | MapReduce |
BigTable | HBase |
特点
- 高可靠性
- 高拓展性
- 高效性
- 高容错性
生态圈
名称 | 功能 |
---|---|
HDFS | 分布式文件系统 |
YARN | 资源调度框架 |
MapReduce | 分布式并行编程模型 |
HBase | 分布式列式数据库 |
Hive | 大数据数据仓库 |
Pig | 查询半结构化数据集的分析平台 |
Flume | 一个高可用、高可靠、分布式的海量日志采集、聚合和传输系统 |
Sqoop | 传统数据库与Hadoop数据存储和处理平台间进行数据传递的工具 |
ZooKeeper | 提供分布式协调一致性服务 |
Ambari | Hadoop快速部署工具 |
Mahout | 提供一些可拓展的机器学习领域经典算法的实现 |
HDFS
介绍
- 优点:低成本运行、处理超大文件、流式访问、高容错性
- 缺点:不适合低延迟访问、不适合小文件读写、不适合多用户写入、不适合文件任意修改
HDFS 架构
HDFS
由 NameNode
、SecondaryNameNode
和DataNode
三块组成。
graph TB subgraph 服务端 SenondaryNameNode NameNode DataNode1 DataNode2 DataNode3 end Client--处理数据块-->DataNode1 Client(客户端)--请求文件-->NameNode
NameNode
功能:
- 管理和维护 HDFS 的命名空间:两类文件
- 管理 DataNode 上的数据块
- 接受客户端的请求
两类文件:
fsimage
文件: 命名空间镜像文件,存储文件系统元信息。- 包含所有的目录和文件信息
- 对于目录来说,包含的信息主要有修改时间、访问控制权限等
- 对于文件来说,包含的信息有修改时间、访问时间、访问控制、块大小和组成一个文件块等
edits
文件: 记录操作日志editlog
。edits
保存了很多对文件进行操作的指令。
- 关系:
edits
记录的是文件的操作记录,是过程fsimage
保存的是文件的操作结果,是结果
DataNode
数据块
- 数据块是磁盘进行数据读、写的最小单位,
- 文件系统数据块的大小通常为磁盘数据块的整数倍。Hadoop 2.x 默认为 128MB。
- 目的:减少寻址开销,减少磁盘一次读取时间。
- HDFS将每个文件划分为大小相等的数据块进行存储。为了容错,文件的所有数据块都被冗余复制,
功能
- 存储数据块
- I/O:根据
NameNode
的指令,和客户端就行 I/O 操作 - 心跳通信:定期向
NameNode
发送心跳信息,保持和NameNode
的通信。
SecondaryNameNode
定期合并 fsimage
和 edits
,避免 edits
过大。
graph LR STOP(暂停)-->DOWNLOAD(下载) DOWNLOAD-->MERGE(合并) -->UPLOAD(上传) -->REPLACE(替换) -->RESTORE(恢复)
- 暂停:
SecondaryNameNode
向NameNode
发送请求,要求其暂停使用edits
文件。之后新的更新操作写入edits.new
文件。 - 下载:
SecondaryNameNode
从NameNode
下载fsimage
和edits
文件。 - 合并:
SecondaryNameNode
根据edits
中的信息,对fsimage
进行操作。最终形成一个新的fsimage.ckpt
。 - 上传:
SecondaryNameNode
将合并后的fsimage.ckpt
上传到NameNode
。‘ - 替换:
NameNode
用得到的新fsimage.ckpt
替换旧的fsimage
,用edits.new
替换edits
。 - 恢复:
NameNode
恢复使用edits
记录操作日志。
工作机制
机架感知
- 概述:根据网络的拓扑结构,对
HDFS
的操作进行优化。 - 默认情况:不开启机架感知,即认为全部在一个机架上。
- 自定义:通过外部脚本文件来判断。输入IP地址,输出机架信息。
副本冗余策略
副本存储
- 副本1:在上传文件的数据节点
- 副本2:在与第一个节点不同的机架上
- 副本3:和副本2一个机架,但不同的节点
- 更多副本:均匀分布在剩余的机架下
- 其余副本:随机选择节点存放
优点
- 减小了机架间的数据传输,提高写操作效率
- 不影响数据的可靠性和可用性,但改进了写的性能
- 减小因机架间的数据传输,导致的网络传输之间的总带宽
文件读取
sequenceDiagram Client->>DistributedFileSystem: 打开文件 DistributedFileSystem->> NameNode: 请求数据块信息 Client->>FSDataInputStream: 打开流 FSDataInputStream->>DataNode: 读取数据 Client->>FSDataInputStream: 关闭流
- 客户端通过
DistributedFileSystem
打开文件 DistributedFileSystem
向NameNode
请求数据块信息Client
从FSDataInputStream
读取数据FSDataInputStream
从DataNode
读取数据Client
关闭FSDataInputStream
文件写入
sequenceDiagram Client->>DistributedFileSystem: 创建元请求 DistributedFileSystem->> NameNode: 创建文件元数据 Client->>FSDataOutputStream: 写入数据 FSDataOutputStream->>DataNode: 写入数据包 DataNode->>FSDataOutputStream: 接受数据包 Client->>FSDataInputStream: 关闭 FSDataInputStream->>NameNode: 通知写入完成
客户端通过
DistributedFileSystem
打开文件。DistributedFileSystem
请求NameNode
创建元数据。此操作通过 RPC 调用完成。RPC 是指远程过程调用,它类似函数调用,区别是函数在另一台服务器上。
Client
向FSDataOutputStream
写入数据,先写入缓冲区,再一个个切分成数据包。FSDataOutputStream
向DataNode
发送数据,节点由NameNode
分配。数据包由这些DataNode
组成的管道进行传输。管道上的
DataNode
反向返回确认信息,最终由第一个数据节点返回给FSDataOutputStream
。Client
关闭FSDataOutputStream
FSDataInputStream
通知NameNode
文件写入完成。
错误处理
DataNode 出错
由 HDFS 自动处理
- 判定:若
NameNode
近期未接受到DataNode
的心跳信息,就认为这个DataNode
挂了。 - 处理方法:检测需要复制的数据块,将其复制到其他节点。
NameNode 出错
- 因素:
fsimage
或者edits
损坏。 - 后果:整个
HDFS
直接挂掉
解决方案
- 同步存储到其他文件系统
HDFS HA
: 高可用性- 共享存储系统:如
ZooKeeper
- 使用主从
NameNode
存储数据 - 各个节点之间使用
JournalNode
同步数据
- 共享存储系统:如
Shell 操作
操作 | 命令 | 参数 | 示例 |
---|---|---|---|
创建文件夹 | hdfs dfs -mkdir [-p] <paths> | -p :如果父目录不存在,则先创建父目录 | 创建 hadoop 目录:hdfs dfs -mkdir /hadoop |
列出文件夹 | hdfs dfs -ls [-dhR] <paths> | -d :返回父目录-R :同时显示子目录 | 显示 hadoop 目录:hdfs dfs -ls /hadoop |
创建文件 | hdfs dfs -touchz <paths> | 无 | 创建 hadoop.txt 文件:hdfs dfs -touchz /hadoop.txt |
上传文件 | hdfs dfs -put <local_src> <hdfs_dst> hdfs dfs -copyFromLocal <local_src> <hdfs_dst> | 无 | 将 /root/app.txt 上传到 HDFS 的根目录:hdfs dfs -put /root/app.txt /app.txt |
移动文件到HDFS | hdfs dfs -moveFromLocal <local_src> <hdfs_dst> | 无 | 将 /root/app.txt 移动到 HDFS 的根目录:hdfs dfs -moveFromLocal /root/app.txt /app.txt |
下载文件 | hdfs dfs -get <hdfs_src> <local_dst> hdfs dfs -copyToLocal <hdfs_src> <local_dst> | 无 | 将 /app.txt 下载到 /root/app.txt :hdfs dfs -copyToLocal /app.txt /root/app.txt |
查看文件 | hdfs dfs -cat <hdfs_path> | 无 | 查看 /app.txt :hdfs dfs -cat /app.txt |
追加文件 | hdfs dfs -appendToFile <hdfs_src> <local_dst> | 无 | 追加 /root/app.txt 到 /app.txt :hdfs dfs -appendToFile /root/app.txt /app.txt |
删除文件或目录 | hdfs dfs -rm [-fr] <hdfs_path> | -f 文件不存在时,不显示错误信息-r 递归删除文件夹 | 删除文件 /app.txt :hdfs -dfs -rm /app.txt 删除空文件夹 /hadoop :hdfs -dfs -rm /hadoop 删除非空文件夹 /hadoop : hdfs -dfs -rm -r /hadoop |
YARN
概述
- 一个资源管理系统,为上层应用提供统一的资源管理和调度
- 主要功能:
- 安排任务
- 任务的进度监控
架构
YARN
由 Container
、ResourceManager
、 NodeManager
、 ApplicationMaster
组成。
graph TB subgraph Node1 NodeManager1 subgraph Container1 ApplicationMaster end end subgraph Node2 NodeManager2 subgraph Container2 Application1 end end subgraph Node3 NodeManager3 subgraph Container3 Application2 end end Client-->ResourceManger ResourceManger-->NodeManager1 ResourceManger-->NodeManager2 ResourceManger-->NodeManager3
Container
:是YARN
中资源CPU
、内存等的抽象,它封装了某个节点的多维度资源。ResourceManager
:负责整个系统的资源分配和管理。Scheduler
:为应用程序分配封装在Container
的容器ApplicationManager
:管理整个系统中的应用程序
NodeManager
:每个节点上的资源管理器- 定时向
ResourceManager
报告资源使用情况 - 接受和处理来自
ApplicationManager
的启动/停止请求
- 定时向
ApplicationMaster
:- 与
ResourceManager
协商以获取Container
- 负责应用的监控,跟踪执行状态、重启失败任务
- 和
NodeManager
协同完成工作的监控
- 与
应用运行和监控机制
sequenceDiagram Client->>ResourceManager: 提交应用 ResourceManager->>Container1(内含ApplicationMaster): 初始化 Container1(内含ApplicationMaster)->>Container1(内含ApplicationMaster): 启动容器 alt 计算资源不够 Container1(内含ApplicationMaster)->>ResourceManager: 申请资源 Container1(内含ApplicationMaster)->>Container2(内含Application): 启动容器 end loop 进度监控 Container2(内含Application)->>Container1(内含ApplicationMaster): 汇报进度 Container1(内含ApplicationMaster)->>Container1(内含ApplicationMaster): 形成视图 end Client->>Container1(内含ApplicationMaster): 获取进度
运行机制
- 提交应用:
Client
向ResourceManager
提交应用 - 初始化容器:
ResourceManager
初始化一个容器Container
- 启动容器:节点在
NodeManager
的协助下启动容器Container
。- 首次启动时
Container
内包含ApplicationMaster
- 如果
ApplicationMaster
计算资源不足,则向ResourceManager
申请资源。 - 申请资源后,
ApplicationMaster
启动一个新的容器Container
,内含Application
。
- 首次启动时
- 执行应用
监控机制
- 汇报状态:
Application
向ApplicationMaster
报告自己的进度 - 汇聚视图:
ApplicationMaster
汇聚作业视图 - 获取状态:客户端获取作业的状态
MapReduce
介绍
- 核心思想 分而治之,汇总结果 ,将一个大任务拆分成很多的小任务,每个小任务进行各自的处理,最后将小任务的结果进行汇总。
- 特点:易于编程、良好的拓展性、高容错性、离线处理海量数据
- 缺点:不擅长实时计算、不擅长流式计算、不擅长有向图计算
编程模型
graph LR Input--InputFormat-->K11V11(k1,v1) Input--InputFormat-->K12V12(k1,v1) Input--InputFormat-->K13V13(k1,v1) Input--InputFormat-->K14V14(k1,v1) K11V11--Map-->K21V21(k2,v2) K12V12--Map-->K22V22(k2,v2) K13V13--Map-->K23V23(k2,v2) K14V14--Map-->K24V24(k2,v2) K21V21--Shuffle-->K32V32(k2,v2的数组) K22V22--Shuffle-->K31V31(k2,v2的数组) K23V23--Shuffle-->K31V31 K24V24--Shuffle-->K33V33(k2,v2的数组) K31V31--Reduce-->K41V41(k3,v3) K32V32--Reduce-->K42V42(k3,v3) K33V33--Reduce-->K43V43(k3,v3) K41V41--OutputFormat-->Output K42V42--OutputFormat-->Output K43V43--OutputFormat-->Output
执行流程
- 输入Input一个大文件,拆分Split成很多片段。
- 每个片段由单独一个节点处理,称为 Map 阶段。
- 将各个主机计算得到的结果进行汇总得到最终结果,称为 Reduce 阶段。
特点
Job = Map + Reduce
Map
的输出是Reduce
的输入- 输入和输出都是
<K,V>
的形式 - 输入和输出的类型必须是
Hadoop
的类型 - 处理的数据都是
HDFS
上的数据
编程步骤
graph LR create(创建项目)-->maven(配置 Maven) maven-->mapper(编写Mapper) mapper--> reducer(编写 Reducer) reducer--> App(编写主类)
创建项目
使用 idea
创建一个项目。
配置 Maven
- 依赖项:
groupId | artifactId | version |
---|---|---|
org.apache.hadoop | hadoop-common | 和安装的 hadoop 版本一致 |
org.apache.hadoop | hadoop-hdfs | 和安装的 hadoop 版本一致 |
org.apache.hadoop | hadoop-client | 和安装的 hadoop 版本一致 |
org.apache.hadoop | hadoop-common | 和安装的 hadoop 版本一致 |
- 构建插件需要设置主类
示例(hadoop-2.9.2):
1 |
|
编写 Mapper
类
自定义的
Mapper
类需要继承org.apache.hadoop.mapreduce.Mapper
,后面需要接上括号。括号中分别为k1,v1,k2,v2
。如果没有指定
InputFormat
,或者指定的InputFormat
为TextInputFormat
,则会将输入按行进行分割。k1
是LongWritable
,是该行在整个文件中的字节偏移量v1
是Text
,是该行的内容自定义的
Mapper
需要覆写map
函数,参数为k1 key
、v1 value
和Context context
,抛出IOException
和InterruptedException
。写入键值对
context.write(k2,v2)
。
示例:
1 | import org.apache.hadoop.io.IntWritable; |
编写 Reducer
类
自定义的
Reducer
类需要继承org.apache.hadoop.mapreduce.Reducer
,后面需要接上括号。括号中分别为k2,v2,k3,v3
。自定义的
Reducer
需要覆写reduce
函数,参数为k2 key
、Iterable<v2> values
和Context context
,抛出IOException
和InterruptedException
。写入键值对
context.write(k3,v3)
。
示例:
1 | import org.apache.hadoop.io.IntWritable; |
编写主类
graph LR create(创建 job 实例)-->app(设置主类) app-->mapper(设置Mapper和输出键值对) mapper--> reducer(设置Reducer和输出键值对) reducer--> io(设置 I/O 路径) --> run(执行任务)
!> 设置的类需要以 类名.class
结尾,而不是类名本身。详情请参考 Java反射相关知识。
示例:
1 | import org.apache.hadoop.conf.Configuration; |
数据处理和转换
Java内置类型转换
源类型 | 目标类型 | 转换方法 src 和 dst |
---|---|---|
boolean | String | dst = Boolean.toString(src) |
String | boolean | dst = Boolean.parseBoolean(src) |
int | String | dst = Integer.toString(src) |
String | int | dst = Integer.parseInt(src) |
long | String | dst = Long.toString(src) |
String | long | dst = Long.parseLong(src) |
float | String | dst = Float.toString(src) |
String | float | dst = Float.parseFloat(src) |
double | String | dst = Double.toString(src) |
String | double | dst = Double.parseDouble(src) |
int | long ,double | 隐式类型转换dst = src |
long ,double | int | 强制类型转换dst = (int)src |
double | long | 强制类型转换dst = (long)src |
boolean | int ,long | 三目语句dst=src ? 1 : 0 |
Hadoop类型和Java类型转换
Java类型 | Hadoop类型 | Hadoop类型hv 转换成Java类型jv | Java类型jv 转换成Hadoop类型hv |
---|---|---|---|
boolean | BooleanWritable | jv=hv.get() | hv=new BooleanWritable(jv) |
int | IntWritable | jv=hv.get() | hv=new IntWritable(jv) |
long | LongWritable | jv=hv.get() | hv=new LongWritable(jv) |
float | FloatWritable | jv=hv.get() | hv=new FloatWritable(jv) |
double | DoubleWritable | jv=hv.get() | hv=new DoubleWritable(jv) |
String | Text | jv=hv.toString() | hv=new Text(jv) |
NullWritable | 无 | hv=NullWritable.get() |
csv表格的处理
csv
使用 ,
作为数据分隔符。通过字符串分割获取每一列的内容。
1 | String data = v1.toString(); // v1 是单行的内容 |
自定义类型
- 序列化接口用于将内存中的
Java
对象转换成可存储文件或者可以传送到其他设备的流。 Hadoop
类型相对于Java
内置类型,添加了序列化接口 :org.apache.hadoop.io.Writable
。对于自定义类型,如果需要支持MapReduce
的操作,则需要实现Writable
接口。Writable
接口有两个方法:
+void write(DataOutput out) throws IOException
:用于将数据写入流
+void readFields(DataInput in) throws IOException
:用于从流中读取数据
两个方法中,读写顺序必须严格一致。
- 常见类型的读写
Java 类型 | 输出out | 输入in |
---|---|---|
boolean | out.writeBoolean(obj) | 输出in.readBoolean(obj) |
int | out.writeInt(obj) | 输出in.readInt(obj) |
long | out.writeLong(obj) | 输出in.readLong(obj) |
double | out.writeDouble(obj) | 输出in.readDouble(obj) |
String | out.writeUTF(obj) | 输出in.readUTF(obj) |
示例:
1 | import org.apache.hadoop.io.Writable; |
运行和监控机制
MapReduce
的运行机制和 YARN
大体相同,不过 MapReduce
的 Application Master
称为 MRAppMaster
。
sequenceDiagram Client->>Client: 请求执行 Job Client->>ResourceManager: 请求Job ID Client->>Client: 计算输入分片 Client->>HDFS: 保存相关资源 Client->>ResourceManager: 提交应用 ResourceManager->>Container1(内含MRAppMaster): 初始化 Container1(内含MRAppMaster)->>Container1(内含MRAppMaster): 启动容器 HDFS->>Container1(内含MRAppMaster): 获取分片资源 Container1(内含MRAppMaster)->>Container1(内含MRAppMaster): 创建任务对象 alt 计算资源不够 Container1(内含MRAppMaster)->>ResourceManager: 申请资源 Container1(内含MRAppMaster)->>Container2(内含Application): 启动容器 HDFS->>Container2(内含Application): 获取资源并本地化 end loop 进度监控 Container2(内含Application)->>Container1(内含MRAppMaster): 汇报进度 Container1(内含MRAppMaster)->>Container1(内含MRAppMaster): 形成视图 end Client->>Container1(内含MRAppMaster): 获取进度
运行机制
请求
Job
:Client
请求执行Job
。准备
Job
:Job
向ResourceManager
请求Job ID
。- 计算输入分片。
- 将运行作业所需的
Jar
文件、配置文件、分片等资源保存在HDFS
中以jobID
命名的目录下。
提交应用:
Job
向ResourceManager
提交应用,并传入相应资源。初始化容器:
ResourceManager
初始化一个容器Container
启动容器:
- 节点在
NodeManager
的协助下启动容器,包含MRAppMaster
。 MRAppMaster
对应用进行初始化。
- 节点在
创建和分发
Map
、Reduce
任务MRAppMaster
获取从HDFS
中输入的分片,对每个分片创建一个Map
任务对象和多个Reduce
任务对象。此时分配各个Task ID
。- 如果
MRAppMaster
计算资源不足,则向ResourceManager
申请资源。 - 申请资源后,
MRAppMaster
启动一个新的容器Container
,内含Application
。 Application
获取Jar
文件、配置文件、分片等资源,保存到本地。
运行任务
监控机制
- 汇报状态:
Application
向MRAppMaster
报告自己的进度 - 汇聚视图:
MRAppMaster
汇聚作业视图 - 获取状态:客户端获取作业的状态
编号 | 单独考查 | 混合考查 | HBase重点 | Hive重点 | Flume重点 |
---|---|---|---|---|---|
373031902856 | 40% | 0 | HBase Shell操作 | Hive 数据模型 Hive表操作 Hive数据查询 Hive常用函数 | Flume配置文件编写 |
373032001234 | 45% | 0 | HBase Shell操作 | Hive安装 Hive表操作 Hive数据查询 Hive常用函数 | Flume配置文件编写 |
HBase
简介
- 特性:伸缩性强、自动分区、线性拓展和新节点自动处理、容错能力强、检索性能强、支持普通硬件
- 应用场景:对象存储、时序数据存储、推荐画像、时空数据存储
基本概念
表 Table:由行和列构成,列又分成若干个列族。
行Row:用行键
rowKey
唯一标识数据列族Column Family:
- 需要在表的定义阶段给出。
- 有一个或多个列成员。列成员可以动态加入
列限定符Qualifier:又称为列 Column。列族里面的数据通过列限定符来定位。无需事先定义,也不需要在不同行之间保持一致。
单元格 Cell:由行键、列族、列限定符、时间戳唯一决定。不分类型。
时间戳 TimeStamp:通过时间戳区分版本。
基本操作
表操作
操作 | 命令 | 注意 |
---|---|---|
创建表 | create "表名","列族2","列族2",... | |
添加列族 | alter "表名","列族名" | |
删除列族 | alter "表名",{NAME => "列族名称",METHOD => "delete"} | |
查看表的描述 | describe "表名" | |
禁用表 | disable "表名" | |
删除表 | drop "表名" | 需要先禁用表,再执行删除操作 |
数据操作
操作 | 命令 | 注意 |
---|---|---|
插入/更新数据 | create "表名","行键","列族:列名","值" | |
删除数据 | delete "表名","行键","列族:列名" | |
查看某条数据 | get "表名","行键" | |
查看全部数据 | scan "表名" |
Hive
一个基于 Hadoop
的数据仓库工具。
环境搭建
嵌入式安装
安装
解压目录
设置环境变量
HIVE_HOME=Hive安装目录
PATH=$PATH:$HIVE_HOME/bin
配置
- 修改配置文件,位置:
$HIVE_HOME/conf/hive-site.xml
- 修改配置文件,位置:
配置项 | 值 | 描述 |
---|---|---|
javax.jdo.option.ConnectionURL | jdbc:derby;databaseName=metastore_db;create=true | 数据库连接地址 |
javax.jdo.option.ConnectionDriverName | org.apache.derby.jdbc.EmbeddedDriver | 数据库连接驱动 |
hive.metastore.local | true | 是否使用本地存储 |
hive.metastore.warehouse.dir | file://home/hadoop/hive/warehouse | 本地存储目录 |
- 初始化
初始化
Derby
数据库schematool -dbType derby -initSchema
本地模式安装
需要先装好
HADOOP
和MySQL
安装
解压目录
设置环境变量
HIVE_HOME=Hive安装目录
PATH=$PATH:$HIVE_HOME/bin
配置
- 修改配置文件,位置:
$HIVE_HOME/conf/hive-site.xml
- 修改配置文件,位置:
配置项 | 值 | 描述 |
---|---|---|
javax.jdo.option.ConnectionURL | jdbc:mysql://localhost:3306/hive?useSSL=false | 数据库连接地址 |
javax.jdo.option.ConnectionDriverName | com.mysql.jdbc.Driver | 数据库连接驱动 |
javax.jdo.option.ConnectionUserName | hive | 数据库用户名 |
javax.jdo.option.ConnectionPassword | 123456 | 数据库密码 |
- 初始化
复制
mysql
驱动到$HIVE_HOME/lib
初始化
MySQL
数据库schematool -dbType mysql -initSchema
数据模型
表
- 内部表:
- 概念:与数据库中的表在概念上是类似
- 存储:每一个内部表在
Hive
中有相应的存储目录。删除时元数据和数据都会删除。
- 外部表:
- 概念:指向一个已经在
HDFS
中的数据,可以创建分区。 - 存储:元数据的组织和内部表相同,但不会移动数据到数据仓库目录中,只是与外部数据建立一个链接。删除时,只删除元数据和链接。
- 概念:指向一个已经在
- 分区表:
- 概念:通过对某或某些列的数据进行分区,使得查询时不必扫描全表,以提高查询速度。
- 存储:每个分区在单独一个文件夹下,每个文件夹存放于表目录下。
- 分区在表中以字段的形式存在,但仅表示分区,不表示数据。
- 内部表:
桶:是指将表或分区中指定列的值为
key
进行hash
,hash
到指定的桶中- 优点:提高查询处理效率、提高取样效率
视图:是一个虚表。视图并不在数据库中以存储的数据值集形式存在。行和列数据来自由定义视图的查询所引用的表,并且在引用视图时动态生成。
数据库操作
- 创建数据库
1 | CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name |
语句 | 说明 |
---|---|
DATABASE|SCHEMA | 用于限定创建数据库或数据库模式 |
IF NOT EXISTS | 目标对象不存在时才执行创建操作(可选) |
COMMENT | 起注释说明作用 |
LOCATION | 指定数据库位于HDFS上的存储路径。若未指定,将使用hive.metastore.warehouse.dir 定义值作为其上层路径位置 |
WITH DBPROPERTIES | 为数据库提供描述信息,如创建database的用户或时间 |
创建数据库示例
1 | create database hadoop_hive |
- 选择数据库
1 | use database_name; |
使用默认数据库:
1 | USE DEFAULT; |
表操作
- 创建表
1 | CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name // 指定表名 |
语句 | 说明 |
---|---|
EXTERNAL | 创建外部表,若未指定,则默认创建的是内部表 |
IF NOT EXISTS | 若表不存在才创建,若未指定,当目标表存在时,创建操作抛出异常 |
COMMENT | 添加注释说明,注释内容位于单引号内 |
LIKE | 复制其他表的定义 |
PARTITIONED BY | 添加分区列 |
CLUSTERED BY | 根据列之间的相关性指定列聚类在相同桶中(BUCKETS),可以对表内容按某一列进行升序(ASC)或降序(DESC)排序(SORTED BY关键字) |
ROW FORMAT | 指定 hive 表行对象(ROW Object)数据与 HDFS 数据之间进行传输的转换方式(HDFS files -> Deserializer ->Row object以及Row object ->Serializer ->HDFS files),以及数据文件内容与表行记录各列的对应。 在创建表时可以指定数据列分隔符(FIFLDS TERMINATED BY 子句), |
LOCATION | 指定表数据在 HDFS 上的存储位置。若未指定,db_name数据库将会储存在${hive.metastore.warehouse.dir}定义位置的db_name目录下 |
TBLPROPERTIES | 为所创建的表设置属性(如创建时间和创建者,默认为当前用户和当前系统时间) |
例子 创建普通表
INFO | TYPE | COMMENT |
---|---|---|
Sno | INT | student sno |
name | STRING | student name |
age | INT | student age |
sex | STRING | student sex |
score | STRUCT <Chinese:FLOAT,Math:FLOAT,English:FLOAT> | student score |
1 | CREATE TABLE IF NOT EXISTS test2.student( |
例子 复制表结构
1 | CREATE TABLE IF NOT EXISTS items_info2 LIKE items_info; |
例子 创建分区表
使用 shopping
数据库创建一张商品信息分区表 items_info2
,按商品品牌 p_brand
和商品分类 p_category
进行分区。
1 | CREATE TABLE IF NOT EXISTS shopping.items_info2( |
例子 指定存储格式
建立指定存储格式的表示例:
1 | CREATE TABLE IF NOT EXISTS shopping.items_info( |
例子 创建分桶表
1 | create table bucket_user ( |
导入数据:用于把数据搬运到
Hive
表上位于HDFS
上的目录位置。- 导入HDFS的目录:执行移动操作
- 导入本地的目录:执行复制操作
- 导入本地的文件:执行复制操作
- 若创建表时指定了分区列,使用 LOAD 命令加载数据时也要为所有分区列指定特定值。
!> 此操作不会对数据进行格式化和检查。创建的表需要指定存储格式。
1 | LOAD DATA [LOCAL] INPATH 'filepath' |
语句 | 说明 |
---|---|
LOCAL | 加载本地文件系统中的数据 |
OVERWRITE | 覆盖原有数据,而不是追加 |
PARTITION (partcol1=vall,partcol2=val2 …) | 指定数据所在分区 |
- 导出数据
单文件写入
1 | INSERT OVERWRITE [LOCAL] DIRECTORY directory |
语句 | 说明 |
---|---|
LOCAL | 数据写入到本地文件系统 |
写入数据示例
1 | INSERT OVERWRITE LOCAL DIRECTORY '/home/test4' |
视图操作
- 创建视图
1 | CREATE VIEW [IF NOT EXISTS] view_name |
例子
基于表test创建一个test_view视图:
1 | CREATE VIEW test_view(id,name_length) |
数据查询
1 | SELECT select_expr column_name, select_expr column_name, ... | * // 指定要查询的列 |
- 查询所有列
1 | select * from db_name; |
where
子句:将不满足条件的行过滤,在SQL语句中执行顺序优先于group by。- 多个语句间可以使用
AND
OR
进行连接。
- 多个语句间可以使用
示例
1 | select name from student where age>25; |
like
:用于在WHERE
子句中搜索列中的指定模式。%代表任意多个字符。
示例
查询出工作职责涉及hive的数据
1 | select * from db1.table1 where responsibility like '%hive%'; |
group by
:表示按照某些字段的值进行分组,有相同的值放到一起,需要注意的是select后面的非聚合列必须出现在group by中。
示例
1 | select city,avg(salary) from st group by city; |
join
:用于多表查询,并将查询结果进行连接。Hive只支持等值连接,即ON子句中只能使用等号连接。
类型 | 解释 | 示例 |
---|---|---|
内连接 | 把符合两边连接条件的数据查询出来。 | select a.name,b.score from a join b on a.id=b.cid; |
左外连接 | 左表全部查询出来,右表不符合连接条件的显示为空 | select a.name,b.score from a left outer join b on a.id=b.cid; |
右外连接 | 右表全部查询出来,左表不符合连接条件的显示为空 | select a.name,b.score from a right outer join b on a.id=b.cid; |
全外连接 | 左右表符合连接条件和不符合连接条件的都查出来,不符合的显示空 | select a.name,b.score from a full outer join b on a.id=b.cid; |
左半开连接 | 查询出满足连接条件的左边表记录,需要注意的是select和where语句中都不能使用右表的字段 | select a.name from a LEFT SEMI JOIN b on a.id=b.cid; |
order by
排序order by后面可以有多列进行排序,默认按字典排序(desc:降序,asc(默认):升序。order by为全局排序。
limit
限制输出条数
在Hive查询中限制查询输出条数
插入数据
将查询结果写入表
1 | INSERT OVERWRITE|INTO TABLE table_name |
语句 | 说明 |
---|---|
OVERWRITE | 直接覆盖原来的数据 |
INTO | 追加到原来的数据 |
将查询结果写入文件
1 | INSERT OVERWRITE [LOCAL] DIRECTORY directory |
语句 | 说明 |
---|---|
LOCAL | 查询结果写入本地文件系统,不指定则写入 HDFS |
ROW FORMAT | 参考查询语句 |
示例
1 | INSERT OVERWRITE LOCAL DIRECTORY '/home/test4' |
常用函数的使用
聚合函数
聚合函数通常用于获取一组数据的数字特征。
函数 | 说明 |
---|---|
COUNT | 计算某一列的个数 |
SUM | 计算某一列的合计值,该列必须为数值类型 |
AVG | 计算某一列的平均值,该列必须为数值类型 |
MAX | 计算某一列的最大值 |
MIN | 计算某一列的最小值 |
例子 使用聚合函数计算男生平均成绩
1 | SELECT AVG(score) average FROM students WHERE gender = 'M'; |
例子 使用聚合函数查询表的行数
1 | SELECT COUNT(*) TOTAL FROM students; |
例子 使用分组聚合查询各班的人数
1 | SELECT COUNT(*) TOTAL FROM students GROUP BY cls; |
条件函数
1 | CASE a |
上述语句可以理解为如果 a 等于 b,那么返回 c;如果 a 等于 d,那么返回 e;否则返回 f。
示例
1 | Select case job when 0 then 'president' when 1 then 'student' else 'person' end from student; |
根据 job 输出职位。
- 0: president
- 1: student
- 其他: person
Flume
常用Source
- Avro Source:Avro端口监听并接收来自外部的 avro 客户流的事件。
属性名 | 默认值 | 说明 |
---|---|---|
type | - | 必须为avro |
channel | - | channel的名称 |
bind | - | IP地址或者主机名 |
port | - | 绑定的端口 |
- Exec Source:通过设定一个 Unix(linux) 命令监控文件。
属性名 | 默认值 | 说明 |
---|---|---|
type | - | 必须为exec |
channel | - | channel的名称 |
command | - | 执行的命令,常使用 tail -F file |
- Spooling Directory Source:监控某个目录下新增的文件,并读取文件中的数据
属性名 | 默认值 | 说明 |
---|---|---|
type | - | 必须为spooldir |
channel | - | channel的名称 |
spoolDir | - | 监控目录 |
常用 Sink
- HDFS Sink:将事件写入 HDFS ,目前支持创建文本文件和序列文件,并支持压缩。可以根据时间长短或数据大小或事件数量定期滚动文件。
属性名 | 默认值 | 说明 |
---|---|---|
type | - | 必须为hdfs |
hdfs.path | - | 保存文件的目录 |
channel | - | channel的名称 |
hdfs.rollInterval | 30s | 回滚间隔,为零则不根据时间回滚 |
hdfs.rollSize | 1024byte | 回滚大小,为零则不根据大小回滚 |
hdfs.rollCount | 10 | 回滚条数,为零则不根据条数回滚 |
hdfs.fileType | SequenceFile | 保存到hdfs的文件格式,支持SequenceFile, DataStream 或者CompressedStream,当使用CompressedStream时需指定压缩形式 |
- Logger Sink:INFO 级别记录事件。通常用于测试/调试目的。
属性名 | 默认值 | 说明 |
---|---|---|
type | - | 必须为logger |
channel | - | channel的名称 |
常见 Channel
- Memory Channel:将 agent 缓存于内存中,适用于高吞吐量并且当 agent 挂掉以后允许数据丢失的业务上。
属性名 | 默认值 | 说明 |
---|---|---|
type | - | 必须为memory |
capacity | 100 | 存储channel的最大event数 |
transitionCapacity | 100 | 从 Source接收的channel,或发送到 Sink 的最大event数 |
keep-alive | 3 | 添加或删除 Event 的超时时间 |
byteCapacityBufferPercentge | 20 | 定义缓存百分比 |
byteCapacity | - | Channel 中所有 Event 总和允许的最大内存字节数 |
- File Channel:将 agent存储于磁盘上,当 agent挂掉以后数据不会丢失。
属性名 | 默认值 | 说明 |
---|---|---|
type | - | 必须为file |
dataDirs | /.flume/file-channel/data | 数据存储目录,可配置多个(可提高性能),用逗号分隔 |
checkpointDir | /.flume/file-channel/checkpoint | 存储 checkpoint 文件的目录 |
编写步骤
graph LR DEFINE(命名)-->CHANNEL(配置Channel) CHANNEL-->SOURCE(配置Source) SOURCE-->SINK(配置Sink) -->BIND(绑定)
示例 采集/opt/flume/data
所有数据到 HDFS 的 /flume
目录下。
1 | 配置命名 |
ZooKeeper
核心架构
graph LR Leader Follower1(Follower)-->Leader Follower2(Follower)-->Leader
Leader
:- 协调请求:所有事务请求,并将事务请求转换成一个提议
- 分发提议:分发提议到所有 Follower,等待半数以上 Follower 的正确反馈
- 分发
Commit
:若半数以上 Follower 的正确反馈提议,则再次发送Commit
请求,要求Follower
对提议进行提交。
Follower
:- 响应提议
- 提交提议
服务器一般为奇数个。 个节点可以承受 个节点故障。
请求处理
- 写请求:可以被任意服务器接收,但全部转发给
Leader
- 读请求:可以被任意服务器接收
- zxid:每次读写返回
zxid
编号,保证返回的数据不会比客户端传过来的zxid
新。 - 模糊快照和日志:定期将内存的数据保存到磁盘中,形成模糊快照。
工作模式
- 崩溃恢复:若
Leader
挂掉了,则可以通过选举机制,选举产生新的Leader
服务器。选举的过程称为 崩溃恢复模式。 - 消息广播:若过半服务器完成状态同步,则进入消息广播 模式。若新服务器加入已进行消息广播的集群中,则自动进入数据恢复模式。
数据模型
树形结构
每个节点称为
ZNode
一般在
1MB
以内每个节点包含版本号、时间戳等信息。
不能创建一个已存在的节点。
临时节点:
- 一旦创建,不可改变
- 创建临时节点的会话一旦消失,临时节点也会消失
- 临时节点不允许拥有子节点
顺序节点
- 创建节点时,可以在路径结尾加上一个递增的计数。
节点的监听
- 节点的状态改变时,会触发
Watch
对应的操作,且只触发一次。
- 节点的状态改变时,会触发
选举机制
- Zookeeper集群中只有超过半数以上的服务器启动,集群才能正常工作。
- 在集群正常工作之前,每个服务器启动时发起一次选举。先给自己投票,然后和其他服务器交换数据,将投票改成
id
大的。 - 在集群正常工作时,获得票最多的节点成为
Leader
。 - 选出
Leader
之后,之前的服务器状态由Looking
改变为Following
,以后的服务器都是Follower
。
示例
假设有7台服务器。
- 服务器1启动,发起一次选举。此时服务器1投自己一票。此时服务器1票数1票。(不足4票)
- 服务器2启动,发起一次选举。此时服务器2投自己一票。此时服务器1发现服务器2的id比自己大,更改选票投给服务器2。此时服务器1票数0票,服务器2票数1票。(不足4票)
- 服务器3启动,发起一次选举。此时服务器3投自己一票。此时服务器1,2发现服务器3的id最大,都更改选票投给服务器3。此时服务器1,2票数0票,服务器3票数3票。(不足4票)
- 服务器4启动,发起一次选举。此时服务器4投自己一票。此时服务器1,2,3发现服务器4的id最大,都更改选票投给服务器4。此时服务器1,2,3票数0票,服务器4票数4票。此时服务器4成为 Leader
- 后面启动的服务器,发现已经有 Leader了,就直接成为 Follower。
应用
介绍
- Master选举
- 分布式锁
- 数据发布和订阅
- 分布式协调通知
- 心跳检测
- 命名服务
- 分布式队列
- 组服务
Master选举
- 原理:
- ZooKeeper无法创建一个已存在的节点
- 创建临时节点的会话一旦消失,临时节点也会消失
- 机制:
- 多台机器同时申请创建同一个临时节点
- 创建成功的机器即为
Master
- 创建不成功的机器则监听此节点
- 若
Master
出现故障,临时节点消失,则各个机器重新同时申请创建同一个临时节点
graph TB APPLY(多台机器同时创建相同节点)--创建成功-->MASTER(即为Master节点) APPLY--创建不成功-->WATCH(监听Master节点) MASTER--挂了-->APPLY
分布式锁
原理:
- ZooKeeper可以创建顺序节点
- 创建临时节点的会话一旦消失,临时节点也会消失
加锁机制
- 一台机器申请一个临时顺序节点
- 获取节点所在目录的最小节点
- 若最小节点就是申请节点,则获得锁
- 若最小节点不是申请节点,则监听前一个节点
- 若监听的节点发生变化(解锁或者机器坏了),则跳转到
2
步骤
graph TB APPLY[申请一个临时顺序节点]-->GETNODE GETNODE[获取最小节点]-->MINNODE MINNODE{最小节点=申请的节点}--是-->LOCK[获得锁] MINNODE--否-->WAIT[监听前一个节点] WAIT--节点变化-->GETNODE
- 解锁机制:删除加锁机器所申请的节点即可
- 标题: Hadoop大数据技术基础
- 作者: ObjectKaz
- 创建于: 2021-01-24 07:05:00
- 更新于: 2021-04-26 00:12:00
- 链接: https://www.objectkaz.cn/d477d713584d.html
- 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。