Edit File: channel.js
"use strict"; /* * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ Object.defineProperty(exports, "__esModule", { value: true }); exports.ChannelImplementation = void 0; const call_stream_1 = require("./call-stream"); const channel_credentials_1 = require("./channel-credentials"); const resolving_load_balancer_1 = require("./resolving-load-balancer"); const subchannel_pool_1 = require("./subchannel-pool"); const picker_1 = require("./picker"); const constants_1 = require("./constants"); const filter_stack_1 = require("./filter-stack"); const call_credentials_filter_1 = require("./call-credentials-filter"); const deadline_filter_1 = require("./deadline-filter"); const compression_filter_1 = require("./compression-filter"); const resolver_1 = require("./resolver"); const logging_1 = require("./logging"); const max_message_size_filter_1 = require("./max-message-size-filter"); const http_proxy_1 = require("./http_proxy"); const uri_parser_1 = require("./uri-parser"); const connectivity_state_1 = require("./connectivity-state"); const channelz_1 = require("./channelz"); /** * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args */ const MAX_TIMEOUT_TIME = 2147483647; let nextCallNumber = 0; function getNewCallNumber() { const callNumber = nextCallNumber; nextCallNumber += 1; if (nextCallNumber >= Number.MAX_SAFE_INTEGER) { nextCallNumber = 0; } return callNumber; } class ChannelImplementation { constructor(target, credentials, options) { var _a, _b, _c, _d; this.credentials = credentials; this.options = options; this.connectivityState = connectivity_state_1.ConnectivityState.IDLE; this.currentPicker = new picker_1.UnavailablePicker(); /** * Calls queued up to get a call config. Should only be populated before the * first time the resolver returns a result, which includes the ConfigSelector. */ this.configSelectionQueue = []; this.pickQueue = []; this.connectivityStateWatchers = []; this.configSelector = null; /** * This is the error from the name resolver if it failed most recently. It * is only used to end calls that start while there is no config selector * and the name resolver is in backoff, so it should be nulled if * configSelector becomes set or the channel state becomes anything other * than TRANSIENT_FAILURE. */ this.currentResolutionError = null; // Channelz info this.channelzEnabled = true; this.callTracker = new channelz_1.ChannelzCallTracker(); this.childrenTracker = new channelz_1.ChannelzChildrenTracker(); if (typeof target !== 'string') { throw new TypeError('Channel target must be a string'); } if (!(credentials instanceof channel_credentials_1.ChannelCredentials)) { throw new TypeError('Channel credentials must be a ChannelCredentials object'); } if (options) { if (typeof options !== 'object') { throw new TypeError('Channel options must be an object'); } } this.originalTarget = target; const originalTargetUri = uri_parser_1.parseUri(target); if (originalTargetUri === null) { throw new Error(`Could not parse target name "${target}"`); } /* This ensures that the target has a scheme that is registered with the * resolver */ const defaultSchemeMapResult = resolver_1.mapUriDefaultScheme(originalTargetUri); if (defaultSchemeMapResult === null) { throw new Error(`Could not find a default scheme for target name "${target}"`); } this.callRefTimer = setInterval(() => { }, MAX_TIMEOUT_TIME); (_b = (_a = this.callRefTimer).unref) === null || _b === void 0 ? void 0 : _b.call(_a); if (this.options['grpc.enable_channelz'] === 0) { this.channelzEnabled = false; } this.channelzTrace = new channelz_1.ChannelzTrace(); this.channelzRef = channelz_1.registerChannelzChannel(target, () => this.getChannelzInfo(), this.channelzEnabled); if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Channel created'); } if (this.options['grpc.default_authority']) { this.defaultAuthority = this.options['grpc.default_authority']; } else { this.defaultAuthority = resolver_1.getDefaultAuthority(defaultSchemeMapResult); } const proxyMapResult = http_proxy_1.mapProxyName(defaultSchemeMapResult, options); this.target = proxyMapResult.target; this.options = Object.assign({}, this.options, proxyMapResult.extraOptions); /* The global boolean parameter to getSubchannelPool has the inverse meaning to what * the grpc.use_local_subchannel_pool channel option means. */ this.subchannelPool = subchannel_pool_1.getSubchannelPool(((_c = options['grpc.use_local_subchannel_pool']) !== null && _c !== void 0 ? _c : 0) === 0); const channelControlHelper = { createSubchannel: (subchannelAddress, subchannelArgs) => { const subchannel = this.subchannelPool.getOrCreateSubchannel(this.target, subchannelAddress, Object.assign({}, this.options, subchannelArgs), this.credentials); if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef()); } return subchannel; }, updateState: (connectivityState, picker) => { this.currentPicker = picker; const queueCopy = this.pickQueue.slice(); this.pickQueue = []; this.callRefTimerUnref(); for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) { this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); } this.updateState(connectivityState); }, requestReresolution: () => { // This should never be called. throw new Error('Resolving load balancer should never call requestReresolution'); }, addChannelzChild: (child) => { if (this.channelzEnabled) { this.childrenTracker.refChild(child); } }, removeChannelzChild: (child) => { if (this.channelzEnabled) { this.childrenTracker.unrefChild(child); } } }; this.resolvingLoadBalancer = new resolving_load_balancer_1.ResolvingLoadBalancer(this.target, channelControlHelper, options, (configSelector) => { if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded'); } this.configSelector = configSelector; this.currentResolutionError = null; /* We process the queue asynchronously to ensure that the corresponding * load balancer update has completed. */ process.nextTick(() => { const localQueue = this.configSelectionQueue; this.configSelectionQueue = []; this.callRefTimerUnref(); for (const { callStream, callMetadata } of localQueue) { this.tryGetConfig(callStream, callMetadata); } this.configSelectionQueue = []; }); }, (status) => { if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"'); } if (this.configSelectionQueue.length > 0) { this.trace('Name resolution failed with calls queued for config selection'); } if (this.configSelector === null) { this.currentResolutionError = status; } const localQueue = this.configSelectionQueue; this.configSelectionQueue = []; this.callRefTimerUnref(); for (const { callStream, callMetadata } of localQueue) { if (callMetadata.getOptions().waitForReady) { this.callRefTimerRef(); this.configSelectionQueue.push({ callStream, callMetadata }); } else { callStream.cancelWithStatus(status.code, status.details); } } }); this.filterStackFactory = new filter_stack_1.FilterStackFactory([ new call_credentials_filter_1.CallCredentialsFilterFactory(this), new deadline_filter_1.DeadlineFilterFactory(this), new max_message_size_filter_1.MaxMessageSizeFilterFactory(this.options), new compression_filter_1.CompressionFilterFactory(this, this.options), ]); this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2)); const error = new Error(); logging_1.trace(constants_1.LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + ((_d = error.stack) === null || _d === void 0 ? void 0 : _d.substring(error.stack.indexOf('\n') + 1))); } getChannelzInfo() { return { target: this.originalTarget, state: this.connectivityState, trace: this.channelzTrace, callTracker: this.callTracker, children: this.childrenTracker.getChildLists() }; } trace(text, verbosityOverride) { logging_1.trace(verbosityOverride !== null && verbosityOverride !== void 0 ? verbosityOverride : constants_1.LogVerbosity.DEBUG, 'channel', '(' + this.channelzRef.id + ') ' + uri_parser_1.uriToString(this.target) + ' ' + text); } callRefTimerRef() { var _a, _b, _c, _d; // If the hasRef function does not exist, always run the code if (!((_b = (_a = this.callRefTimer).hasRef) === null || _b === void 0 ? void 0 : _b.call(_a))) { this.trace('callRefTimer.ref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length); (_d = (_c = this.callRefTimer).ref) === null || _d === void 0 ? void 0 : _d.call(_c); } } callRefTimerUnref() { var _a, _b; // If the hasRef function does not exist, always run the code if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) { this.trace('callRefTimer.unref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length); (_b = (_a = this.callRefTimer).unref) === null || _b === void 0 ? void 0 : _b.call(_a); } } pushPick(callStream, callMetadata, callConfig, dynamicFilters) { this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters }); this.callRefTimerRef(); } /** * Check the picker output for the given call and corresponding metadata, * and take any relevant actions. Should not be called while iterating * over pickQueue. * @param callStream * @param callMetadata */ tryPick(callStream, callMetadata, callConfig, dynamicFilters) { var _a, _b; const pickResult = this.currentPicker.pick({ metadata: callMetadata, extraPickInfo: callConfig.pickInformation, }); const subchannelString = pickResult.subchannel ? '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() : '' + pickResult.subchannel; this.trace('Pick result for call [' + callStream.getCallNumber() + ']: ' + picker_1.PickResultType[pickResult.pickResultType] + ' subchannel: ' + subchannelString + ' status: ' + ((_a = pickResult.status) === null || _a === void 0 ? void 0 : _a.code) + ' ' + ((_b = pickResult.status) === null || _b === void 0 ? void 0 : _b.details)); switch (pickResult.pickResultType) { case picker_1.PickResultType.COMPLETE: if (pickResult.subchannel === null) { callStream.cancelWithStatus(constants_1.Status.UNAVAILABLE, 'Request dropped by load balancing policy'); // End the call with an error } else { /* If the subchannel is not in the READY state, that indicates a bug * somewhere in the load balancer or picker. So, we log an error and * queue the pick to be tried again later. */ if (pickResult.subchannel.getConnectivityState() !== connectivity_state_1.ConnectivityState.READY) { logging_1.log(constants_1.LogVerbosity.ERROR, 'Error: COMPLETE pick result subchannel ' + subchannelString + ' has state ' + connectivity_state_1.ConnectivityState[pickResult.subchannel.getConnectivityState()]); this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); break; } /* We need to clone the callMetadata here because the transparent * retry code in the promise resolution handler use the same * callMetadata object, so it needs to stay unmodified */ callStream.filterStack .sendMetadata(Promise.resolve(callMetadata.clone())) .then((finalMetadata) => { var _a, _b, _c; const subchannelState = pickResult.subchannel.getConnectivityState(); if (subchannelState === connectivity_state_1.ConnectivityState.READY) { try { const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream)); (_a = pickResult.subchannel) === null || _a === void 0 ? void 0 : _a.getRealSubchannel().startCallStream(finalMetadata, callStream, [...dynamicFilters, ...pickExtraFilters]); /* If we reach this point, the call stream has started * successfully */ (_b = callConfig.onCommitted) === null || _b === void 0 ? void 0 : _b.call(callConfig); (_c = pickResult.onCallStarted) === null || _c === void 0 ? void 0 : _c.call(pickResult); } catch (error) { const errorCode = error.code; if (errorCode === 'ERR_HTTP2_GOAWAY_SESSION' || errorCode === 'ERR_HTTP2_INVALID_SESSION') { /* An error here indicates that something went wrong with * the picked subchannel's http2 stream right before we * tried to start the stream. We are handling a promise * result here, so this is asynchronous with respect to the * original tryPick call, so calling it again is not * recursive. We call tryPick immediately instead of * queueing this pick again because handling the queue is * triggered by state changes, and we want to immediately * check if the state has already changed since the * previous tryPick call. We do this instead of cancelling * the stream because the correct behavior may be * re-queueing instead, based on the logic in the rest of * tryPick */ this.trace('Failed to start call on picked subchannel ' + subchannelString + ' with error ' + error.message + '. Retrying pick', constants_1.LogVerbosity.INFO); this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); } else { this.trace('Failed to start call on picked subchanel ' + subchannelString + ' with error ' + error.message + '. Ending call', constants_1.LogVerbosity.INFO); callStream.cancelWithStatus(constants_1.Status.INTERNAL, `Failed to start HTTP/2 stream with error: ${error.message}`); } } } else { /* The logic for doing this here is the same as in the catch * block above */ this.trace('Picked subchannel ' + subchannelString + ' has state ' + connectivity_state_1.ConnectivityState[subchannelState] + ' after metadata filters. Retrying pick', constants_1.LogVerbosity.INFO); this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); } }, (error) => { // We assume the error code isn't 0 (Status.OK) callStream.cancelWithStatus(typeof error.code === 'number' ? error.code : constants_1.Status.UNKNOWN, `Getting metadata from plugin failed with error: ${error.message}`); }); } break; case picker_1.PickResultType.QUEUE: this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); break; case picker_1.PickResultType.TRANSIENT_FAILURE: if (callMetadata.getOptions().waitForReady) { this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); } else { callStream.cancelWithStatus(pickResult.status.code, pickResult.status.details); } break; case picker_1.PickResultType.DROP: callStream.cancelWithStatus(pickResult.status.code, pickResult.status.details); break; default: throw new Error(`Invalid state: unknown pickResultType ${pickResult.pickResultType}`); } } removeConnectivityStateWatcher(watcherObject) { const watcherIndex = this.connectivityStateWatchers.findIndex((value) => value === watcherObject); if (watcherIndex >= 0) { this.connectivityStateWatchers.splice(watcherIndex, 1); } } updateState(newState) { logging_1.trace(constants_1.LogVerbosity.DEBUG, 'connectivity_state', '(' + this.channelzRef.id + ') ' + uri_parser_1.uriToString(this.target) + ' ' + connectivity_state_1.ConnectivityState[this.connectivityState] + ' -> ' + connectivity_state_1.ConnectivityState[newState]); if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', connectivity_state_1.ConnectivityState[this.connectivityState] + ' -> ' + connectivity_state_1.ConnectivityState[newState]); } this.connectivityState = newState; const watchersCopy = this.connectivityStateWatchers.slice(); for (const watcherObject of watchersCopy) { if (newState !== watcherObject.currentState) { if (watcherObject.timer) { clearTimeout(watcherObject.timer); } this.removeConnectivityStateWatcher(watcherObject); watcherObject.callback(); } } if (newState !== connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) { this.currentResolutionError = null; } } tryGetConfig(stream, metadata) { if (stream.getStatus() !== null) { /* If the stream has a status, it has already finished and we don't need * to take any more actions on it. */ return; } if (this.configSelector === null) { /* This branch will only be taken at the beginning of the channel's life, * before the resolver ever returns a result. So, the * ResolvingLoadBalancer may be idle and if so it needs to be kicked * because it now has a pending request. */ this.resolvingLoadBalancer.exitIdle(); if (this.currentResolutionError && !metadata.getOptions().waitForReady) { stream.cancelWithStatus(this.currentResolutionError.code, this.currentResolutionError.details); } else { this.configSelectionQueue.push({ callStream: stream, callMetadata: metadata, }); this.callRefTimerRef(); } } else { const callConfig = this.configSelector(stream.getMethod(), metadata); if (callConfig.status === constants_1.Status.OK) { if (callConfig.methodConfig.timeout) { const deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + callConfig.methodConfig.timeout.seconds); deadline.setMilliseconds(deadline.getMilliseconds() + callConfig.methodConfig.timeout.nanos / 1000000); stream.setConfigDeadline(deadline); // Refreshing the filters makes the deadline filter pick up the new deadline stream.filterStack.refresh(); } if (callConfig.dynamicFilterFactories.length > 0) { /* These dynamicFilters are the mechanism for implementing gRFC A39: * https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md * We run them here instead of with the rest of the filters because * that spec says "the xDS HTTP filters will run in between name * resolution and load balancing". * * We use the filter stack here to simplify the multi-filter async * waterfall logic, but we pass along the underlying list of filters * to avoid having nested filter stacks when combining it with the * original filter stack. We do not pass along the original filter * factory list because these filters may need to persist data * between sending headers and other operations. */ const dynamicFilterStackFactory = new filter_stack_1.FilterStackFactory(callConfig.dynamicFilterFactories); const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream); dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => { this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters()); }); } else { this.tryPick(stream, metadata, callConfig, []); } } else { stream.cancelWithStatus(callConfig.status, 'Failed to route call to method ' + stream.getMethod()); } } } _startCallStream(stream, metadata) { this.tryGetConfig(stream, metadata.clone()); } close() { this.resolvingLoadBalancer.destroy(); this.updateState(connectivity_state_1.ConnectivityState.SHUTDOWN); clearInterval(this.callRefTimer); if (this.channelzEnabled) { channelz_1.unregisterChannelzRef(this.channelzRef); } this.subchannelPool.unrefUnusedSubchannels(); } getTarget() { return uri_parser_1.uriToString(this.target); } getConnectivityState(tryToConnect) { const connectivityState = this.connectivityState; if (tryToConnect) { this.resolvingLoadBalancer.exitIdle(); } return connectivityState; } watchConnectivityState(currentState, deadline, callback) { if (this.connectivityState === connectivity_state_1.ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); } let timer = null; if (deadline !== Infinity) { const deadlineDate = deadline instanceof Date ? deadline : new Date(deadline); const now = new Date(); if (deadline === -Infinity || deadlineDate <= now) { process.nextTick(callback, new Error('Deadline passed without connectivity state change')); return; } timer = setTimeout(() => { this.removeConnectivityStateWatcher(watcherObject); callback(new Error('Deadline passed without connectivity state change')); }, deadlineDate.getTime() - now.getTime()); } const watcherObject = { currentState, callback, timer, }; this.connectivityStateWatchers.push(watcherObject); } /** * Get the channelz reference object for this channel. The returned value is * garbage if channelz is disabled for this channel. * @returns */ getChannelzRef() { return this.channelzRef; } createCall(method, deadline, host, parentCall, propagateFlags) { if (typeof method !== 'string') { throw new TypeError('Channel#createCall: method must be a string'); } if (!(typeof deadline === 'number' || deadline instanceof Date)) { throw new TypeError('Channel#createCall: deadline must be a number or Date'); } if (this.connectivityState === connectivity_state_1.ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); } const callNumber = getNewCallNumber(); this.trace('createCall [' + callNumber + '] method="' + method + '", deadline=' + deadline); const finalOptions = { deadline: deadline, flags: propagateFlags !== null && propagateFlags !== void 0 ? propagateFlags : constants_1.Propagate.DEFAULTS, host: host !== null && host !== void 0 ? host : this.defaultAuthority, parentCall: parentCall, }; const stream = new call_stream_1.Http2CallStream(method, this, finalOptions, this.filterStackFactory, this.credentials._getCallCredentials(), callNumber); if (this.channelzEnabled) { this.callTracker.addCallStarted(); stream.addStatusWatcher(status => { if (status.code === constants_1.Status.OK) { this.callTracker.addCallSucceeded(); } else { this.callTracker.addCallFailed(); } }); } return stream; } } exports.ChannelImplementation = ChannelImplementation; //# sourceMappingURL=channel.js.map
Back to File Manager