druid.io实践2---集群数据迁移


数据迁移指的是两套Druid、kafka、HDFS集群之间的数据迁移。其中会用到流处理框架Samza,Hadoop distcp命令,Druid重建索引的insert-segment-to-db工具。

整体流程

image

背景交代

Druid的DeepStorage采用HDFS。

实时输入流通过tranquility从kafka消费topic进行realtime indexing。
批量数据通过camus拉取kafka数据到hdfs,部分topic使用camus sweep进行去重。周期提交新提交(修改)目录到overloard进行batch indexing。

大体思路

  • kafka镜像流:
    启动samza任务,从老的kafka集群consume消息,produce到新的kafka集群。直接使用kafka的mirror也可以,由于项目使用的arvo编解码,涉及到schema-id的问题,所以只能通过samza任务。
  • druid批量数据获取&index:
    给新druid集群提交任务,camus批量拉取数据,发送post请求提交配置json到overload执行batch index。
  • 脏数据覆盖(批量路线稳定、数据正确之后)
    • 利用hadoop-distcp迁移老hdfs中之前所有的segment数据
    • 利用insert-segment-to-db工具在mysql生成这之前的元数据(索引)。
  • druid实时数据获取&index:
    Tranquility任务,获取实时数据,提交给overload执行realtime index
  • 切换(稳定后)
    将老的kafka的流的生产者逐个切换到新的kafka上

使用工具介绍

Samza Task

可以参考博客中的Samza相关

hadoop-distcp

迁移老hdfs中之前所有的segment数据

介绍

DistCp(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具。 它使用Map/Reduce实现文件分发,错误处理和恢复,以及报告生成。 它把文件和目录的列表作为map任务的输入,每个任务会完成源列表中部分文件的拷贝

https://hadoop.apache.org/docs/r1.0.4/cn/distcp.html

http://hadoop.apache.org/docs/r2.7.3/hadoop-distcp/DistCp.html

http://stackoverflow.com/questions/31862904/how-to-do-i-copy-data-from-one-hdfs-to-another-hdfs

1
2
3
Usage: $ hadoop distcp <src> <dst>
example: $ hadoop distcp hdfs://nn1:8020/file1 hdfs://nn2:8020/file2

默认情况下,如果在拷贝的目的地同名文件已经存在,则会默认跳过这些文件。

可以通过-overwrite选项指定覆盖掉同名文件,或者通过-update选项来更新同名文件。

-overwrite来覆盖刚开启批量任务时的数据不完整那个小时的数据。

insert-segment-to-db

介绍

HDFS上Segment在新的mysql中生成索引

https://groups.google.com/forum/#!searchin/druid-user/migration$20cluster|sort:relevance/druid-user/1dOMkCrQGmE/b8exkaI4CwAJ
https://groups.google.com/forum/#!topic/druid-user/yvnXsDEOkDU
http://druid.io/docs/0.9.1.1/operations/insert-segment-to-db.html

参数说明

–workingDir:segment位置

–updateDescriptor:默认true,如果desciptor.json的实际路径与“loadSpec”中的路径是不同的,该工具将更新“loadSpec”字段的描述符。

例如:将老集群的数据迁移过去之后,desciptor.json中的路径没变,生成索引时会更新成现在的路径

1
2
3
4
5
6
7
8
hadoop fs -cat hdfs://nn1:port/.../dataSource/20161025T060000.000+0800_20161025T070000.000+0800/2016-10-25T07_33_53.379+08_00/0/descriptor.json
其中loadSpec部分:
"loadSpec": {
"type": "hdfs",
"path": "hdfs://nn1:port/.../dataSource/20161025T060000.000+0800_20161025T070000.000+0800/2016-10-25T07_33_53.379+08_00/0/index.zip"
},

具体操作

官方例子

1
2
3
4
5
6
7
8
9
java
-Ddruid.metadata.storage.type=mysql
-Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://mysql-db:3306/druid
-Ddruid.metadata.storage.connector.user=userxxx
-Ddruid.metadata.storage.connector.password=passxxxx
-Ddruid.extensions.loadList=[\"mysql-metadata-storage\",\"druid-hdfs-storage\"]
-Ddruid.storage.type=hdfs
-cp $DRUID_CLASSPATH
io.druid.cli.Main tools insert-segment-to-db --workingDir hdfs://nn1:port/.../dataSource

打印的日志:

1
2
2016-10-25T17:15:49,342 INFO [main] io.druid.storage.hdfs.HdfsDataSegmentFinder - Found segment [dataSource_2016-01-13T09:00:00.000+08:00_2016-01-13T10:00:00.000+08:00_2016-01-13T10:34:03.820+08:00_6] located at [hdfs://nn1:port/.../dataSource/20160113T090000.000+0800_20160113T100000.000+0800/2016-01-13T10_34_03.820+08_00/6/index.zip]
2016-10-25T17:15:49,342 INFO [main] io.druid.storage.hdfs.HdfsDataSegmentFinder - Updating loadSpec in descriptor.json at [hdfs://nn1:port/.../dataSource/20160113T090000.000+0800_20160113T100000.000+0800/2016-01-13T10_34_03.820+08_00/6/descriptor.json] with new path [dataSource/20160113T090000.000+0800_20160113T100000.000+0800/2016-01-13T10_34_03.820+08_00/6/index.zip]

会先更新descriptor.json中的路径:由之前的老集群的绝对路径—新集群的相对路径

错误处理

后面如果出现找不到新的相对路径的错误,可以把hadoop的配置文件core-site.xml放到common文件夹下。

java.io.FileNotFoundException:file /dataSource/20161025T060000.000+0800_20161025T070000.000+0800/2016-10-25T07_33_53.379+08_00/0/index.zip does not exist

文章目录
  1. 1. 整体流程
    1. 1.1. 背景交代
    2. 1.2. 大体思路
  2. 2. 使用工具介绍
    1. 2.1. Samza Task
    2. 2.2. hadoop-distcp
      1. 2.2.1. 介绍
    3. 2.3. insert-segment-to-db
      1. 2.3.1. 介绍
      2. 2.3.2. 参数说明
      3. 2.3.3. 具体操作
      4. 2.3.4. 错误处理
|