Preface

I had originally prepared a long write-up for this preface, but a diagram says it better. The biggest thing this article offers is a picture of what one slice of a big-data project actually looks like. It also will not be finished in one pass: this first version lays out the complete process, and the details will be filled in over time. Enough said, it is all in the diagram, let's get to it.

Implementation

Starting Each Cluster

  1. Start the HDFS cluster

jps (Java Virtual Machine Process Status Tool) is a command that ships with Java and lists the PIDs of all running Java processes. If the NameNode does not come up on the very first start, run the initialization command hdfs namenode -format.

[root@wq1 ~]# start-dfs.sh
[root@wq1 ~]# jps
6162 DataNode
8949 Jps
6518 SecondaryNameNode
6874 NameNode

Open the NameNode web UI at your own IP and port, e.g. http://192.168.3.233:50070/explorer.html#/
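
If you prefer the command line, the same health check can be done without the browser; a quick sketch:

[root@wq1 ~]# hdfs dfsadmin -report
[root@wq1 ~]# hdfs dfs -ls /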
2. Start ZooKeeper

[root@wq2 bin]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
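
Before moving on, it is worth confirming the ensemble actually formed. A minimal check, run on every node (one of them should report Mode: leader, the others Mode: follower):

[root@wq2 bin]# zkServer.sh status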
  3. Start Kafka
[root@wq1 ~]# kafka-server-start.sh -daemon /opt/kafka/config/server.properties
  4. Start Hive
    hiveserver2 is started so the warehouse can be used from a remote, visual client (that requires a remote connection); the hive CLI is started for running table creation and similar work directly on the server (a connection sketch follows the commands below).
[root@wq1 ~]# hive
[root@wq1 ~]# hiveserver2
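
A minimal sketch of the remote connection mentioned above, using beeline and assuming hiveserver2 is listening on its default port 10000 on wq1 (adjust host, port and user to your setup):

[root@wq1 ~]# beeline -u jdbc:hive2://wq1:10000 -n root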

Flume Starts Collecting Data

The pipeline is flume -> kafka -> flume -> hdfs.
1. Create a topic before Kafka receives data

[root@wq1 my_conf]# kafka-topics.sh --zookeeper wq1:2181 --create --topic mywebdata --partitions 3 --replication-factor 1       
Created topic "mywebdata".

Connection to node -1 could not be established. Broker may not be available.

The error above appeared on wq2 when running the command below. Although jps showed the Kafka process alive after starting it in the background, the broker would die as soon as any command was run against it.

kafka-console-consumer.sh --bootstrap-server wq2:9092 --topic topic1

Checking the Java processes on that machine confirmed the broker had shut down, which raises a practical question: if one of the three Kafka brokers dies, how do you bring it back up? My first guess was that a network blip had kept this broker out of the election; restarting ZooKeeper did not help, so the next step was to read the error log.

# The relevant part of Kafka's server.log
kafka.common.InconsistentBrokerIdException: Configured broker.id 2 doesn't match stored broker.id 3 in meta.properties. 
If you moved your data, make sure your configured broker.id matches. If you intend to create a new broker, you should remove all data in your data directories (log.dirs).

The eventual fix: going back over the Kafka cluster setup, zoo.cfg contains the server list shown below, and Kafka's server.properties sets broker.id, which is exactly what I had gotten wrong. Under the dataDir configured in zoo.cfg, each node has a file named myid holding that machine's id, and all three ids have to line up with the corresponding broker.id values. Even after fixing the ids it still failed; as the English error message above says, the broker.id stored in meta.properties under log.dirs also has to match, so the old data/log directory had to be deleted as well. After removing it the problem was solved (a check-and-fix sketch follows the server list below).

# server.<id>=hostname:peer-communication-port:leader-election-port
server.1=wq1:2888:3888
server.2=wq2:2888:3888
server.3=wq3:2888:3888
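
A check-and-fix sketch for the mismatch above; the dataDir and log.dirs paths here are assumptions based on my layout, substitute your own:

# the three ids below must agree for the same machine
grep broker.id /opt/kafka/config/server.properties
cat /opt/zookeeper-3.4.14/data/myid
cat /tmp/kafka-logs/meta.properties
# if meta.properties still disagrees and the data is disposable, wipe the old log dir and restart the broker
rm -rf /tmp/kafka-logs
kafka-server-start.sh -daemon /opt/kafka/config/server.properties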

Describe the topic (the output below shows three partitions with one replica each; the data for partitions 0 and 2 lives on the second broker and partition 1 on the first).

[root@wq2 bin]# kafka-topics.sh --zookeeper wq1:2181 --describe --topic mywebdata
Topic:mywebdata PartitionCount:3        ReplicationFactor:1     Configs:
        Topic: mywebdata        Partition: 0    Leader: 2       Replicas: 2     Isr: 2
        Topic: mywebdata        Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: mywebdata        Partition: 2    Leader: 2       Replicas: 2     Isr: 2
  2. The Flume launch command
    Before running it, take a moment to see how Flume reports on the data it moves; the -D options below expose those metrics over HTTP.
flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/conf/my_conf/flume-kafka.conf \
-n a1 \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=5653 \
-Dflume.root.logger=INFO,console

Monitoring URL

http://<your-hostname>:5653/metrics
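
For example, fetched from the shell (hostname wq1 assumed; python is only there to pretty-print the JSON):

[root@wq1 ~]# curl -s http://wq1:5653/metrics | python -m json.tool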

What the HTTP endpoint returns

{"SINK.k1":{"ConnectionCreatedCount":"0","BatchCompleteCount":"0","BatchEmptyCount":"147","EventDrainAttemptCount":"10","StartTime":"1613963308675","BatchUnderflowCount":"1","ConnectionFailedCount":"0","ConnectionClosedCount":"0","Type":"SINK","RollbackCount":"0","EventDrainSuccessCount":"10","KafkaEventSendTimer":"48502","StopTime":"0"},
 "CHANNEL.c1":{"ChannelCapacity":"1000","ChannelFillPercentage":"0.0","Type":"CHANNEL","ChannelSize":"0","EventTakeSuccessCount":"10","EventTakeAttemptCount":"158","StartTime":"1613963279841","EventPutAttemptCount":"10","EventPutSuccessCount":"10","StopTime":"0"},
 "SOURCE.r1":{"AppendBatchAcceptedCount":"0","GenericProcessingFail":"0","EventAcceptedCount":"10","AppendReceivedCount":"0","StartTime":"1613963279882","AppendBatchReceivedCount":"0","ChannelWriteFail":"0","EventReceivedCount":"10","EventReadFail":"0","Type":"SOURCE","AppendAcceptedCount":"0","OpenConnectionCount":"0","StopTime":"0"}}

What each metric means

{
    "SOURCE.src-1":{
        "OpenConnectionCount":"0",      //Number of connections currently open with clients or sinks (only the avro source reports this metric)
        "Type":"SOURCE",
        "AppendBatchAcceptedCount":"1355",  //Total number of batches successfully committed to the channel
        "AppendBatchReceivedCount":"1355",  //Total number of event batches received
        "EventAcceptedCount":"28286",   //Total number of events successfully written to the channel, for which the source returned success to the sink or RPC client that created them
        "AppendReceivedCount":"0",      //Total number of events that arrived in single-event batches (equivalent to one append call in an RPC call)
        "StopTime":"0",         //Time in milliseconds since the Epoch when the source stopped
        "StartTime":"1442566410435",    //Time in milliseconds since the Epoch when the source started
        "EventReceivedCount":"28286",   //Total number of events the source has received so far
        "AppendAcceptedCount":"0"       //Total number of events passed in individually that reached the channel and returned success
    },
    "CHANNEL.ch-1":{
        "EventPutSuccessCount":"28286", //Total number of events successfully written to the channel and committed
        "ChannelFillPercentage":"0.0",  //How full the channel currently is, as a percentage
        "Type":"CHANNEL",
        "StopTime":"0",         //Time in milliseconds since the Epoch when the channel stopped
        "EventPutAttemptCount":"28286", //Total number of events the source attempted to put into the channel
        "ChannelSize":"0",          //Number of events currently in the channel
        "StartTime":"1442566410326",    //Time in milliseconds since the Epoch when the channel started
        "EventTakeSuccessCount":"28286",    //Total number of events successfully taken by the sink
        "ChannelCapacity":"1000000",       //Capacity of the channel
        "EventTakeAttemptCount":"313734329512" //Total number of times the sink attempted to take events from the channel; not every attempt returns events, because the channel may hold no data
    },
    "SINK.sink-1":{
        "Type":"SINK",
        "ConnectionClosedCount":"0",    //Number of connections to the next stage or storage system that were closed (e.g. closing a file on HDFS)
        "EventDrainSuccessCount":"28286",   //Total number of events the sink successfully wrote out to storage
        "KafkaEventSendTimer":"482493",
        "BatchCompleteCount":"0",       //Number of batches equal to the maximum batch size
        "ConnectionFailedCount":"0",    //Number of connections to the next stage or storage system closed because of an error (e.g. a newly created HDFS file closed due to a timeout)
        "EventDrainAttemptCount":"0",   //Total number of events the sink attempted to write out to storage
        "ConnectionCreatedCount":"0",   //Number of connections created to the next stage or storage system (e.g. creating a new file on HDFS)
        "BatchEmptyCount":"0",      //Number of empty batches; a large value means the source is producing data much more slowly than the sink drains it
        "StopTime":"0",
        "RollbackCount":"9",
        "StartTime":"1442566411897",
        "BatchUnderflowCount":"0"       //Number of batches smaller than the configured maximum batch size; a high value also means the sink is faster than the source
    }
}

The Flume-to-Kafka configuration.

# Name the components on this agent
# a1 is the name we chose for this agent
# a1.sources lists the names of the sources this agent contains
# write one name per component, separated by spaces
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# Configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/mywebdata/access.log
# Describe the sink
# Configure the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mywebdata
a1.sinks.k1.kafka.bootstrap.servers = wq1:9092,wq2:9092,wq3:9092
a1.sinks.k1.kafka.flumeBatchSize = 1000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
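
Once the a1 agent is running and new lines land in access.log, a quick way to confirm events are actually reaching Kafka is a console consumer against the same topic (a sketch, using the bootstrap servers configured above):

[root@wq1 ~]# kafka-console-consumer.sh --bootstrap-server wq1:9092,wq2:9092,wq3:9092 --topic mywebdata --from-beginning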

The Flume configuration that reads from Kafka and writes to HDFS.

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a3.sources.r1.batchSize = 5000
a3.sources.r1.batchDurationMillis = 2000
a3.sources.r1.kafka.bootstrap.servers = wq1:9092,wq2:9092,wq3:9092
a3.sources.r1.kafka.consumer.auto.offset.reset = earliest
a3.sources.r1.kafka.topics = mywebdata    
# Describe the sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = /flume-hdfs/
a3.sinks.k1.hdfs.filePrefix = webdata    
a3.sinks.k1.hdfs.rollInterval = 0
a3.sinks.k1.hdfs.rollSize = 67108864
a3.sinks.k1.hdfs.rollCount = 0
a3.sinks.k1.hdfs.fileType = DataStream
a3.sinks.k1.hdfs.writeFormat = Text
# Use a channel which buffers events in memory
a3.channels.c1.type = file
a3.channels.c1.capacity = 10000
a3.channels.c1.byteCapacityBufferPercentage = 20
a3.channels.c1.transactionCapacity = 10000
a3.channels.c1.byteCapacity = 20000000
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
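
Starting this agent mirrors the a1 command shown earlier; the file name flume-kafka-hdfs.conf is my own guess, use whatever you saved this configuration as. Once it has run for a while, the rolled files should appear on HDFS:

flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/conf/my_conf/flume-kafka-hdfs.conf \
-n a3 \
-Dflume.root.logger=INFO,console

[root@wq1 ~]# hdfs dfs -ls /flume-hdfs/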

Error encountered

java.io.IOException: Callable timed out after 30000 ms on file: /flume-hdfs//webdata.1613873488282.tmp
        at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:741)
        at org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:517)
        at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:479)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:441)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask.get(FutureTask.java:205)
        at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:734)
        ... 6 more

The best explanation I have so far is that this machine had been left running as a consumer and was never shut down, which triggered the problem. (What a soft lockup is: the bug does not hang the whole system, but one or more processes or kernel threads get stuck in some state, usually in kernel space, and in many cases it comes down to how kernel locks are used.) The kernel parameter kernel.watchdog_thresh (/proc/sys/kernel/watchdog_thresh) defaults to 10; if a CPU is stuck for more than 2*10 seconds the kernel prints the message below. Note that the value must not be set above 60.

kernel:NMI watchdog: BUG: soft lockup - CPU#0 stuck for 24s! [kworker/u256:1:21875]

Fix

echo 30 > /proc/sys/kernel/watchdog_thresh 
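
echo only changes the value until the next reboot. A sketch for making it stick (assuming 30 is the value you want to keep):

# apply now and persist across reboots
sysctl -w kernel.watchdog_thresh=30
echo "kernel.watchdog_thresh = 30" >> /etc/sysctl.conf
sysctl -p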

The Hive Data Warehouse

The three steps below are the ones the data goes through when it is processed in the warehouse.
Step 1: inspect the data format

[root@wq1 dt=26]# tail -f categories-.1603704061065 
82      IPDDpvnbs
94      QDTVDbatyc
11      Ubk
23      YCdjqay
[root@wq1 dt=26]# tail -f customers-.1603704300625 
999431  Dow Jones & Co. 'Nnypet Ekmcidvwrnomfk' 'Gqzvqt Susckrpkfog'    26, Great North Circle  Montrose        2005-04-17      South   73159   Nigeria      (839) 0574180   (185) 54897386
999491  Harrah's Entertainment Inc.     'Jpt Japhmiwb'  'Vxiwek Miquz'  5, Sanya Circle Mandalay        2004-05-09      East    64044   Bulgaria    (055) 6860602    (370) 10903682

[root@wq1 dt=26]# tail -f employees-.1603706521271
4739    Ellerkamp       Liza    'Sales Hpatgpvbn'       'Fla.'  1973-06-01      2019-01-05      163, West 5th Avenue    Zagreb  West    55828   Ghan'(03) 555-7341'  '3356'  WNGCMYQSE       WVDEA   66      75455.65
2103    Mcleskey        Contessa        'Sales Ojwcaexw'        'Ipw.'  1973-03-30      2016-04-29      14, Alamosa Drive       Bakersfield     Nort52540    Finland '(14) 555-5542' '6383'  MUGIBADJF       VHUSQRVXHX      44      96812.09
2948    Bigusiak        Markus  'Sales Jfnxtiypx'       'Jnv.'  1972-04-22      2018-11-23      225, Montrose Circle    Wrexham South   16134   Algeria      '(620) 555-8951'        '1129'  QEQAOYNJD       XGGZSBTX        13      78077.37
9306    Lavgle  Blondell        'Sales Vbcsraacanf'     'Ogf.'  1988-06-27      2016-12-20      61, Melody Lane Crystal Cove    North   89766   Tuvalu       '(694) 555-0165'        '3189'  DJNOS   KW      49      77783.98

[root@wq1 dt=26]# tail -f  orders-.1603708021871
2107109 395777  7721    2015-01-20      2018-12-23      2016-05-17      3       69606.79        Laila Mcglockton        7, Cumberland Circle    Jatiluhur    North   15685   Tanzania
2107121 460594  9092    2018-10-26      2016-09-20      2017-04-07      1       12460.05        Delana Chiotti  83, Bear Mountain View Circle   Oklahoma City        East    47512   Eritrea
2107133 582440  2649    2017-06-28      2015-06-01      2019-05-28      2       397072.97       Jennefer Freudiger      79, Viewpark Road       Okinawa      West    58620   Israel
2107145 151015  6753    2017-01-26      2019-01-28      2015-08-12      2       650344.50       Alana Kawasaki  23, H And H Avenue      Colon   West57409    Switzerland

[root@wq1 dt=26]# tail -f orderdetails-.1603699455177
2441191 9014    259.92  91      0.42
8467405 4039    83.91   21      0.30
7858542 1714    189.81  70      0.46
7872718 2661    207.48  173     0.00

[root@wq1 dt=26]# tail -f products-.1603703609450 
9409    YVMSauyq        6       1       '1 jng qo 44 vctsfkw'   158.71
9469    NJkxkwa 7       2       '77 omzj'       35.75
9529    JOOMRpptrr      14      9       '265 ym'        38.65
9589    Dbhyba  15      9       '65 - 112 b uwkfn'      96.62

Step 2: designing the warehouse layers

After inspecting the data, I put together a table listing the fields each layer needs, which also makes later field changes easier to track; how you design the layers should follow your own business. Why layer at all? First, the volume is large and source tables can carry hundreds or even a thousand fields, so working without layers is very inefficient. Second, each layer maps onto a set of business needs. Finally, each layer carries only part of the data, which eases the load on the cluster and shortens processing time.

ODS (Operational Data Store): the operational data layer. Detail-level queries are served from ODS, and it stays essentially in line with the incremental or full data from the source systems. In short, this layer keeps every field of the source data.

Table name: employees_ods (employee table)
Fields: employeeId emloyeeIdname title reportTo
Field types: int string string string
Field meaning: employee id, employee name, title, manager
Table relationship: LOAD DATA INPATH 'hdfs://wq1:9000/source-six/employee' overwrite into table employees_ods partition (yt="2021",mt="03",dt="29");

CDM (Common Data Model): the common dimensional model layer, subdivided into DWD and DWS. Its main job is data cleansing and integration, building consistent dimensions, constructing reusable detail fact tables oriented toward analysis and statistics, and summarizing metrics at common granularities.
DWD (Data Warehouse Detail): the detail data layer

Table name: employees_dwd (employee detail table)
Fields: employeeId2dwd emloyeeIdname2dwd title2dwd reportTo2dwd
Field types: int string string string
Field source: a.employeeId concat(a.firstName,a.lastname) a.title b.emloyeeIdname2dwd
Field meaning: employee id, employee name, title, manager
Table type: detail fact layer
Table relationship: FROM shoppingmall.employees_ods a INNER JOIN wushi_cdm.duty_dwd b ON a.reportTo=b.employeeId

DWS (Data Warehouse Summary): the summarized data layer

Table name: employeehm_dws (employee hire-month dimension table)
Fields: employeeId2dws month2dws
Field types: int string
Field source: employeeId month(birthdate)
Field meaning: employee id, hire month
Table type: summary dimension layer
Table relationship: FROM category_ods

ADS (Application Data Service): the application data layer

Table name: saletop10_ads (top-10 sales-by-region detail table)
Fields: producename2ads sale2ads
Field types: int string
Field source: b.producename sum(a.unitprice*quantity*discount)
Field meaning: product name, sales amount
Table type: summary dimension layer
Table relationship: FROM (SELECT b.producename,sum(a.unitprice*quantity*discount) c FROM shoppingmall.orderdetails_ods a INNER JOIN shoppingmall.products_ods b ON a.productid=b.productid GROUP BY b.producename ORDER BY c limit 10)tmp;

Step 3: DDL for each layer and the sync process

  1. The ODS layer
  • Create the database and table
CREATE EXTERNAL TABLE IF NOT EXISTS employees_ods_t
(
employeeId INT COMMENT "employee id",
lastname STRING COMMENT "last name",
firstName STRING COMMENT "first name",
title STRING COMMENT "title",
titleofCountry STRING COMMENT "country of the title",
birthdate DATE COMMENT "date of birth",
hiredate DATE COMMENT "hire date",
address STRING COMMENT "address",
city STRING COMMENT "city",
region STRING COMMENT "region",
postalCode STRING COMMENT "postal code",
country STRING COMMENT "country",
homePhone STRING COMMENT "home phone",
extension STRING COMMENT "extension",
photo STRING COMMENT "photo",
note STRING COMMENT "notes",
reportTo INT COMMENT "manager",
salary DOUBLE COMMENT "salary"
)
COMMENT "employee table"
partitioned by (yt string,mt string,dt string)
row format delimited fields terminated by '\t'
LOCATION '/user/hive/warehouse/ods_database.db/employees_ods_t';
  • Map the HDFS data into Hive (the LOAD simply moves the HDFS files to a location matching the Hive table's LOCATION)
LOAD DATA INPATH 'hdfs://wq1:9000/source-six/employee' overwrite into table employees_ods partition (yt="2021",mt="03",dt="29");
  • Verify the data
select * from employees_ods limit 10;
# If the LOCATION on HDFS holds data but Hive returns nothing, run the statement below; it can also be rerun after an external table has been dropped by accident and recreated
MSCK REPAIR TABLE employees_ods;
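
Since the LOAD only moves files, the result can also be confirmed straight from HDFS; a sketch, assuming the data went into the table created above so the partition directory sits under its LOCATION:

[root@wq1 ~]# hdfs dfs -ls /user/hive/warehouse/ods_database.db/employees_ods_t/yt=2021/mt=03/dt=29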
  2. The DWD layer
  • Create the database and table
CREATE EXTERNAL TABLE IF NOT EXISTS employees_dwd
(
employeeId INT COMMENT "employee id",
emloyeeIdname2dwd STRING COMMENT "employee full name",
title STRING COMMENT "title",
reportTo INT COMMENT "manager"
)
COMMENT "employee detail table"
partitioned by (yt string,mt string,dt string)
LOCATION '/user/hive/warehouse/wushi_cdm.db/employees_dwd';
  • Sync and process the data
insert into employees_dwd partition(yt="2021",mt="04",dt="01")
select
a.employeeId,
concat(a.firstName,a.lastname),
a.title,
b.emloyeeIdname2dwd
FROM shoppingmall.employees_ods a
INNER JOIN wushi_cdm.duty_dwd b
ON a.reportTo=b.employeeId2dwd;
  3. The ADS layer
create table sourth
as
select
b.producename,
sum(a.unitprice*quantity*discount) as c
from pdw_database.orderdetails_pdw_t as a
inner join pdw_database.products_pdw_t as b
on a.productid=b.productid
inner join pdw_database.orders_pdw_t as e
on a.orderid=e.orderid
where e.shipregion='South'
group by b.producename
order by c
limit 10;
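
For repeatable runs it is handier to keep statements like this in a file and execute them in batch; the file name below is just an example:

[root@wq1 ~]# hive -f /root/my_sql/saletop10_ads.sql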

Custom Hive Functions

Exporting the Data to MySQL

Before exporting to MySQL, create a table with the same structure in the target database.

CREATE TABLE lake
(
    table_name VARCHAR(20) COMMENT "table name",
    FieldName VARCHAR(20) COMMENT "column name"
);

The Sqoop export command (tab-delimited fields, one map task).

sqoop export \
--connect jdbc:mysql://localhost:3306/system_db \
--username root \
--password 123456 \
--table lake \
--export-dir /user/hive/warehouse/system_db.db/lake/year=2021/month=04/day=01/lake.txt \
--fields-terminated-by '\t' \
-m 1
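
After the export completes, a quick count on the MySQL side confirms the rows arrived (credentials as above):

[root@wq1 ~]# mysql -uroot -p123456 -e "SELECT COUNT(*) FROM system_db.lake;"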