druid.io 3---druid数据

数据格式

  • Timestamp列: 所有的查询都以时间为中心。
  • Dimension列(维度): Dimensions对应事件的维度,通常用于筛选过滤数据。
  • Metric列(度量): Metrics是用于聚合和计算的列。通常是数字,并且支持count、sum、mean等聚合操作。
    线上广告的例子:
1
2
3
4
5
6
7
timestamp publisher advertiser gender country click price
2011-01-01T01:01:35Z bieberfever.com google.com Male USA 0 0.65
2011-01-01T01:03:63Z bieberfever.com google.com Male USA 0 0.62
2011-01-01T01:04:51Z bieberfever.com google.com Male USA 1 0.45
2011-01-01T01:00:00Z ultratrimfast.com google.com Female UK 0 0.87
2011-01-01T02:00:00Z ultratrimfast.com google.com Female UK 0 0.99
2011-01-01T02:00:00Z ultratrimfast.com google.com Female UK 1 1.53

dimensions: publisher, advertiser, gender, and country。
metrics: click和price

预聚合roll up

Roll-up是在一系列维度选定后的数据之上做的初始聚合,一般发生在push/pull数据流阶段,通过realtime node或者tranquility+indexing service的方式。

通过queryGranularity定义数据roll up的粒度。

这种预聚合的方式可以很显著的减少数据的存储(可减少100倍)。 Druid也是通过这种方式来减少数据的存储。 这种减少存储的方式也会带来副作用,比如我们没有办法再查询到每条数据具体的明细。换句话说,数据聚合的粒度是我们能查询数据的最小粒度。

例如定义粒度为HOUR

1
2
GROUP BY timestamp, publisher, advertiser, gender, country
:: impressions = COUNT(1), clicks = SUM(click), revenue = SUM(price)

上述例子聚合之后为:

1
2
3
4
5
timestamp publisher advertiser gender country impressions clicks revenue
2011-01-01T01:00:00Z ultratrimfast.com google.com Male USA 1800 25 15.70
2011-01-01T01:00:00Z bieberfever.com google.com Male USA 2912 42 29.18
2011-01-01T02:00:00Z ultratrimfast.com google.com Male UK 1953 17 17.31
2011-01-01T02:00:00Z bieberfever.com google.com Male UK 3194 170 34.01

也可以将过久时间的历史数据进行自定义的roll up操作,例如近90天的数据按小时进行roll up预处理,然后将90天之后的数据提交batch indexing任务按天roll up。

数据分片

数据以segments(段)的形式就行分片,Segments是自包含容器,包括基于列的压缩,以及这些列的索引,对应下面要介绍的存储的descriptor.json和index.zip。Druid只需要清楚如何扫描这些segments就可以查询。

第一级分片-segment

以时间作为第一级分片,通过segmentGranularity定义segments的分片时间粒度。

第二级分片-shard

支持两种类型的分区策略:“散列”、“单维度”。在大多数情况下,建议使用散列分区,可以提高索引性能和创造大小更均匀的数据段

基于哈希散列

首先选择第一级分片的segments,然后根据segments每一行的所有维度的hash来划分。

