A look at Elasticsearch's RoutingService
Published: 2019-06-22

This post takes a look at Elasticsearch's RoutingService.

RoutingService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

public class RoutingService extends AbstractLifecycleComponent {
    private static final Logger logger = LogManager.getLogger(RoutingService.class);

    private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";

    private final ClusterService clusterService;
    private final AllocationService allocationService;

    private AtomicBoolean rerouting = new AtomicBoolean();

    @Inject
    public RoutingService(ClusterService clusterService, AllocationService allocationService) {
        this.clusterService = clusterService;
        this.allocationService = allocationService;
    }

    @Override
    protected void doStart() {
    }

    @Override
    protected void doStop() {
    }

    @Override
    protected void doClose() {
    }

    /**
     * Initiates a reroute.
     */
    public final void reroute(String reason) {
        try {
            if (lifecycle.stopped()) {
                return;
            }
            if (rerouting.compareAndSet(false, true) == false) {
                logger.trace("already has pending reroute, ignoring {}", reason);
                return;
            }
            logger.trace("rerouting {}", reason);
            clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
                new ClusterStateUpdateTask(Priority.HIGH) {
                    @Override
                    public ClusterState execute(ClusterState currentState) {
                        rerouting.set(false);
                        return allocationService.reroute(currentState, reason);
                    }

                    @Override
                    public void onNoLongerMaster(String source) {
                        rerouting.set(false);
                        // no biggie
                    }

                    @Override
                    public void onFailure(String source, Exception e) {
                        rerouting.set(false);
                        ClusterState state = clusterService.state();
                        if (logger.isTraceEnabled()) {
                            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}",
                                source, state), e);
                        } else {
                            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",
                                source, state.version()), e);
                        }
                    }
                });
        } catch (Exception e) {
            rerouting.set(false);
            ClusterState state = clusterService.state();
            logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
        }
    }
}
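  • RoutingService's constructor takes a ClusterService and an AllocationService. Its reroute method submits a ClusterStateUpdateTask with Priority.HIGH to the ClusterService, and the task's execute method delegates to allocationService.reroute. The rerouting AtomicBoolean keeps at most one reroute task pending: a call that loses the compareAndSet returns without submitting anything, and the flag is reset in execute, onNoLongerMaster, onFailure, and the outer catch block.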
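
The compareAndSet guard in reroute can be tried out in isolation. The following is only a minimal sketch of that pattern, not Elasticsearch code: RerouteCoalescer, its taskQueue (standing in for the cluster state update queue), drain, and the executed counter are all hypothetical names made up for the illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RoutingService; none of these names are Elasticsearch APIs.
class RerouteCoalescer {
    private final AtomicBoolean pending = new AtomicBoolean();
    // Stands in for the queue of cluster state update tasks.
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    final AtomicInteger executed = new AtomicInteger();

    /** Returns true if a task was queued, false if one was already pending. */
    boolean reroute(String reason) {
        if (pending.compareAndSet(false, true) == false) {
            return false; // a reroute is already queued, so this request is ignored
        }
        taskQueue.add(() -> {
            pending.set(false); // reset first: later requests queue a fresh task
            executed.incrementAndGet();
        });
        return true;
    }

    /** Runs every queued task, as a single update thread would. */
    void drain() {
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) {
        RerouteCoalescer c = new RerouteCoalescer();
        System.out.println(c.reroute("node joined"));   // true
        System.out.println(c.reroute("shard started")); // false
        c.drain();
        System.out.println(c.executed.get());           // 1
        System.out.println(c.reroute("node left"));     // true
    }
}
```

Two requests that arrive before the queued task runs collapse into a single execution. That is presumably acceptable here because the task recomputes allocation from the current state when it runs, so the ignored request loses nothing.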

AllocationService.reroute

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

public class AllocationService {

    //......

    /**
     * Reroutes the routing table based on the live nodes.
     * <p>
     * If the same instance of ClusterState is returned, then no change has been made.
     */
    public ClusterState reroute(ClusterState clusterState, String reason) {
        return reroute(clusterState, reason, false);
    }

    /**
     * Reroutes the routing table based on the live nodes.
     * <p>
     * If the same instance of ClusterState is returned, then no change has been made.
     */
    protected ClusterState reroute(ClusterState clusterState, String reason, boolean debug) {
        ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);

        RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
        // shuffle the unassigned nodes, just so we won't have things like poison failed shards
        routingNodes.unassigned().shuffle();
        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
            clusterInfoService.getClusterInfo(), currentNanoTime());
        allocation.debugDecision(debug);
        reroute(allocation);
        if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
            return clusterState;
        }
        return buildResultAndLogHealthChange(clusterState, allocation, reason);
    }

    private void reroute(RoutingAllocation allocation) {
        assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
        assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
            "auto-expand replicas out of sync with number of nodes in the cluster";

        // now allocate all the unassigned to available nodes
        if (allocation.routingNodes().unassigned().size() > 0) {
            removeDelayMarkers(allocation);
            gatewayAllocator.allocateUnassigned(allocation);
        }

        shardsAllocator.allocate(allocation);
        assert RoutingNodes.assertShardStats(allocation.routingNodes());
    }

    //......
}

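  • AllocationService's reroute method builds a RoutingAllocation, calls gatewayAllocator.allocateUnassigned if there are unassigned shards, and then calls shardsAllocator.allocate(allocation). If nothing changed, it returns the same ClusterState instance it was given.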
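
The Javadoc above states that returning the same ClusterState instance means no change was made. The sketch below shows that convention on a toy immutable state, so a caller can detect a no-op with a reference comparison. ToyState and ToyAllocation are hypothetical names, and the "assign everything" step is an assumption that merely stands in for the real allocation logic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable state; not an Elasticsearch class.
final class ToyState {
    final List<String> unassigned;
    final List<String> assigned;

    ToyState(List<String> unassigned, List<String> assigned) {
        this.unassigned = Collections.unmodifiableList(new ArrayList<>(unassigned));
        this.assigned = Collections.unmodifiableList(new ArrayList<>(assigned));
    }
}

class ToyAllocation {
    /** Returns the SAME instance when there is nothing to do, a new one otherwise. */
    static ToyState reroute(ToyState current) {
        if (current.unassigned.isEmpty()) {
            return current; // no change: hand back the input itself
        }
        // Assumption for the sketch: every unassigned shard can simply be assigned.
        List<String> assigned = new ArrayList<>(current.assigned);
        assigned.addAll(current.unassigned);
        return new ToyState(Collections.emptyList(), assigned);
    }

    public static void main(String[] args) {
        List<String> none = Collections.emptyList();
        ToyState s0 = new ToyState(List.of("shard-0", "shard-1"), none);
        ToyState s1 = reroute(s0);
        ToyState s2 = reroute(s1);
        System.out.println(s1 == s0); // false: two shards were assigned
        System.out.println(s2 == s1); // true: nothing left to do
    }
}
```

The benefit of the identity convention is that a caller can skip follow-up work with `next == current` instead of running a deep comparison of two states.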

BalancedShardsAllocator.allocate

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

public class BalancedShardsAllocator implements ShardsAllocator {

    //......

    public void allocate(RoutingAllocation allocation) {
        if (allocation.routingNodes().size() == 0) {
            /* with no nodes this is pointless */
            return;
        }
        final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
        balancer.allocateUnassigned();
        balancer.moveShards();
        balancer.balance();
    }

    //......
}
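  • BalancedShardsAllocator's allocate method returns immediately when there are no nodes. Otherwise it creates a Balancer and calls its allocateUnassigned(), moveShards(), and balance() methods, in that order.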
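
To get a feel for why the three Balancer calls run in that order, here is a toy model. It is a sketch under loud assumptions and not the real Balancer: plain shard count replaces the weight function, an "excluded" set replaces the allocation deciders, and every name (ToyBalancer, lightest, heaviest, excluded) is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: shard count stands in for the weight function; all names are hypothetical.
class ToyBalancer {
    final Map<String, List<String>> nodes = new LinkedHashMap<>(); // node -> shards it holds
    final List<String> unassigned;
    final Set<String> excluded; // nodes that may no longer hold shards
    final int threshold;        // tolerated difference in shard count, at least 1

    ToyBalancer(Map<String, List<String>> initial, List<String> unassigned,
                Set<String> excluded, int threshold) {
        initial.forEach((node, shards) -> nodes.put(node, new ArrayList<>(shards)));
        this.unassigned = new ArrayList<>(unassigned);
        this.excluded = excluded;
        this.threshold = Math.max(1, threshold); // below 1, balance() could swap shards forever
    }

    // Allowed node with the fewest (sign = 1) or the most (sign = -1) shards, or null.
    private String pick(int sign) {
        String best = null;
        for (String n : nodes.keySet()) {
            if (excluded.contains(n)) continue;
            if (best == null || sign * nodes.get(n).size() < sign * nodes.get(best).size()) best = n;
        }
        return best;
    }

    private String lightest() { return pick(1); }
    private String heaviest() { return pick(-1); }

    // Step 1: place each unassigned shard on the least-loaded allowed node.
    void allocateUnassigned() {
        String target;
        while (!unassigned.isEmpty() && (target = lightest()) != null) {
            nodes.get(target).add(unassigned.remove(0));
        }
    }

    // Step 2: relocate shards off nodes that may no longer hold them.
    void moveShards() {
        for (String n : excluded) {
            List<String> shards = nodes.get(n);
            String target;
            while (shards != null && !shards.isEmpty() && (target = lightest()) != null) {
                nodes.get(target).add(shards.remove(0));
            }
        }
    }

    // Step 3: even out the load until the spread is within the threshold.
    void balance() {
        while (true) {
            String hi = heaviest(), lo = lightest();
            if (hi == null || nodes.get(hi).size() - nodes.get(lo).size() <= threshold) return;
            List<String> from = nodes.get(hi);
            nodes.get(lo).add(from.remove(from.size() - 1));
        }
    }

    void allocate() {
        allocateUnassigned();
        moveShards();
        balance();
    }
}
```

In this toy version, running balance() last means it evens out whatever the first two steps produced, including shards that just landed on a node because they had to leave another one.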

Summary

  • RoutingService's constructor takes a ClusterService and an AllocationService. Its reroute method submits a ClusterStateUpdateTask to the ClusterService, and the task's execute method delegates to allocationService.reroute.
  • AllocationService's reroute method builds a RoutingAllocation, then calls gatewayAllocator.allocateUnassigned (when there are unassigned shards) and shardsAllocator.allocate(allocation).
  • BalancedShardsAllocator's allocate method creates a Balancer and calls its allocateUnassigned(), moveShards(), and balance() methods.

doc

Reposted from: https://juejin.im/post/5cdabbd3e51d453af7192b94
