Preface
I had originally prepared a long preamble describing what this post covers, but the diagram says it better. The biggest thing this post can give you is a picture of what one slice of a big data project actually looks like. It also isn't written in one sitting: the goal for now is to present the complete pipeline end to end, and the details will be filled in over time. Enough talk, it's all in the diagram, let's get to it.
Implementation
Starting the clusters
- Start the HDFS cluster
jps (Java Virtual Machine Process Status Tool) is a JDK command that lists the PIDs of all running Java processes. If the NameNode fails to come up on the first start, run the format command first: hdfs namenode -format
[root@wq1 ~]# start-dfs.sh
[root@wq1 ~]# jps
6162 DataNode
8949 Jps
6518 SecondaryNameNode
6874 NameNode
Open the NameNode web UI (substitute your own IP and port): http://192.168.3.233:50070/explorer.html#/
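If the web UI isn't handy, the same check can be done from the command line (a quick sketch; the exact report depends on your deployment):
# Report DataNode status and capacity
hdfs dfsadmin -report
# Confirm the filesystem root is reachable
hdfs dfs -ls /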
- Start ZooKeeper
[root@wq2 bin]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
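With all three ZooKeeper nodes started, it is worth confirming which one was elected leader (run this on each machine):
# Prints Mode: leader on one node and Mode: follower on the others
zkServer.sh status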
- Start Kafka
[root@wq1 ~]# kafka-server-start.sh -daemon /opt/kafka/config/server.properties
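Because -daemon swallows the console output, it helps to confirm the broker really came up and registered itself (a sketch; zookeeper-shell.sh ships in Kafka's bin directory):
# The Kafka process should show up in jps
jps | grep Kafka
# The broker ids registered in ZooKeeper; all three should be listed once the cluster is healthy
zookeeper-shell.sh wq1:2181 ls /brokers/ids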
- Start Hive
hiveserver2 is started so the warehouse can be used from a visual client over a remote connection; the hive CLI is started for running table creation and other DDL directly on the server.
[root@wq1 ~]# hive
[root@wq1 ~]# hiveserver2
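With hiveserver2 up, a remote client can connect over JDBC. A minimal connectivity check with beeline (assuming hiveserver2's default port 10000 and the root user):
beeline -u jdbc:hive2://wq1:10000 -n root -e "show databases;"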
Flume starts pulling data
The pipeline is Flume -> Kafka -> Flume -> HDFS.
- Create a topic before Kafka starts receiving data
[root@wq1 my_conf]# kafka-topics.sh --zookeeper wq1:2181 --create --topic mywebdata --partitions 3 --replication-factor 1
Created topic "mywebdata".
Connection to node -1 could not be established. Broker may not be available.
The error above appeared when running the command below on wq2. Although jps showed the Kafka process running right after the daemon start, the broker crashed as soon as any command touched it.
kafka-console-consumer.sh --bootstrap-server wq2:9092 --topic topic1
Checking the Java processes on that machine confirmed the Kafka process was gone, which raises a practical question: if one broker out of a three-node Kafka cluster dies, how do you bring it back? The first guess was that a network blip had kept this broker from being elected into the cluster, but restarting ZooKeeper didn't help, so the next step was to read the error log.
# Kafka's server.log shows the following
kafka.common.InconsistentBrokerIdException: Configured broker.id 2 doesn't match stored broker.id 3 in meta.properties.
If you moved your data, make sure your configured broker.id matches. If you intend to create a new broker, you should remove all data in your data directories (log.dirs).
The eventual fix: going back over the Kafka cluster setup, zoo.cfg contains the server entries shown below, while Kafka's server.properties sets broker.id, and that id is what I had set wrong. Under the dataDir configured in zoo.cfg there must be a file named myid holding the id of that machine, and all three ids have to line up. Fixing the ids alone was still not enough: as the English error above says, the broker id stored in meta.properties disagreed with the configured one, so the old log data had to be deleted as well. After deleting it the problem was solved; a command-level sketch follows the zoo.cfg snippet below.
# server.id=hostname:communication-port:election-port
server.1=wq1:2888:3888
server.2=wq2:2888:3888
server.3=wq3:2888:3888
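A command-level sketch of the checks and the fix described above (the dataDir and log.dirs paths are examples from my layout, substitute your own):
# The three ids must agree on every machine:
cat /opt/zookeeper-3.4.14/data/myid                  # id in ZooKeeper's dataDir (assumed path)
grep broker.id /opt/kafka/config/server.properties   # id configured for Kafka
grep broker.id /opt/kafka/logs/meta.properties       # id Kafka recorded at first startup, under log.dirs (assumed path)
# If meta.properties holds a stale id, clear that broker's data directory and restart it
# (as the error message warns, this deletes the broker's local partition data)
rm -rf /opt/kafka/logs/*
kafka-server-start.sh -daemon /opt/kafka/config/server.properties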
Check the topic's status (the output below shows the topic has three partitions and one replica; partitions 0 and 2 live on broker 2, and partition 1 lives on broker 1).
[root@wq2 bin]# kafka-topics.sh --zookeeper wq1:2181 --describe --topic mywebdata
Topic:mywebdata PartitionCount:3 ReplicationFactor:1 Configs:
Topic: mywebdata Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: mywebdata Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: mywebdata Partition: 2 Leader: 2 Replicas: 2 Isr: 2
- The Flume launch command
Before running it, it helps to understand how to watch Flume's data flow, so the command below also enables the built-in HTTP monitoring:
flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/conf/my_conf/flume-kafka.conf \
-n a1 \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=5653 \
-Dflume.root.logger=INFO,console
Monitoring URL
http://<your-hostname>:5653/metrics
What the endpoint returns:
{"SINK.k1":{"ConnectionCreatedCount":"0","BatchCompleteCount":"0","BatchEmptyCount":"147","EventDrainAttemptCount":"10","StartTime":"1613963308675","BatchUnderflowCount":"1","ConnectionFailedCount":"0","ConnectionClosedCount":"0","Type":"SINK","RollbackCount":"0","EventDrainSuccessCount":"10","KafkaEventSendTimer":"48502","StopTime":"0"},
"CHANNEL.c1":{"ChannelCapacity":"1000","ChannelFillPercentage":"0.0","Type":"CHANNEL","ChannelSize":"0","EventTakeSuccessCount":"10","EventTakeAttemptCount":"158","StartTime":"1613963279841","EventPutAttemptCount":"10","EventPutSuccessCount":"10","StopTime":"0"},
"SOURCE.r1":{"AppendBatchAcceptedCount":"0","GenericProcessingFail":"0","EventAcceptedCount":"10","AppendReceivedCount":"0","StartTime":"1613963279882","AppendBatchReceivedCount":"0","ChannelWriteFail":"0","EventReceivedCount":"10","EventReadFail":"0","Type":"SOURCE","AppendAcceptedCount":"0","OpenConnectionCount":"0","StopTime":"0"}}
What each field means:
{
"SOURCE.src-1":{
"OpenConnectionCount":"0", //Number of connections currently held open with clients or sinks (only the Avro source reports this metric)
"Type":"SOURCE",
"AppendBatchAcceptedCount":"1355", //Total number of batches successfully committed to the channel
"AppendBatchReceivedCount":"1355", //Total number of event batches received
"EventAcceptedCount":"28286", //Total number of events successfully written to the channel, for which the source returned success to the sink or RPC client that created them
"AppendReceivedCount":"0", //Total number of events that arrived one per batch (equivalent to a single append call over RPC)
"StopTime":"0", //Time the source stopped, in milliseconds since the epoch
"StartTime":"1442566410435", //Time the source started, in milliseconds since the epoch
"EventReceivedCount":"28286", //Total number of events the source has received so far
"AppendAcceptedCount":"0" //Total number of events appended individually to the channel that returned success
},
"CHANNEL.ch-1":{
"EventPutSuccessCount":"28286", //Total number of events successfully written to the channel and committed
"ChannelFillPercentage":"0.0", //How full the channel currently is, as a percentage
"Type":"CHANNEL",
"StopTime":"0", //Time the channel stopped, in milliseconds since the epoch
"EventPutAttemptCount":"28286", //Total number of events the source attempted to put into the channel
"ChannelSize":"0", //Current number of events in the channel
"StartTime":"1442566410326", //Time the channel started, in milliseconds since the epoch
"EventTakeSuccessCount":"28286", //Total number of events successfully taken by the sink
"ChannelCapacity":"1000000", //Capacity of the channel
"EventTakeAttemptCount":"313734329512" //Total number of times the sink tried to take events from the channel; not every attempt returns an event, because the channel may be empty when the sink polls
},
"SINK.sink-1":{
"Type":"SINK",
"ConnectionClosedCount":"0", //Number of connections to the next stage or storage system that were closed (e.g. closing a file on HDFS)
"EventDrainSuccessCount":"28286", //Total number of events the sink successfully wrote out to storage
"KafkaEventSendTimer":"482493",
"BatchCompleteCount":"0", //Number of batches equal to the configured maximum batch size
"ConnectionFailedCount":"0", //Number of connections to the next stage or storage system closed because of an error (e.g. a newly created HDFS file closed due to a timeout)
"EventDrainAttemptCount":"0", //Total number of events the sink attempted to write out to storage
"ConnectionCreatedCount":"0", //Number of connections created with the next stage or storage system (e.g. creating a new file on HDFS)
"BatchEmptyCount":"0", //Number of empty batches; a large value means the source is producing data much more slowly than the sink drains it
"StopTime":"0",
"RollbackCount":"9", //Number of transaction rollbacks performed by the sink
"StartTime":"1442566411897",
"BatchUnderflowCount":"0" //Number of batches smaller than the sink's configured maximum batch size; a high value also means the sink is running faster than the source
}
}
The Flume-to-Kafka configuration:
# Name the components on this agent
# a1 is the name we gave this agent
# a1.sources lists the sources that belong to agent a1
# if an agent has several components of one kind, list their names separated by spaces
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/mywebdata/access.log
# Describe the sink
# configure the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mywebdata
a1.sinks.k1.kafka.bootstrap.servers = wq1:9092,wq2:9092,wq3:9092
a1.sinks.k1.kafka.flumeBatchSize = 1000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# wire the components to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
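Once this agent is running, a console consumer is the quickest way to confirm events are landing in the topic (a sketch; this is also the command that exposed the broker.id problem earlier, so make sure the brokers are healthy first):
kafka-console-consumer.sh --bootstrap-server wq1:9092,wq2:9092,wq3:9092 --topic mywebdata --from-beginning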
The Flume configuration that collects from Kafka and writes to HDFS:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a3.sources.r1.batchSize = 5000
a3.sources.r1.batchDurationMillis = 2000
a3.sources.r1.kafka.bootstrap.servers = wq1:9092,wq2:9092,wq3:9092
a3.sources.r1.kafka.consumer.auto.offset.reset = earliest
a3.sources.r1.kafka.topics = mywebdata
# Describe the sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = /flume-hdfs/
a3.sinks.k1.hdfs.filePrefix = webdata
a3.sinks.k1.hdfs.rollInterval = 0
a3.sinks.k1.hdfs.rollSize = 67108864
a3.sinks.k1.hdfs.rollCount = 0
a3.sinks.k1.hdfs.fileType = DataStream
a3.sinks.k1.hdfs.writeFormat = Text
# Use a channel which buffers events in memory
a3.channels.c1.type = file
a3.channels.c1.capacity = 10000
a3.channels.c1.byteCapacityBufferPercentage = 20
a3.channels.c1.transactionCapacity = 10000
a3.channels.c1.byteCapacity = 20000000
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
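After this second agent has run for a while, the rolled files should appear under the sink path (a sketch; the exact file names will differ):
hdfs dfs -ls /flume-hdfs/
# Peek at a few records from the rolled files
hdfs dfs -cat /flume-hdfs/webdata.* | head -n 5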
Error
java.io.IOException: Callable timed out after 30000 ms on file: /flume-hdfs//webdata.1613873488282.tmp
at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:741)
at org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:517)
at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:479)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:441)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:734)
... 6 more
The best explanation so far is that this machine had been left running as a consumer and never shut down, which led to the problem. (Soft lockup: the bug does not hang the whole system, but one or more processes or kernel threads get stuck in some state, usually in kernel space, often because of how kernel locks are used.) The kernel parameter kernel.watchdog_thresh (/proc/sys/kernel/watchdog_thresh) defaults to 10: if a CPU stays stuck for more than 2*10 seconds, the kernel prints the message below. Note that the value cannot be set above 60.
kernel:NMI watchdog: BUG: soft lockup - CPU#0 stuck for 24s! [kworker/u256:1:21875]
Fix
echo 30 > /proc/sys/kernel/watchdog_thresh
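The echo above only lasts until the next reboot; sysctl can apply the same value and keep it persistent (a standard sketch):
# Apply immediately
sysctl -w kernel.watchdog_thresh=30
# Keep it across reboots
echo "kernel.watchdog_thresh = 30" >> /etc/sysctl.conf
sysctl -p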
Hive data warehouse
The three steps below are the path the data takes while being processed in the warehouse.
Step 1: inspect the data format
[root@wq1 dt=26]# tail -f categories-.1603704061065
82 IPDDpvnbs
94 QDTVDbatyc
11 Ubk
23 YCdjqay
[root@wq1 dt=26]# tail -f customers-.1603704300625
999431 Dow Jones & Co. 'Nnypet Ekmcidvwrnomfk' 'Gqzvqt Susckrpkfog' 26, Great North Circle Montrose 2005-04-17 South 73159 Nigeria (839) 0574180 (185) 54897386
999491 Harrah's Entertainment Inc. 'Jpt Japhmiwb' 'Vxiwek Miquz' 5, Sanya Circle Mandalay 2004-05-09 East 64044 Bulgaria (055) 6860602 (370) 10903682
[root@wq1 dt=26]# tail -f employees-.1603706521271
4739 Ellerkamp Liza 'Sales Hpatgpvbn' 'Fla.' 1973-06-01 2019-01-05 163, West 5th Avenue Zagreb West 55828 Ghan'(03) 555-7341' '3356' WNGCMYQSE WVDEA 66 75455.65
2103 Mcleskey Contessa 'Sales Ojwcaexw' 'Ipw.' 1973-03-30 2016-04-29 14, Alamosa Drive Bakersfield Nort52540 Finland '(14) 555-5542' '6383' MUGIBADJF VHUSQRVXHX 44 96812.09
2948 Bigusiak Markus 'Sales Jfnxtiypx' 'Jnv.' 1972-04-22 2018-11-23 225, Montrose Circle Wrexham South 16134 Algeria '(620) 555-8951' '1129' QEQAOYNJD XGGZSBTX 13 78077.37
9306 Lavgle Blondell 'Sales Vbcsraacanf' 'Ogf.' 1988-06-27 2016-12-20 61, Melody Lane Crystal Cove North 89766 Tuvalu '(694) 555-0165' '3189' DJNOS KW 49 77783.98
[root@wq1 dt=26]# tail -f orders-.1603708021871
2107109 395777 7721 2015-01-20 2018-12-23 2016-05-17 3 69606.79 Laila Mcglockton 7, Cumberland Circle Jatiluhur North 15685 Tanzania
2107121 460594 9092 2018-10-26 2016-09-20 2017-04-07 1 12460.05 Delana Chiotti 83, Bear Mountain View Circle Oklahoma City East 47512 Eritrea
2107133 582440 2649 2017-06-28 2015-06-01 2019-05-28 2 397072.97 Jennefer Freudiger 79, Viewpark Road Okinawa West 58620 Israel
2107145 151015 6753 2017-01-26 2019-01-28 2015-08-12 2 650344.50 Alana Kawasaki 23, H And H Avenue Colon West57409 Switzerland
[root@wq1 dt=26]# tail -f orderdetails-.1603699455177
2441191 9014 259.92 91 0.42
8467405 4039 83.91 21 0.30
7858542 1714 189.81 70 0.46
7872718 2661 207.48 173 0.00
[root@wq1 dt=26]# tail -f products-.1603703609450
9409 YVMSauyq 6 1 '1 jng qo 44 vctsfkw' 158.71
9469 NJkxkwa 7 2 '77 omzj' 35.75
9529 JOOMRpptrr 14 9 '265 ym' 38.65
9589 Dbhyba 15 9 '65 - 112 b uwkfn' 96.62
Step 2: design the warehouse layers
After inspecting the data structure, draw up a table that lays out the fields each layer needs; it makes later field changes far easier to track, and the layering itself should follow your own business. Why layer at all? First, the data volume is large and source tables can have hundreds or even a thousand fields, so working on them without layering is extremely inefficient. Second, each layer can serve a set of business needs directly. Third, each layer carries only part of the data, which eases cluster pressure and shortens processing time.
ODS (Operational Data Store)
The operational data layer: detail-level queries are answered from ODS, which stays essentially in line with the incremental or full data from the source systems. In short, this layer keeps every field of the source data.
| Table | employees_ods | Employee table (ODS) | | |
|---|---|---|---|---|
| Fields | employeeId | emloyeeIdname | title | reportTo |
| Types | int | string | string | string |
| Meaning | employee id | employee name | title | manager |
| Table relationship | LOAD DATA INPATH 'hdfs://wq1:9000/source-six/employee' overwrite into table employees_ods partition (yt="2021",mt="03",dt="29"); | | | |
CDM (Common Data Model)
The common dimension model layer, subdivided into DWD and DWS. Its main job is to clean and integrate the data, build consistent dimensions, construct reusable detail fact tables oriented towards analysis and statistics, and roll up metrics at a common granularity.
DWD (Data Warehouse Detail): the detail data layer
| Table | employees_dwd | Employee detail table | | |
|---|---|---|---|---|
| Fields | employeeId2dwd | emloyeeIdname2dwd | title2dwd | reportTo2dwd |
| Types | int | string | string | string |
| Source | a.employeeId | concat(a.firstName,a.lastname) | a.title | b.emloyeeIdname2dwd |
| Meaning | employee id | employee name | title | manager |
| Table type | detail fact layer | | | |
| Table relationship | FROM shoppingmall.employees_ods a INNER JOIN wushi_cdm.duty_dwd b ON a.reportTo=b.employeeId | | | |
DWS (Data Warehouse Summary): the summarized data layer
| Table | employeehm_dws | Employee hire-month dimension table |
|---|---|---|
| Fields | employeeId2dws | month2dws |
| Types | int | string |
| Source | employeeId | month(birthdate) |
| Meaning | employee id | hire month |
| Table type | summary dimension layer | |
| Table relationship | FROM category_ods | |
ADS (Application Data Service): the application data layer
| Table | saletop10_ads | Region sales Top 10 detail table |
|---|---|---|
| Fields | producename2ads | sale2ads |
| Types | int | string |
| Source | b.producename | `sum(a.unitprice*quantity*discount)` |
| Meaning | product name | sales amount |
| Table type | summary dimension layer | |
| Table relationship | `FROM (SELECT b.producename,sum(a.unitprice*quantity*discount) c FROM shoppingmall.orderdetails_ods a INNER JOIN shoppingmall.products_ods b ON a.productid=b.productid GROUP BY b.producename ORDER BY c DESC limit 10)tmp;` |
Step 3: per-layer DDL and the data sync process
- ODS layer
- Create the database and table
CREATE EXTERNAL TABLE IF NOT EXISTS employees_ods_t
(
employeeId INT COMMENT "employee id",
lastname STRING COMMENT "last name",
firstName STRING COMMENT "first name",
title STRING COMMENT "title",
titleofCountry STRING COMMENT "country of the title",
birthdate DATE COMMENT "date of birth",
hiredate DATE COMMENT "hire date",
address STRING COMMENT "address",
city STRING COMMENT "city",
region STRING COMMENT "region",
postalCode STRING COMMENT "postal code",
country STRING COMMENT "nationality",
homePhone STRING COMMENT "home phone",
extension STRING COMMENT "extension",
photo STRING COMMENT "photo",
note STRING COMMENT "notes",
reportTo INT COMMENT "manager",
salary DOUBLE COMMENT "salary"
)
COMMENT "Employee table"
partitioned by (yt string,mt string,dt string)
row format delimited fields terminated by '\t'
LOCATION '/user/hive/warehouse/ods_database.db/employees_ods_t';
- Map the HDFS data into the Hive table (the HDFS files are simply moved into the directory that matches the table's LOCATION)
LOAD DATA INPATH 'hdfs://wq1:9000/source-six/employee' overwrite into table employees_ods partition (yt="2021",mt="03",dt="29");
- Verify the data
select * from employees_ods limit 10;
# If the LOCATION on HDFS has data but Hive returns nothing, run the command below; it is also what to run after an external table has been dropped by mistake and recreated
MSCK REPAIR TABLE employees_ods;
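Two further checks that the partition really landed where expected (a sketch; the HDFS path follows the LOCATION in the DDL above):
# The partition directory should exist under the table's LOCATION
hdfs dfs -ls /user/hive/warehouse/ods_database.db/employees_ods_t/yt=2021/mt=03/dt=29
# And Hive should list the partition
hive -e "show partitions employees_ods;"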
- DWD layer
- Create the database and table
CREATE EXTERNAL TABLE IF NOT EXISTS employees_dwd
(
employeeId INT COMMENT "employee id",
emloyeeIdname2dwd STRING COMMENT "employee name",
title STRING COMMENT "title",
reportTo INT COMMENT "manager"
)
COMMENT "Employee detail table"
partitioned by (yt string,mt string,dt string)
LOCATION '/user/hive/warehouse/wushi_cdm.db/employees_dwd';
- Sync and process the data
insert
into employees_dwd partition(yt="2021",mt="04",dt="01")
select
a.employeeId,
concat(a.firstName,a.lastname),
a.title,
b.emloyeeIdname2dwd
FROM shoppingmall.employees_ods a
INNER JOIN
wushi_cdm.duty_dwd b
ON
a.reportTo=b.employeeId2dwd;
- ADS layer
create table sourth
as
select
b.producename,
sum(a.unitprice*quantity*discount) as c
from pdw_database.orderdetails_pdw_t as a
inner join pdw_database.products_pdw_t as b
on a.productid=b.productid
inner join pdw_database.orders_pdw_t as e
on a.orderid=e.orderid
where e.shipregion='South'
group by b.producename
order by c desc
limit 10;
Custom Hive functions
Export the data to MySQL
Before exporting, create a table with the same structure on the MySQL side:
CREATE TABLE lake
(
table_name VARCHAR(20) COMMENT "table name",
FieldName VARCHAR(20) COMMENT "column name"
);
The Sqoop export script (fields separated by tabs, 1 map task):
sqoop export \
--connect jdbc:mysql://localhost:3306/system_db \
--username root \
--password 123456 \
--table lake \
--export-dir /user/hive/warehouse/system_db.db/lake/year=2021/month=04/day=01/lake.txt \
--fields-terminated-by '\t' \
-m 1
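Once the export finishes, a row count on the MySQL side is a quick sanity check (a sketch, reusing the credentials from the export):
mysql -uroot -p123456 system_db -e "SELECT COUNT(*) FROM lake;"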