Elasticsearch Source Code Analysis: Document Indexing


The main improvements in Elasticsearch 5.0 are the upgrade to Lucene 6.0 and the use of the plugin mechanism to split Netty out of core and hook it in through NetworkModule; the code lives in Netty3Plugin:

public void onModule(NetworkModule networkModule) {
    if (networkModule.canRegisterHttpExtensions()) {
        networkModule.registerHttpTransport(NETTY_HTTP_TRANSPORT_NAME, Netty3HttpServerTransport.class);
    }
    networkModule.registerTransport(NETTY_TRANSPORT_NAME, Netty3Transport.class);
}

A client calls the Elasticsearch index API through the REST API. RestIndexAction.handleRequest handles the document-indexing request and hands it to TransportReplicationAction, whose ReroutePhase resolves the target shard; see the shard-id algorithm in OperationRouting:

static int generateShardId(IndexMetaData indexMetaData, String id, @Nullable String routing) {
        final int hash;
        if (routing == null) {
            hash = Murmur3HashFunction.hash(id);
        } else {
            hash = Murmur3HashFunction.hash(routing);
        }
        // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
        // of original index to hash documents
        return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
    }
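To make the routing formula concrete, here is a minimal self-contained sketch. The class name and the stand-in hash are illustrative (the real implementation uses Murmur3HashFunction); the point is the arithmetic: an index shrunk from 8 routing shards down to 2 has routingFactor = 8 / 2 = 4, so every document still hashes over the original 8 slots before being folded into one of the 2 surviving shards.

```java
// Illustrative sketch, NOT the actual ES code: demonstrates how
// floorMod over routingNumShards divided by routingFactor maps a
// document id to a shard, including the shrunk-index case.
public class ShardIdDemo {

    // Stand-in hash; the real code uses Murmur3HashFunction.hash(id).
    static int hash(String s) {
        return s.hashCode();
    }

    static int generateShardId(String id, int routingNumShards, int routingFactor) {
        // floorMod keeps the slot non-negative even for negative hashes
        return Math.floorMod(hash(id), routingNumShards) / routingFactor;
    }

    public static void main(String[] args) {
        // Index shrunk from 8 routing shards to 2 real shards:
        // routingFactor = routingNumShards / numberOfShards = 8 / 2 = 4
        int routingNumShards = 8, routingFactor = 4;
        for (String id : new String[] {"doc-1", "doc-2", "doc-3"}) {
            int shard = generateShardId(id, routingNumShards, routingFactor);
            System.out.println(id + " -> shard " + shard);
            // result is always within [0, numberOfShards)
            assert shard >= 0 && shard < routingNumShards / routingFactor;
        }
    }
}
```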

Once the primary shard is located, the request is either handled locally or forwarded via Netty to the node that holds it:

if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
    performLocalAction(state, primary, node);
} else {
    performRemoteAction(state, primary, node);
}

TransportReplicationAction completes message handling in AsyncPrimaryAction's onResponse, which invokes ReplicationOperation's execute method to carry out the indexing operation on the primary shard and the replica shards:

public void execute() throws Exception {
        final String writeConsistencyFailure = checkWriteConsistency ? checkWriteConsistency() : null;
        final ShardRouting primaryRouting = primary.routingEntry();
        final ShardId primaryId = primaryRouting.shardId();
        if (writeConsistencyFailure != null) {
            finishAsFailed(new UnavailableShardsException(primaryId,
                "{} Timeout: [{}], request: [{}]", writeConsistencyFailure, request.timeout(), request));
            return;
        }

        totalShards.incrementAndGet();
        pendingShards.incrementAndGet();
        primaryResult = primary.perform(request);
        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
        assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
        }

        performOnReplicas(primaryId, replicaRequest);

        successfulShards.incrementAndGet();
        decPendingAndFinishIfNeeded();
    }
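The counter pattern in execute() above (pendingShards/successfulShards plus decPendingAndFinishIfNeeded) can be sketched as follows. The class and method names are illustrative stand-ins, not the actual ES types: the primary counts itself and each replica copy as pending, and the whole operation only finishes when the pending counter drops to zero.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of ReplicationOperation's pending-shards
// bookkeeping; not the real ES class.
public class PendingShardsDemo {
    private final AtomicInteger pendingShards = new AtomicInteger();
    private final AtomicInteger successfulShards = new AtomicInteger();
    private volatile boolean finished = false;

    void execute(int replicaCount) {
        pendingShards.incrementAndGet();          // the primary itself
        // ... perform the operation on the primary ...
        for (int i = 0; i < replicaCount; i++) {
            pendingShards.incrementAndGet();      // one per replica copy
            // In the real code the replica request is sent asynchronously
            // and its response listener calls back; simulated inline here.
            onReplicaDone();
        }
        successfulShards.incrementAndGet();       // primary succeeded
        decPendingAndFinishIfNeeded();
    }

    void onReplicaDone() {
        successfulShards.incrementAndGet();
        decPendingAndFinishIfNeeded();
    }

    private void decPendingAndFinishIfNeeded() {
        if (pendingShards.decrementAndGet() == 0) {
            finished = true;                      // respond to the client here
        }
    }

    public boolean isFinished() { return finished; }
    public int successCount() { return successfulShards.get(); }
}
```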

On the primary shard, the handling path calls TransportWriteAction's shardOperationOnPrimary, which in turn calls TransportIndexAction's executeIndexRequestOnPrimary:

public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,
            MappingUpdatedAction mappingUpdatedAction) throws Exception {
        Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard);
        Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
        final ShardId shardId = indexShard.shardId();
        if (update != null) {
            mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
            operation = prepareIndexOperationOnPrimary(request, indexShard);
            update = operation.parsedDoc().dynamicMappingsUpdate();
            if (update != null) {
                throw new ReplicationOperation.RetryOnPrimaryException(shardId,
                    "Dynamic mappings are not available on the node that holds the primary yet");
            }
        }
        final boolean created = indexShard.index(operation);

        // update the version on request so it will happen on the replicas
        final long version = operation.version();
        request.version(version);
        request.versionType(request.versionType().versionTypeForReplicationAndRecovery());

        assert request.versionType().validateVersionForWrites(request.version());

        IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), created);
        return new WriteResult<>(response, operation.getTranslogLocation());
    }
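The two-pass dynamic-mapping handling above can be sketched as a simplified model. The Parser callback and helper method below are hypothetical, not ES APIs: parse once, and if the document introduced new fields, publish the mapping update to the master and parse again; a second miss means the updated cluster state has not reached this node yet, so the operation must be retried.

```java
// Hypothetical sketch of the two-pass dynamic-mapping flow in
// executeIndexRequestOnPrimary; names are illustrative, not ES APIs.
public class DynamicMappingDemo {

    static class ParseResult {
        final boolean needsMappingUpdate;
        ParseResult(boolean needsMappingUpdate) { this.needsMappingUpdate = needsMappingUpdate; }
    }

    interface Parser { ParseResult parse(); }

    static void indexWithDynamicMapping(Parser parser, Runnable updateMappingOnMaster) {
        ParseResult result = parser.parse();
        if (result.needsMappingUpdate) {
            updateMappingOnMaster.run();   // blocking call to the master in the real code
            result = parser.parse();       // re-parse with the refreshed mapping
            if (result.needsMappingUpdate) {
                // mirrors ReplicationOperation.RetryOnPrimaryException
                throw new IllegalStateException("dynamic mappings not available yet, retry");
            }
        }
        // the real code proceeds with indexShard.index(operation) here
    }
}
```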

IndexShard's index method calls InternalEngine's innerIndex to write the Lucene index, and appends the index operation to the transaction log (translog) so that a power failure before flush does not lose indexed data:

private boolean innerIndex(Index index) throws IOException {
        try (Releasable ignored = acquireLock(index.uid())) {
            lastWriteNanos = index.startTime();
            final long currentVersion;
            final boolean deleted;
            final VersionValue versionValue = versionMap.getUnderLock(index.uid());
            if (versionValue == null) {
                currentVersion = loadCurrentVersionFromIndex(index.uid());
                deleted = currentVersion == Versions.NOT_FOUND;
            } else {
                currentVersion = checkDeletedAndGCed(versionValue);
                deleted = versionValue.delete();
            }

            final long expectedVersion = index.version();
            if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) return false;

            final long updatedVersion = updateVersion(index, currentVersion, expectedVersion);

            final boolean created = indexOrUpdate(index, currentVersion, versionValue);

            maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE);

            return created;
        }
    }
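The version handling in innerIndex (versionMap lookup followed by a conflict check) boils down to optimistic concurrency control per document uid. A minimal sketch might look like the following; the class, the MATCH_ANY constant, and the return convention are illustrative, not the Engine implementation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of per-document optimistic version checking;
// not the actual InternalEngine code.
public class VersionCheckDemo {
    // stand-in for the in-memory versionMap backed by Lucene lookups
    private final Map<String, Long> versionMap = new ConcurrentHashMap<>();
    static final long MATCH_ANY = -3L;   // "no expectation", accept any current version

    // Returns the updated version on success, or -1 on a version conflict.
    long index(String uid, long expectedVersion) {
        long currentVersion = versionMap.getOrDefault(uid, 0L);
        if (expectedVersion != MATCH_ANY && expectedVersion != currentVersion) {
            return -1;                   // conflicting concurrent write, reject
        }
        long updatedVersion = currentVersion + 1;
        versionMap.put(uid, updatedVersion);
        return updatedVersion;
    }
}
```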

Note that maybeAddToTranslog does not sync the translog; the sync happens in the postWriteActions part of TransportWriteAction:

boolean fsyncTranslog = indexShard.getTranslogDurability() == Translog.Durability.REQUEST
        && location != null
        && indexShard.getTranslog().sizeInBytes() >= indexShard.indexSettings().getFlushThresholdSize().getBytes();
        if (fsyncTranslog) {
            indexShard.sync(location);
        }

This part contains a custom modification for a workload where concurrent clients send small single-document index requests instead of bulk operations. Rather than enabling the asynchronous Translog.Durability.ASYNC commit mode, it decides whether to sync the log based on the number of translog bytes not yet committed to Lucene (the data buffered since the last flush).
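Under that scheme the fsync decision reduces to a pure predicate. The sketch below mirrors the condition shown above with illustrative names (the enum and method are stand-ins, not the ES API):

```java
// Illustrative sketch of the customized fsync decision: instead of
// fsyncing the translog on every request, sync only once the
// uncommitted translog bytes reach a flush threshold.
public class TranslogSyncDemo {
    enum Durability { REQUEST, ASYNC }

    static boolean shouldFsync(Durability durability, boolean hasLocation,
                               long translogSizeInBytes, long flushThresholdBytes) {
        return durability == Durability.REQUEST
            && hasLocation
            && translogSizeInBytes >= flushThresholdBytes;
    }
}
```

This trades a small durability window (data buffered below the threshold is only in the OS page cache) for far fewer fsync calls under many small concurrent requests.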