序
本文主要研究一下elasticsearch的RoutingService
RoutingService
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
public class RoutingService extends AbstractLifecycleComponent { private static final Logger logger = LogManager.getLogger(RoutingService.class); private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute"; private final ClusterService clusterService; private final AllocationService allocationService; private AtomicBoolean rerouting = new AtomicBoolean(); @Inject public RoutingService(ClusterService clusterService, AllocationService allocationService) { this.clusterService = clusterService; this.allocationService = allocationService; } @Override protected void doStart() { } @Override protected void doStop() { } @Override protected void doClose() { } /** * Initiates a reroute. */ public final void reroute(String reason) { try { if (lifecycle.stopped()) { return; } if (rerouting.compareAndSet(false, true) == false) { logger.trace("already has pending reroute, ignoring {}", reason); return; } logger.trace("rerouting {}", reason); clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", new ClusterStateUpdateTask(Priority.HIGH) { @Override public ClusterState execute(ClusterState currentState) { rerouting.set(false); return allocationService.reroute(currentState, reason); } @Override public void onNoLongerMaster(String source) { rerouting.set(false); // no biggie } @Override public void onFailure(String source, Exception e) { rerouting.set(false); ClusterState state = clusterService.state(); if (logger.isTraceEnabled()) { logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}", source, state), e); } else { logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]", source, state.version()), e); } } }); } catch (Exception e) { rerouting.set(false); ClusterState state = clusterService.state(); logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e); } }}复制代码
- RoutingService的构造器要求输入clusterService及allocationService;其reroute方法主要是向clusterService提交ClusterStateUpdateTask,其execute方法是委托给allocationService.reroute
AllocationService.reroute
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
public class AllocationService { //...... /** * Reroutes the routing table based on the live nodes. ** If the same instance of ClusterState is returned, then no change has been made. */ public ClusterState reroute(ClusterState clusterState, String reason) { return reroute(clusterState, reason, false); } /** * Reroutes the routing table based on the live nodes. *
* If the same instance of ClusterState is returned, then no change has been made. */ protected ClusterState reroute(ClusterState clusterState, String reason, boolean debug) { ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState); RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards routingNodes.unassigned().shuffle(); RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState, clusterInfoService.getClusterInfo(), currentNanoTime()); allocation.debugDecision(debug); reroute(allocation); if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) { return clusterState; } return buildResultAndLogHealthChange(clusterState, allocation, reason); } private void reroute(RoutingAllocation allocation) { assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes"; assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() : "auto-expand replicas out of sync with number of nodes in the cluster"; // now allocate all the unassigned to available nodes if (allocation.routingNodes().unassigned().size() > 0) { removeDelayMarkers(allocation); gatewayAllocator.allocateUnassigned(allocation); } shardsAllocator.allocate(allocation); assert RoutingNodes.assertShardStats(allocation.routingNodes()); } //......}复制代码
- AllocationService的reroute方法主要是构建RoutingAllocation,然后在进行gatewayAllocator.allocateUnassigned及shardsAllocator.allocate(allocation)
BalancedShardsAllocator.allocate
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
public class BalancedShardsAllocator implements ShardsAllocator { //...... public void allocate(RoutingAllocation allocation) { if (allocation.routingNodes().size() == 0) { /* with no nodes this is pointless */ return; } final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); balancer.allocateUnassigned(); balancer.moveShards(); balancer.balance(); } //......}复制代码
- BalancedShardsAllocator的allocate方法,则创建Balancer,然后执行Balancer的allocateUnassigned()、moveShards()、balance()方法
小结
- RoutingService的构造器要求输入clusterService及allocationService;其reroute方法主要是向clusterService提交ClusterStateUpdateTask,其execute方法是委托给allocationService.reroute
- AllocationService的reroute方法主要是构建RoutingAllocation,然后在进行gatewayAllocator.allocateUnassigned及shardsAllocator.allocate(allocation)
- BalancedShardsAllocator的allocate方法,则创建Balancer,然后执行Balancer的allocateUnassigned()、moveShards()、balance()方法