druid.io: Coordinator balancer source code analysis

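Every time historical nodes are added to a Druid.io cluster, the data ends up evenly distributed once the balancer has been running for a while. This post walks through the code to explain why. The Druid version used is 0.9.11.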

DruidCoordinatorBalancer.java

```java
public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
{
  @Override
  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
  {
    // referenceTimestamp is a field of this class (declaration not shown in this excerpt)
    final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
    final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
    // The cluster map is keyed by tier; each value holds the servers of that tier,
    // so segments are only moved between servers of the same tier
    for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
        params.getDruidCluster().getCluster().entrySet()) {
      final List<ServerHolder> serverHolderList = Lists.newArrayList(entry.getValue());
      // At most maxSegmentsToMove move attempts per tier in each coordinator run
      for (int iter = 0; iter < maxSegmentsToMove; iter++) {
        // Pick the segment to move
        final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
        // Skip segments that are no longer in the set of available segments
        if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
          // Pick the destination server
          final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
          if (holder != null) {
            // Issue the move
            moveSegment(segmentToMove, holder.getServer(), params);
          }
        }
      }
    }
    return params; // return value simplified in this excerpt
  }

  // Moves a single segment. Arguments: the segment (with its source server), the destination server,
  // and the coordinator parameters
  protected void moveSegment(
      final BalancerSegmentHolder segment,
      final ImmutableDruidServer toServer,
      final DruidCoordinatorRuntimeParams params
  )
  {
    // body omitted in this excerpt
  }
}
```
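
To see why repeated runs of this loop even out the data, here is a toy model. It is a hypothetical sketch, not Druid code, and it makes three simplifications:

* Every pair of segments is assumed to have the same joint cost (1), so a server's cost for an incoming segment is simply the number of other segments it holds. The real cost, covered below, depends on time intervals and data sources.
* Each "server" is reduced to a segment count.
* Moves take effect immediately, whereas real moves go through asynchronous load queues.

`ToyBalancer`, `balance` and `pickSourceServer` are illustrative names.

```java
import java.util.Arrays;
import java.util.Random;

/** Toy model of the balancer loop (hypothetical, not Druid code); each server is just a segment count. */
final class ToyBalancer
{
  /** Runs `runs` coordinator cycles of at most maxSegmentsToMove moves each; mutates counts in place. */
  static void balance(int[] counts, int runs, int maxSegmentsToMove, long seed)
  {
    Random rand = new Random(seed);
    for (int run = 0; run < runs; run++) {
      for (int iter = 0; iter < maxSegmentsToMove; iter++) {
        // Like pickSegmentToMove: uniform over segments, so server s is the source w.p. counts[s] / total
        int from = pickSourceServer(counts, rand);
        if (from < 0) {
          return; // no segments at all
        }
        // Like findNewSegmentHomeBalancer: the lowest cost wins and the current server competes too;
        // with a constant pairwise cost of 1, cost = number of *other* segments on the server
        int best = -1;
        double bestCost = Double.POSITIVE_INFINITY;
        for (int s = 0; s < counts.length; s++) {
          double cost = (s == from) ? counts[s] - 1 : counts[s];
          if (cost < bestCost) { // strict '<': ties keep the earlier server
            bestCost = cost;
            best = s;
          }
        }
        // Apply the move (a no-op in this toy when best == from)
        counts[from]--;
        counts[best]++;
      }
    }
  }

  /** Size-1 reservoir sampling over all segments; returns the server of the pick, or -1 if empty. */
  static int pickSourceServer(int[] counts, Random rand)
  {
    int chosen = -1;
    int numSoFar = 0;
    for (int s = 0; s < counts.length; s++) {
      for (int k = 0; k < counts[s]; k++) {
        if (rand.nextInt(numSoFar + 1) == numSoFar) {
          chosen = s;
        }
        numSoFar++;
      }
    }
    return chosen;
  }

  public static void main(String[] args)
  {
    // Two loaded historicals plus a freshly added, empty one
    int[] counts = {30, 30, 0};
    balance(counts, 100, 5, 7L);
    System.out.println(Arrays.toString(counts));
  }
}
```

Most picks come from the crowded servers, and each such segment moves to the emptiest one. A move happens only when it strictly lowers the cost, so the spread between the fullest and emptiest server never grows. Under the constant-cost assumption, the counts end up differing by at most one.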

Choosing the segment to move: pickSegmentToMove()

Containing class:

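```java
public class RandomBalancerStrategy implements BalancerStrategy
{
  // Sampling helper (declaration assumed; the original excerpt does not show it)
  private final ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();

  @Override
  public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
  {
    // Uniform random pick over all segments on all servers in the list
    return sampler.getRandomBalancerSegmentHolder(serverHolders);
  }
}
```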

getRandomBalancerSegmentHolder

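The sampler iterates over every segment on every server in the list and gives each segment the same probability of being picked, 1/N, where N is the total number of segments. As a result, a server that currently holds more segments is proportionally more likely to be the source of the move.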

```java
public class ReservoirSegmentSampler
{
  public BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
  {
    final Random rand = new Random();
    ServerHolder fromServerHolder = null;
    DataSegment proposalSegment = null;
    int numSoFar = 0;
    for (ServerHolder server : serverHolders) { // every server in the list
      for (DataSegment segment : server.getServer().getSegments().values()) { // every segment on that server
        int randNum = rand.nextInt(numSoFar + 1);
        // w.p. 1 / (numSoFar+1), swap out the server and segment
        if (randNum == numSoFar) { // a later pick replaces the current candidate
          fromServerHolder = server;
          proposalSegment = segment;
        }
        numSoFar++;
      }
    }
    if (fromServerHolder != null) {
      return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
    } else {
      return null; // no segments at all
    }
  }
}
```

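This is reservoir sampling with a reservoir of size one, and it picks every segment with equal probability. The k-th segment visited (counting from 1) replaces the current candidate with probability 1/k.

Take 5 segments in total. The 2nd one is returned only if two things happen:

* it is taken when visited (probability 1/2), and
* the 3rd, 4th and 5th are all passed over (probabilities 2/3, 3/4 and 4/5).

$$\frac{1}{2}\cdot\frac{2}{3}\cdot\frac{3}{4}\cdot\frac{4}{5} = \frac{1}{5}$$

In general, for the i-th of n segments:

$$P(\text{segment } i \text{ is returned}) = \frac{1}{i}\cdot\frac{i}{i+1}\cdot\frac{i+1}{i+2}\cdots\frac{n-1}{n} = \frac{1}{n}$$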
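
A quick way to check the 1/N claim empirically is to run the same size-1 reservoir sampling many times and count how often each server supplies the pick. The sketch below is illustrative, not Druid code. `ReservoirDemo`, `pickServer` and `sourceFrequencies` are hypothetical names, and each server is modelled only by its segment count. With 1, 3 and 6 segments, the frequencies should come out close to 0.1, 0.3 and 0.6, i.e. proportional to segment count.

```java
import java.util.List;
import java.util.Random;

/** Illustrative sketch (not Druid code) of ReservoirSegmentSampler's selection rule. */
final class ReservoirDemo
{
  /** Returns the index of the server whose segment was picked, or -1 if there are no segments. */
  static int pickServer(List<Integer> segmentCounts, Random rand)
  {
    int chosenServer = -1;
    int numSoFar = 0;
    for (int server = 0; server < segmentCounts.size(); server++) {
      for (int s = 0; s < segmentCounts.get(server); s++) {
        // Replace the current candidate with probability 1 / (numSoFar + 1)
        if (rand.nextInt(numSoFar + 1) == numSoFar) {
          chosenServer = server;
        }
        numSoFar++;
      }
    }
    return chosenServer;
  }

  /** Estimates by simulation how often each server is chosen as the source. */
  static double[] sourceFrequencies(List<Integer> segmentCounts, int trials, long seed)
  {
    Random rand = new Random(seed);
    int[] hits = new int[segmentCounts.size()];
    for (int t = 0; t < trials; t++) {
      int server = pickServer(segmentCounts, rand);
      if (server >= 0) {
        hits[server]++;
      }
    }
    double[] freq = new double[hits.length];
    for (int i = 0; i < hits.length; i++) {
      freq[i] = (double) hits[i] / trials;
    }
    return freq;
  }

  public static void main(String[] args)
  {
    double[] freq = sourceFrequencies(List.of(1, 3, 6), 100_000, 42L);
    System.out.printf("%.3f %.3f %.3f%n", freq[0], freq[1], freq[2]);
  }
}
```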

Choosing the destination server: findNewSegmentHomeBalancer()

Containing class:

```java
public class CostBalancerStrategy implements BalancerStrategy
{
  @Override
  public ServerHolder findNewSegmentHomeBalancer(
      DataSegment proposalSegment, List<ServerHolder> serverHolders
  )
  {
    // includeCurrentServer = true: the segment's current server is also a candidate
    return chooseBestServer(proposalSegment, serverHolders, true).rhs;
  }

  // threadCount, log and computeCost are members of this class (not shown in this excerpt)
  protected Pair<Double, ServerHolder> chooseBestServer(
      final DataSegment proposalSegment,
      final Iterable<ServerHolder> serverHolders,
      final boolean includeCurrentServer
  )
  {
    // Start from +Infinity; if every server costs +Infinity, the returned server stays null
    Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount));
    List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList();
    // Compute computeCost(...) for every server in parallel on a thread pool
    for (final ServerHolder server : serverHolders) {
      futures.add(
          service.submit(
              new Callable<Pair<Double, ServerHolder>>()
              {
                @Override
                public Pair<Double, ServerHolder> call() throws Exception
                {
                  return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server);
                }
              }
          )
      );
    }
    final ListenableFuture<List<Pair<Double, ServerHolder>>> resultsFuture = Futures.allAsList(futures);
    try {
      for (Pair<Double, ServerHolder> server : resultsFuture.get()) {
        if (server.lhs < bestServer.lhs) { // keep the lowest cost; ties keep the earlier server
          bestServer = server;
        }
      }
    }
    catch (Exception e) {
      log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
    }
    service.shutdown();
    return bestServer;
  }
}
```
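
The same "compute every cost in parallel, keep the minimum" pattern can be written with plain `java.util.concurrent` instead of Guava's `ListeningExecutorService`. This is a hedged sketch with hypothetical names (`BestServerDemo`, `chooseBest`), not Druid's code.

Like the original, it starts from +Infinity and compares with a strict `<`. Ties therefore keep the earlier candidate, and a candidate costing +Infinity can never win. When every candidate costs +Infinity it returns -1. That mirrors `chooseBestServer` returning a null server, which the balancer's `holder != null` check then skips.

It differs from the original in two ways:

* it rethrows failures, where the original logs an alert and falls back to a null server;
* it shuts the pool down in a `finally` block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.ToDoubleFunction;

/** Illustrative sketch (not Druid code): parallel cost computation followed by an arg-min. */
final class BestServerDemo
{
  /** Returns the index of the cheapest candidate, or -1 if the list is empty or every cost is +Infinity. */
  static <T> int chooseBest(List<T> candidates, ToDoubleFunction<? super T> cost, int threadCount)
  {
    ExecutorService service = Executors.newFixedThreadPool(threadCount);
    try {
      List<Callable<Double>> tasks = new ArrayList<>();
      for (T candidate : candidates) {
        tasks.add(() -> cost.applyAsDouble(candidate));
      }
      // invokeAll waits for every task and returns the futures in task order
      List<Future<Double>> results = service.invokeAll(tasks);
      int best = -1;
      double bestCost = Double.POSITIVE_INFINITY;
      for (int i = 0; i < results.size(); i++) {
        double c = results.get(i).get();
        if (c < bestCost) { // strict '<': ties keep the earlier candidate, +Infinity never wins
          bestCost = c;
          best = i;
        }
      }
      return best;
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while computing costs", e);
    }
    catch (ExecutionException e) {
      throw new IllegalStateException("cost computation failed", e.getCause());
    }
    finally {
      service.shutdown();
    }
  }
}
```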

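How a server's cost is computed

A server's cost for a candidate segment is the sum of the joint costs between that segment and every segment the server already serves or is queued to load. The balancer passes includeCurrentServer = true, so the segment's current server also competes, with the segment itself left out of its sum.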

```java
protected double computeCost(
    final DataSegment proposalSegment, final ServerHolder server, final boolean includeCurrentServer
)
{
  final long proposalSegmentSize = proposalSegment.getSize();
  // The server already serves this segment and the current server is not allowed as a candidate
  if (!includeCurrentServer && server.isServingSegment(proposalSegment)) {
    return Double.POSITIVE_INFINITY;
  }
  // Not enough free space, or the server is already loading this segment
  if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
    return Double.POSITIVE_INFINITY;
  }
  double cost = 0d;
  // Joint costs with the segments this server already serves, excluding the segment itself
  cost += computeJointSegmentsCost(
      proposalSegment,
      Iterables.filter(
          server.getServer().getSegments().values(),
          Predicates.not(Predicates.equalTo(proposalSegment))
      )
  );
  // Plus the joint costs with the segments queued for loading on this server
  cost += computeJointSegmentsCost(proposalSegment, server.getPeon().getSegmentsToLoad());
  return cost;
}
```
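
Stripped of Druid's types, `computeCost` is two +Infinity guards followed by a sum of pairwise costs. The sketch below is hypothetical: `ServerCostDemo`, its `Server` class, and identifying segments by string id are all assumptions, and the pairwise cost is passed in as a function. With a constant pairwise cost of 1, a server's cost is just the number of other segments it serves or is loading.

```java
import java.util.Set;
import java.util.function.ToDoubleBiFunction;

/** Illustrative sketch (not Druid code) of computeCost's structure, using hypothetical simplified types. */
final class ServerCostDemo
{
  /** Hypothetical stand-in for a historical server: free space plus ids of served and queued segments. */
  static final class Server
  {
    final long availableSize;
    final Set<String> serving;
    final Set<String> loading;

    Server(long availableSize, Set<String> serving, Set<String> loading)
    {
      this.availableSize = availableSize;
      this.serving = serving;
      this.loading = loading;
    }
  }

  static double computeCost(
      String segment, long segmentSize, Server server, boolean includeCurrentServer,
      ToDoubleBiFunction<String, String> jointCost)
  {
    if (!includeCurrentServer && server.serving.contains(segment)) {
      return Double.POSITIVE_INFINITY; // current server excluded as a candidate
    }
    if (segmentSize > server.availableSize || server.loading.contains(segment)) {
      return Double.POSITIVE_INFINITY; // does not fit, or is already being loaded
    }
    double cost = 0;
    for (String other : server.serving) {
      if (!other.equals(segment)) { // leave the segment itself out of the sum
        cost += jointCost.applyAsDouble(segment, other);
      }
    }
    for (String other : server.loading) {
      cost += jointCost.applyAsDouble(segment, other);
    }
    return cost;
  }
}
```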

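How the joint cost of two segments is computed

Queries run faster when the data a single query touches is spread across as many servers as possible, so segments that are likely to be queried together should not share a server. The joint cost of two segments scores how likely they are to be queried together. chooseBestServer above picks the server with the lowest total cost, so the segment being moved goes to the server whose segments it is least likely to be queried with.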

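Druid 0.9.0
  • recencyPenalty: recent data is more likely to be hit by the same query
  • dataSourcePenalty: segments of the same data source are more likely to be hit by the same query
  • gapPenalty: segments whose intervals overlap, touch, or lie close together are more likely to be hit by the same query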

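Factors that raise the cost:
  • larger segments (the smaller of the two sizes is used)
  • both segments ending within the last seven days, and the closer to now the higher
  • the same data source
  • intervals that overlap, or a smaller gap between them

Factors that lower the cost:
  • smaller segments
  • segments ending further in the past (no recency penalty once either one ended more than seven days ago)
  • different data sources
  • a larger gap between the intervals (no gap penalty beyond 30 days)

Because each segment moves to the cheapest server, segments that end up together on one server tend to be far apart in time, non-overlapping with wide gaps, and from different data sources.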
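
A worked example of the 0.9.0 formula implemented by computeJointSegmentCosts below, cost = baseCost × recencyPenalty × dataSourcePenalty × gapPenalty. The numbers are illustrative and not from the source: T stands for referenceTimestamp, both segments belong to the same data source, and the smaller segment has size 100.

$$
\begin{aligned}
A &= [T - 12\,\mathrm{h},\ T), \qquad B = [T - 4.5\,\mathrm{d},\ T - 3.5\,\mathrm{d}) \\
\text{baseCost} &= \min(\text{size}_A, \text{size}_B) = 100 \\
\text{recencyPenalty} &= \left(2 - \tfrac{0\,\mathrm{d}}{7\,\mathrm{d}}\right)\left(2 - \tfrac{3.5\,\mathrm{d}}{7\,\mathrm{d}}\right) = 2 \times 1.5 = 3 \\
\text{dataSourcePenalty} &= 2 \\
\text{gapPenalty} &= 2 - \tfrac{3\,\mathrm{d}}{30\,\mathrm{d}} = 1.9 \qquad (\text{gap from } T - 3.5\,\mathrm{d} \text{ to } T - 12\,\mathrm{h}) \\
\text{cost} &= 100 \times 3 \times 2 \times 1.9 = 1140
\end{aligned}
$$

Now suppose the two segments instead came from different data sources, had both ended more than seven days ago, and were at least 30 days apart. Every penalty would then be 1 and the cost would drop to 100.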

```java
// referenceTimestamp, SEVEN_DAYS_IN_MILLIS and THIRTY_DAYS_IN_MILLIS are class members not shown here
public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2)
{
  final Interval gap = segment1.getInterval().gap(segment2.getInterval());
  final double baseCost = Math.min(segment1.getSize(), segment2.getSize()); // the smaller of the two sizes
  double recencyPenalty = 1; // 1 by default; if both segments end within the last 7 days, up to 4, shrinking with age
  double dataSourcePenalty = 1; // 1 by default; 2 if both segments belong to the same data source
  double gapPenalty = 1; // 2 if the intervals overlap or abut; 2 - gap/30d for gaps under 30 days; otherwise 1
  if (segment1.getDataSource().equals(segment2.getDataSource())) {
    dataSourcePenalty = 2;
  }
  double segment1diff = referenceTimestamp - segment1.getInterval().getEndMillis();
  double segment2diff = referenceTimestamp - segment2.getInterval().getEndMillis();
  if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff < SEVEN_DAYS_IN_MILLIS) {
    recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS) * (2 - segment2diff / SEVEN_DAYS_IN_MILLIS);
  }
  // gap is null if the two intervals overlap or are adjacent
  if (gap == null) {
    gapPenalty = 2;
  } else {
    long gapMillis = gap.toDurationMillis();
    if (gapMillis < THIRTY_DAYS_IN_MILLIS) {
      // cast so the penalty falls off linearly rather than being truncated by integer division
      gapPenalty = 2 - (double) gapMillis / THIRTY_DAYS_IN_MILLIS;
    }
  }
  final double cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty;
  return cost;
}
```
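
For experimenting with the 0.9.0 formula outside Druid, here is a dependency-free port. It is a sketch, not the Druid class, and it makes these assumptions:

* `LegacyJointCostDemo` is a hypothetical name.
* Intervals are passed as half-open `[start, end)` epoch-millisecond pairs.
* The 7- and 30-day windows are exact day counts held as doubles.
* Joda's `gap()` call is replaced by an explicit computation that yields 0 for overlapping or abutting intervals, the cases where `gap()` returns null.

```java
/** Illustrative, dependency-free port (not the Druid class) of the 0.9.0 pairwise segment cost. */
final class LegacyJointCostDemo
{
  static final double DAY_IN_MILLIS = 24L * 60 * 60 * 1000;
  static final double SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;   // assumed exact
  static final double THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS; // assumed exact

  static double jointCost(
      long referenceTimestamp,
      long start1, long end1, long size1, String dataSource1,
      long start2, long end2, long size2, String dataSource2)
  {
    final double baseCost = Math.min(size1, size2);
    final double dataSourcePenalty = dataSource1.equals(dataSource2) ? 2 : 1;

    double recencyPenalty = 1;
    final double diff1 = referenceTimestamp - end1;
    final double diff2 = referenceTimestamp - end2;
    if (diff1 < SEVEN_DAYS_IN_MILLIS && diff2 < SEVEN_DAYS_IN_MILLIS) {
      recencyPenalty = (2 - diff1 / SEVEN_DAYS_IN_MILLIS) * (2 - diff2 / SEVEN_DAYS_IN_MILLIS);
    }

    // Distance between the intervals; 0 when they overlap or abut
    final double gapMillis = Math.max(0, Math.max(start1, start2) - Math.min(end1, end2));
    double gapPenalty = 1;
    if (gapMillis == 0) {
      gapPenalty = 2;
    } else if (gapMillis < THIRTY_DAYS_IN_MILLIS) {
      gapPenalty = 2 - gapMillis / THIRTY_DAYS_IN_MILLIS;
    }
    return baseCost * recencyPenalty * dataSourcePenalty * gapPenalty;
  }
}
```

Consider a segment covering the last 12 hours and one covering [T − 4.5 d, T − 3.5 d), both from the same data source, with the smaller size 100. `jointCost` returns 100 × 3 × 2 × 1.9 = 1140 for that pair. Two identical same-source segments ending at T give the maximum multiplier, 4 × 2 × 2 = 16.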
Druid 0.9.1 improvement

https://github.com/druid-io/druid/pull/2972

```java
static double computeJointSegmentsCost(final DataSegment segment, final Iterable<DataSegment> segmentSet)
{
  // Total cost = sum of the pairwise joint costs
  double totalCost = 0;
  for (DataSegment s : segmentSet) {
    totalCost += computeJointSegmentsCost(segment, s);
  }
  return totalCost;
}

public static double computeJointSegmentsCost(final DataSegment segmentA, final DataSegment segmentB)
{
  final Interval intervalA = segmentA.getInterval();
  final Interval intervalB = segmentB.getInterval();
  // Shift time so that A starts at 0, and rescale milliseconds by MILLIS_FACTOR
  final double t0 = intervalA.getStartMillis();
  final double t1 = (intervalA.getEndMillis() - t0) / MILLIS_FACTOR;
  final double start = (intervalB.getStartMillis() - t0) / MILLIS_FACTOR;
  final double end = (intervalB.getEndMillis() - t0) / MILLIS_FACTOR;
  // constant cost-multiplier for segments of the same datasource
  final double multiplier = segmentA.getDataSource().equals(segmentB.getDataSource()) ? 2.0 : 1.0;
  // intervalCost arguments: end of A, start of B, end of B (all relative to the start of A)
  return INV_LAMBDA_SQUARE * intervalCost(t1, start, end) * multiplier;
}
```
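
The constants `MILLIS_FACTOR` and `INV_LAMBDA_SQUARE` are not shown in this excerpt. The sketch below assumes they come from a 24-hour half-life:

* λ = ln 2 / 24 per hour
* `MILLIS_FACTOR` = one hour in milliseconds divided by λ
* `INV_LAMBDA_SQUARE` = 1/λ²

Check the actual constants in `CostBalancerStrategy` before relying on these values. Under this assumption, dividing millisecond offsets by `MILLIS_FACTOR` makes e^{-|x-y|} halve for every 24 hours between two instants. The 1/λ² factor converts the double integral from normalized units back to hours², because ∫∫ e^{-λ|x-y|} dx dy = (1/λ²) ∫∫ e^{-|u-v|} du dv. `DecayDemo` and `weight` are hypothetical names.

```java
/** Illustrative sketch: time normalization under an ASSUMED 24-hour half-life (not confirmed by this excerpt). */
final class DecayDemo
{
  static final double HALF_LIFE_HOURS = 24.0;                  // assumption
  static final double LAMBDA = Math.log(2) / HALF_LIFE_HOURS;  // decay rate per hour
  static final double INV_LAMBDA_SQUARE = 1 / (LAMBDA * LAMBDA);
  static final double MILLIS_IN_HOUR = 3_600_000.0;
  static final double MILLIS_FACTOR = MILLIS_IN_HOUR / LAMBDA; // milliseconds per normalized time unit

  /** Kernel weight e^{-|x-y|} for two instants deltaMillis apart, in normalized units. */
  static double weight(long deltaMillis)
  {
    return Math.exp(-Math.abs(deltaMillis) / MILLIS_FACTOR);
  }
}
```

Note also that, unlike 0.9.0, the 0.9.1 pairwise cost above drops the segment size and the recency penalty. Only the interval geometry and the same-data-source multiplier of 2 remain.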

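The cost function: take two segment intervals X = [x0, x1) and Y = [y0, y1). Time is shifted so that x0 = 0 and rescaled by MILLIS_FACTOR as in the code above. The joint cost is then the double integral of an exponentially decaying kernel over the two intervals:

$$\mathrm{cost}(X, Y) = \int_{x_0}^{x_1} \int_{y_0}^{y_1} e^{-|x-y|}\,dy\,dx$$

computeJointSegmentsCost then multiplies this by INV_LAMBDA_SQUARE, and by 2 when both segments belong to the same data source.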
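
The two closed forms that intervalCost relies on follow directly from this definition (with x0 = 0).

* Disjoint intervals (x1 ≤ y0): every x lies below every y, so |x − y| = y − x and the integral factorizes.
* Two copies of the same interval B of length β: split the square at the diagonal, and the two halves contribute equally.

$$
\begin{aligned}
\text{disjoint } (x_1 \le y_0):\quad
\mathrm{cost}(X, Y) &= \int_0^{x_1} e^{x}\,dx \int_{y_0}^{y_1} e^{-y}\,dy
= (e^{x_1} - 1)(e^{-y_0} - e^{-y_1}) \\
&= (e^{-y_1} - e^{-y_0}) - (e^{x_1 - y_1} - e^{x_1 - y_0}) \\
\text{self-overlap:}\quad
\mathrm{cost}(B, B) &= 2\int_0^{\beta}\int_0^{x} e^{-(x-y)}\,dy\,dx
= 2\int_0^{\beta} (1 - e^{-x})\,dx = 2(\beta + e^{-\beta} - 1)
\end{aligned}
$$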

```java
public static double intervalCost(double x1, double y0, double y1)
{
  if (x1 == 0 || y1 == y0) {
    return 0;
  }
  if (y0 < 0) { // y0 < x0 = 0: swap the two intervals (shifting so X starts at 0) to simplify the cases
    double tmp = x1;
    x1 = y1 - y0;
    y1 = tmp - y0;
    y0 = -y0;
  }
  // After the optional swap, x0 <= y0 always holds
  if (y0 < x1) { // the intervals overlap; two cases
    // X = [ A )[ B )[ C )   or  [ A )[ B )
    // Y =      [   )                 [   )[ C )
    // A is empty if y0 = 0
    // C is empty if y1 = x1
    // cost(X, Y) = cost(A, Y) + cost(B, C) + cost(B, B)
    // cost(A, Y) and cost(B, C) can be calculated using the non-overlapping case
    // which reduces the overlapping case to computing
    // cost(B, B) = \int_0^{\beta} \int_{0}^{\beta} e^{-|x-y|}dxdy
    //            = 2 \cdot (\beta + e^{-\beta} - 1)
    // where \beta is the length of interval B
    final double beta; // b1 - y0, length of interval B
    final double gamma; // c1 - y0, end of interval C measured from y0
    if (y1 <= x1) {
      beta = y1 - y0;
      gamma = x1 - y0;
    } else {
      beta = x1 - y0;
      gamma = y1 - y0;
    }
    return intervalCost(y0, y0, y1) + // cost(A, Y)
           intervalCost(beta, beta, gamma) + // cost(B, C)
           2 * (beta + FastMath.exp(-beta) - 1); // cost(B, B)
  } else { // no overlap
    // cost(X, Y) = \int_0^{x_1} \int_{y_0}^{y_1} e^{-|x-y|} dxdy
    //            = \int_0^{x_1} \int_{y_0}^{y_1} e^{x-y} dxdy
    //            = (e^{-y_1} - e^{-y_0}) - (e^{x_1-y_1} - e^{x_1-y_0})
    final double exy0 = FastMath.exp(x1 - y0);
    final double exy1 = FastMath.exp(x1 - y1);
    final double ey0 = FastMath.exp(0f - y0);
    final double ey1 = FastMath.exp(0f - y1);
    return (ey1 - ey0) - (exy1 - exy0);
  }
}
```
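
To check the case analysis in `intervalCost`, compare it against a brute-force numerical integral. The sketch copies `intervalCost`, using `Math.exp` in place of Commons Math's `FastMath.exp`, and adds a midpoint-rule approximation of the double integral. `IntervalCostDemo` and `numericCost` are hypothetical names. For X = Y = [0, 1) the closed form gives 2(1 + e^{-1} − 1) = 2/e ≈ 0.7358.

```java
/** Illustrative sketch: intervalCost (copied, Math.exp instead of FastMath.exp) plus a numeric cross-check. */
final class IntervalCostDemo
{
  static double intervalCost(double x1, double y0, double y1)
  {
    if (x1 == 0 || y1 == y0) {
      return 0;
    }
    if (y0 < 0) { // swap so that X starts first, shifted to 0
      double tmp = x1;
      x1 = y1 - y0;
      y1 = tmp - y0;
      y0 = -y0;
    }
    if (y0 < x1) { // overlapping: cost(A, Y) + cost(B, C) + cost(B, B)
      final double beta;
      final double gamma;
      if (y1 <= x1) {
        beta = y1 - y0;
        gamma = x1 - y0;
      } else {
        beta = x1 - y0;
        gamma = y1 - y0;
      }
      return intervalCost(y0, y0, y1) + intervalCost(beta, beta, gamma) + 2 * (beta + Math.exp(-beta) - 1);
    }
    // Disjoint intervals: closed form
    return (Math.exp(-y1) - Math.exp(-y0)) - (Math.exp(x1 - y1) - Math.exp(x1 - y0));
  }

  /** Midpoint-rule approximation of the integral of e^{-|x-y|} over [0, x1) x [y0, y1). */
  static double numericCost(double x1, double y0, double y1, int steps)
  {
    final double dx = x1 / steps;
    final double dy = (y1 - y0) / steps;
    double sum = 0;
    for (int i = 0; i < steps; i++) {
      final double x = (i + 0.5) * dx;
      for (int j = 0; j < steps; j++) {
        final double y = y0 + (j + 0.5) * dy;
        sum += Math.exp(-Math.abs(x - y));
      }
    }
    return sum * dx * dy;
  }
}
```

The test cases cover a disjoint pair, Y contained in X, Y starting before X (the swap branch) and a partial overlap.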