= 输入的基数 / targetPartitionSize```
1
配置举例:

“partitionsSpec”: {
“type”: “hashed”,
“targetPartitionSize”: 5000000
}

1
2
3
4
numShards:可以直接指定shard个数。当配置这个参数时,还可以配置partitionDimensions来指定维度,而不用全部维度
#### 单维度划分
首先自动选择一个维度划分,然后分离该维度成连续的范围。每一部分将包含所有行维度的值范围。

“partitionsSpec”: {
“type”: “dimension”,
“targetPartitionSize”: 5000000
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
可以通过partitionDimension指定维度进行划分,不用自动选择的维度。
#### shard大小推荐
根据[官方文档](http://druid.io/docs/latest/design/segments.html),shard大小推荐为300MB-700MB,需要调整两级分片的参数segmentGranularity和partitioningSpec。
个人是这么理解的,shard相当于是druid数据最小存储单位,分得太大的话,可能在各个历史节点分散不开。分得合适的话,查询的时候在历史节点比较分散,可以充分利用每个历史节点的cpu。mark,有空看看这部分的源码。
### 例子
Segments通过datasource, interval, version, partition number来区分。
> dataSource_interval_version_partitionNumber。
interval:时间段,由queryGranularity决定
partition number:shard的序号,shard的个数由输入的基数/targetPartitionSize决定
例如:
Segment sampleData_2011-01-01T01:00:00:00Z_2011-01-01T02:00:00:00Z_v1_0 包含

2011-01-01T01:00:00Z ultratrimfast.com google.com Male USA 1800 25 15.70
2011-01-01T01:00:00Z bieberfever.com google.com Male USA 2912 42 29.18

1
Segment sampleData_2011-01-01T02:00:00:00Z_2011-01-01T03:00:00:00Z_v1_0 包含

2011-01-01T02:00:00Z ultratrimfast.com google.com Male UK 1953 17 17.31
2011-01-01T02:00:00Z bieberfever.com google.com Male UK 3194 170 34.01

1
2
3
4
5
6
7
## 数据存储
**MVCC**:多版本控制,以HDFS作为DeepStorage为例,HDFS上会保存每次修改的版本,history节点只load最新的数据,即用最新的版本来表示数据。
druid在0.9版本之后,HDFS上存储格式如下:
**hdfs://nn1:port/.../ sampleData / startTime_endTime / updateTime / shard / index.zip**
**hdfs://nn1:port/.../ sampleData / startTime_endTime / updateTime / shard / descriptor.json**

startTime_endTime:看聚合粒度,可能是1个小时之内,可能是1天之内
updateTime:修改时间,即版本号。取最新的一个版本
shard:分片,数据比较多,一个分片放不下

1
2
3
### HDFS存储举例
**hdfs://nn1:port/.../sampleData/20160801T120000.000+0800_20160801T130000.000+0800/**
目录下有如下两个版本:

2016-08-01T12_07_40.483+08_00/ rwxr-xr-x root/supergroup 2016-08-01 13:15:12
2016-08-01T13_17_06.199+08_00/ rwxr-xr-x root/supergroup 2016-08-01 13:23:40

1
**2016-08-01T13_17_06.199+08_00**目录下有3个分片:

0/ rwxr-xr-x root/supergroup 2016-08-01 13:23:05
1/ rwxr-xr-x root/supergroup 2016-08-01 13:23:11
2/ rwxr-xr-x root/supergroup 2016-08-01 13:23:46

1
**0/**目录下有两个文件:

descriptor.json rw-r–r– eadata/supergroup 766 B 128 mb 3 2016-08-01 13:23:05
index.zip rw-r–r– eadata/supergroup 5.56 mb 128 mb 3 2016-08-01 13:23:04

1
descriptor.json保存的信息:

{
“dataSource”: “sampleData”,//数据源
“interval”: “2016-08-01T12:00:00.000+08:00/2016-08-01T13:00:00.000+08:00”,//时间区间
“version”: “2016-08-01T13:17:06.199+08:00”,//版本号,即修改时间
“loadSpec”: {//存储路径
“type”: “hdfs”,
“path”: “hdfs://nn1:port/…./sampleData/20160801T120000.000+0800_20160801T130000.000+0800/2016-08-01T13_17_06.199+08_00/0/index.zip”
},
“dimensions”: “publisher,advertiser,gender,country”,
“metrics”: “click,price”,
“shardSpec”: {//分片信息
“type”: “hashed”,
“partitionNum”: 0,
“partitions”: 3,
“partitionDimensions”: []
},
“binaryVersion”: 9,
“size”: 8959843,//index.zip解压后的大小
“identifier”: “sampleData_2016-08-01T12:00:00.000+08:00_2016-08-01T13:00:00.000+08:00_2016-08-01T13:17:06.199+08:00”
}
```

当historical节点加载HDFS中的segment到本地存储时,会解压index.zip,解压出00000.smoosh、meta.smoosh、version.bin三个文件。保存在本地路径:

druid.segmentCache.locations / sampleData / startTime_endTime / updateTime / shard

descriptor.json保存在本地路径:

druid.segmentCache.locations / info_dir / sampleData / startTime_endTime / updateTime / shard

druid.segmentCache.locations在historical节点配置中配置。

druid在0.9版本之前,realtime index放在上述路径中,batch(批量)任务会放在:
datasource / datasource / startTime_endTime / updateTime /shard / index.zip

数据索引

Druid是列式存储,每一个列都是单独存储,在查询的过程中只扫描查询所需的列即可。不同的列可以采用不同的压缩方式,也可以关联不同的索引。

Druid的索引是基于每一个分片(即segment)上的。

数据加载

Druid使用的通常情况是,联合使用批量和实时数据流的加载方法。近期的数据通过实时方式处理,离线批量处理来来提高精度。

在一些网络抖动延迟的非正常场景中,实时数据流加载方式有可能出现消息丢失或者消息重复,因此批量再加载方式消除了这种历史数据的潜在错误。或者由于某种原因,需要修改数据,批量再加载方式也提供给你再加载数据的选项。

实时方式

  • Stream push
    • Tranquility+indexing service,数据来自于一个数据流系统,如Kafka、Storm、Spark Streaming。Tranquility:一个发送数据流到Druid的http客户端。
  • Stream pull
    • Realtime Node,直接从外部数据源拉数据流进入Druid。

      离线批处理方式

  • Files
    • Batch Data Ingestion+indexing service,从HDFS、S3、本地文件、或者其他支持批处理的Hadoop文件系统加载数据。例如:利用camus从kafka拉取数据到hdfs,再提交HDFS上新拉取的数据到overlord进行batch indexing。

下图中,上面的indexing service为realtime,下面的为Batch Data Ingestion
image

数据查询

Druid原生的查询方式是通过http发送json,但是社区已经贡献出多种查询库,包括SQL方式的plyql。

Druid被设计为执行单表操作,不支持join操纵(实际上可以做join)。 生产环境需要在ETL阶段进行join,因为数据在加载进Druid之前必须规范化。

参考

druid.io
druidio.cn
lxw的大数据田地–Druid.io实时OLAP数据分析存储系统介绍
萌の宇博客–realtime node与index server区别
zqhxuyuan博客–Druid OLAP架构设计

文章目录
  1. 1. 数据格式
  2. 2. 预聚合roll up
  3. 3. 数据分片
    1. 3.1. 第一级分片-segment
    2. 3.2. 第二级分片-shard
      1. 3.2.1. 基于哈希散列
  4. 4. 数据索引
  5. 5. 数据加载
    1. 5.1. 实时方式
    2. 5.2. 离线批处理方式
  6. 6. 数据查询
  7. 7. 参考
|