-
-
Notifications
You must be signed in to change notification settings - Fork 35.9k
diagnostics_channel: add opt-in subscriber suppression via bypassBy and bypassed() #63651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 23 commits
6a9412f
7b20b8a
a786501
e4aea85
a5c4940
7e415c0
8e6d7cb
07e7e97
9468795
b8194af
66e0cc9
01d9cc0
f9f428a
ddf8179
909f0fc
e630f72
cd5c009
3fdcd8e
716a4eb
baa6ee1
5bc0fb8
1e854d2
f3fc5cf
993bc0d
24a87f4
614bc7e
fcef870
ec9316b
64ac224
805045d
365ab7b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,6 @@ | |
|
|
||
| const { | ||
| ArrayPrototypeAt, | ||
| ArrayPrototypeIndexOf, | ||
| ArrayPrototypePush, | ||
| ArrayPrototypePushApply, | ||
| ArrayPrototypeSlice, | ||
|
|
@@ -15,6 +14,7 @@ const { | |
| ReflectApply, | ||
| SafeFinalizationRegistry, | ||
| SafeMap, | ||
| SafeSet, | ||
| SymbolDispose, | ||
| SymbolHasInstance, | ||
| } = primordials; | ||
|
|
@@ -36,6 +36,33 @@ const { subscribers: subscriberCounts } = dc_binding; | |
| const { WeakReference } = require('internal/util'); | ||
| const { isPromise } = require('internal/util/types'); | ||
|
|
||
| let suppressionStorage; | ||
|
|
||
| function getSuppressionsStorage() { | ||
| if (suppressionStorage === undefined) { | ||
| const { AsyncLocalStorage } = require('async_hooks'); | ||
| suppressionStorage = new AsyncLocalStorage(); | ||
| } | ||
| return suppressionStorage; | ||
| } | ||
|
|
||
| function withSuppressionsContext(set, fn, thisArg, args) { | ||
| return getSuppressionsStorage().run( | ||
| set, | ||
| () => ReflectApply(fn, thisArg, args), | ||
| ); | ||
| } | ||
|
|
||
| function validateBypassKey(value, name) { | ||
| if (value == null) { | ||
| throw new ERR_INVALID_ARG_TYPE(name, ['object', 'symbol'], value); | ||
| } | ||
| const type = typeof value; | ||
| if (type !== 'object' && type !== 'symbol') { | ||
| throw new ERR_INVALID_ARG_TYPE(name, ['object', 'symbol'], value); | ||
| } | ||
| } | ||
|
|
||
| // Can't delete when weakref count reaches 0 as it could increment again. | ||
| // Only GC can be used as a valid time to clean up the channels map. | ||
| class WeakRefMap extends SafeMap { | ||
|
|
@@ -71,16 +98,23 @@ function markActive(channel) { | |
| // eslint-disable-next-line no-use-before-define | ||
| ObjectSetPrototypeOf(channel, ActiveChannel.prototype); | ||
| channel._subscribers = []; | ||
| channel._bypassSubscribers = null; | ||
| channel._stores = new SafeMap(); | ||
| channel._bypassStores = null; | ||
| } | ||
|
|
||
| function maybeMarkInactive(channel) { | ||
| // When there are no more active subscribers or bound, restore to fast prototype. | ||
| if (!channel._subscribers.length && !channel._stores.size) { | ||
| if (!channel._subscribers.length && | ||
| !channel._stores.size && | ||
| !channel._bypassSubscribers?.length && | ||
| !channel._bypassStores?.size) { | ||
| // eslint-disable-next-line no-use-before-define | ||
| ObjectSetPrototypeOf(channel, Channel.prototype); | ||
| channel._subscribers = undefined; | ||
| channel._stores = undefined; | ||
| channel._bypassSubscribers = undefined; | ||
| channel._bypassStores = undefined; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -91,12 +125,11 @@ class RunStoresScope { | |
| // eslint-disable-next-line no-restricted-globals | ||
| using stack = new DisposableStack(); | ||
|
|
||
| // Enter stores using withScope | ||
| // Normal stores - exactly as before, zero extra cost | ||
| if (activeChannel._stores) { | ||
| for (const entry of activeChannel._stores.entries()) { | ||
| const store = entry[0]; | ||
| const transform = entry[1]; | ||
|
|
||
| let newContext = data; | ||
| if (transform) { | ||
| try { | ||
|
|
@@ -108,7 +141,28 @@ class RunStoresScope { | |
| continue; | ||
| } | ||
| } | ||
| stack.use(store.withScope(newContext)); | ||
| } | ||
| } | ||
|
|
||
| // Bypass stores - only entered if bypass stores exist | ||
| if (activeChannel._bypassStores) { | ||
| const activeKeys = getSuppressionsStorage().getStore(); | ||
| for (const entry of activeChannel._bypassStores.entries()) { | ||
| const store = entry[0]; | ||
| const { transform, subscriberId } = entry[1]; | ||
| if (activeKeys?.has(subscriberId)) continue; | ||
| let newContext = data; | ||
| if (transform) { | ||
| try { | ||
| newContext = transform(data); | ||
| } catch (err) { | ||
| process.nextTick(() => { | ||
| triggerUncaughtException(err, false); | ||
| }); | ||
| continue; | ||
| } | ||
| } | ||
| stack.use(store.withScope(newContext)); | ||
| } | ||
| } | ||
|
|
@@ -127,69 +181,158 @@ class RunStoresScope { | |
|
|
||
| // TODO(qard): should there be a C++ channel interface? | ||
| class ActiveChannel { | ||
| subscribe(subscription) { | ||
| subscribe(subscription, options = {}) { | ||
| validateFunction(subscription, 'subscription'); | ||
| this._subscribers = ArrayPrototypeSlice(this._subscribers); | ||
| ArrayPrototypePush(this._subscribers, subscription); | ||
| const subscriberId = options?.subscriberId; | ||
|
|
||
| if (subscriberId !== undefined) { | ||
| validateBypassKey(subscriberId, 'subscriberId'); | ||
| // Bypass path - lazy separate array | ||
| if (this._bypassSubscribers === null) { | ||
| this._bypassSubscribers = []; | ||
| } | ||
| this._bypassSubscribers = ArrayPrototypeSlice(this._bypassSubscribers); | ||
| ArrayPrototypePush(this._bypassSubscribers, { handler: subscription, subscriberId }); | ||
| } else { | ||
| // Normal path - plain function, zero extra cost | ||
| this._subscribers = ArrayPrototypeSlice(this._subscribers); | ||
| ArrayPrototypePush(this._subscribers, subscription); | ||
| } | ||
|
|
||
| channels.incRef(this.name); | ||
| if (this._index !== undefined) subscriberCounts[this._index]++; | ||
| } | ||
|
|
||
| unsubscribe(subscription) { | ||
| const index = ArrayPrototypeIndexOf(this._subscribers, subscription); | ||
| if (index === -1) return false; | ||
| // Check normal subscribers first | ||
| let index = -1; | ||
| for (let i = 0; i < this._subscribers.length; i++) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the reason for moving from
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oopsie , i think i just need to address the bypass subscribers since they only contains that id and handler ok on it |
||
| if (this._subscribers[i] === subscription) { | ||
| index = i; | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| const before = ArrayPrototypeSlice(this._subscribers, 0, index); | ||
| const after = ArrayPrototypeSlice(this._subscribers, index + 1); | ||
| this._subscribers = before; | ||
| ArrayPrototypePushApply(this._subscribers, after); | ||
| if (index !== -1) { | ||
| const before = ArrayPrototypeSlice(this._subscribers, 0, index); | ||
| const after = ArrayPrototypeSlice(this._subscribers, index + 1); | ||
| this._subscribers = before; | ||
| ArrayPrototypePushApply(this._subscribers, after); | ||
| channels.decRef(this.name); | ||
| if (this._index !== undefined) subscriberCounts[this._index]--; | ||
| maybeMarkInactive(this); | ||
| return true; | ||
| } | ||
|
|
||
| channels.decRef(this.name); | ||
| if (this._index !== undefined) subscriberCounts[this._index]--; | ||
| maybeMarkInactive(this); | ||
| // Check bypass subscribers | ||
| if (this._bypassSubscribers !== null) { | ||
| let bypassIndex = -1; | ||
| for (let i = 0; i < this._bypassSubscribers.length; i++) { | ||
| if (this._bypassSubscribers[i].handler === subscription) { | ||
| bypassIndex = i; | ||
| break; | ||
| } | ||
| } | ||
| if (bypassIndex !== -1) { | ||
| const before = ArrayPrototypeSlice(this._bypassSubscribers, 0, bypassIndex); | ||
| const after = ArrayPrototypeSlice(this._bypassSubscribers, bypassIndex + 1); | ||
| this._bypassSubscribers = before; | ||
| ArrayPrototypePushApply(this._bypassSubscribers, after); | ||
| if (this._bypassSubscribers.length === 0) { | ||
| this._bypassSubscribers = null; | ||
| } | ||
| channels.decRef(this.name); | ||
| if (this._index !== undefined) subscriberCounts[this._index]--; | ||
| maybeMarkInactive(this); | ||
|
Flarna marked this conversation as resolved.
Outdated
|
||
| return true; | ||
| } | ||
| } | ||
|
|
||
| return true; | ||
| return false; | ||
| } | ||
|
|
||
| bindStore(store, transform) { | ||
| const replacing = this._stores.has(store); | ||
| if (!replacing) { | ||
| channels.incRef(this.name); | ||
| if (this._index !== undefined) subscriberCounts[this._index]++; | ||
| bindStore(store, transform, options = {}) { | ||
| const subscriberId = options?.subscriberId; | ||
|
|
||
| if (subscriberId !== undefined) { | ||
| validateBypassKey(subscriberId, 'subscriberId'); | ||
| // Bypass path - lazy separate SafeMap | ||
| if (this._bypassStores === null) { | ||
| this._bypassStores = new SafeMap(); | ||
| } | ||
| const replacing = this._bypassStores.has(store); | ||
| if (!replacing) { | ||
| channels.incRef(this.name); | ||
| if (this._index !== undefined) subscriberCounts[this._index]++; | ||
| } | ||
| this._bypassStores.set(store, { transform, subscriberId }); | ||
| } else { | ||
| // Normal path - plain transform, zero extra cost | ||
| const replacing = this._stores.has(store); | ||
| if (!replacing) { | ||
| channels.incRef(this.name); | ||
| if (this._index !== undefined) subscriberCounts[this._index]++; | ||
| } | ||
|
Flarna marked this conversation as resolved.
Outdated
|
||
| this._stores.set(store, transform); | ||
| } | ||
| this._stores.set(store, transform); | ||
| } | ||
|
|
||
| unbindStore(store) { | ||
| if (!this._stores.has(store)) { | ||
| return false; | ||
| if (this._stores.has(store)) { | ||
| this._stores.delete(store); | ||
| channels.decRef(this.name); | ||
| if (this._index !== undefined) subscriberCounts[this._index]--; | ||
| maybeMarkInactive(this); | ||
| return true; | ||
| } | ||
|
|
||
| this._stores.delete(store); | ||
|
|
||
| channels.decRef(this.name); | ||
| if (this._index !== undefined) subscriberCounts[this._index]--; | ||
| maybeMarkInactive(this); | ||
| if (this._bypassStores?.has(store)) { | ||
| this._bypassStores.delete(store); | ||
| if (this._bypassStores.size === 0) { | ||
| this._bypassStores = null; | ||
| } | ||
| channels.decRef(this.name); | ||
| if (this._index !== undefined) subscriberCounts[this._index]--; | ||
| maybeMarkInactive(this); | ||
|
Flarna marked this conversation as resolved.
Outdated
|
||
| return true; | ||
| } | ||
|
|
||
| return true; | ||
| return false; | ||
| } | ||
|
|
||
| get hasSubscribers() { | ||
| return true; | ||
| } | ||
|
|
||
| publish(data) { | ||
| // Normal path - no ALS lookup, plain function call, zero overhead | ||
| const subscribers = this._subscribers; | ||
| for (let i = 0; i < (subscribers?.length || 0); i++) { | ||
| for (let i = 0; i < subscribers.length; i++) { | ||
| try { | ||
| const onMessage = subscribers[i]; | ||
| onMessage(data, this.name); | ||
| subscribers[i](data, this.name); | ||
| } catch (err) { | ||
| process.nextTick(() => { | ||
| triggerUncaughtException(err, false); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| // Bypass path - only entered if bypass subscribers exist | ||
| if (this._bypassSubscribers !== null) { | ||
| const activeKeys = getSuppressionsStorage().getStore(); | ||
| const bypassSubscribers = this._bypassSubscribers; | ||
| for (let i = 0; i < bypassSubscribers.length; i++) { | ||
| try { | ||
| const { handler, subscriberId } = bypassSubscribers[i]; | ||
| if (activeKeys?.has(subscriberId)) continue; | ||
| handler(data, this.name); | ||
| } catch (err) { | ||
| process.nextTick(() => { | ||
| triggerUncaughtException(err, false); | ||
| }); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| withStoreScope(data) { | ||
|
|
@@ -221,18 +364,18 @@ class Channel { | |
| prototype === ActiveChannel.prototype; | ||
| } | ||
|
|
||
| subscribe(subscription) { | ||
| subscribe(subscription, options) { | ||
| markActive(this); | ||
| this.subscribe(subscription); | ||
| this.subscribe(subscription, options); | ||
| } | ||
|
|
||
| unsubscribe() { | ||
| return false; | ||
| } | ||
|
|
||
| bindStore(store, transform) { | ||
| bindStore(store, transform, options) { | ||
| markActive(this); | ||
| this.bindStore(store, transform); | ||
| this.bindStore(store, transform, options); | ||
| } | ||
|
|
||
| unbindStore() { | ||
|
|
@@ -366,12 +509,12 @@ class BoundedChannel { | |
| this.end?.hasSubscribers; | ||
| } | ||
|
|
||
| subscribe(handlers) { | ||
| subscribe(handlers, options) { | ||
| for (let i = 0; i < boundedEvents.length; ++i) { | ||
| const name = boundedEvents[i]; | ||
| if (!handlers[name]) continue; | ||
|
|
||
| this[name]?.subscribe(handlers[name]); | ||
| this[name]?.subscribe(handlers[name], options); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -458,26 +601,26 @@ class TracingChannel { | |
| this.error?.hasSubscribers; | ||
| } | ||
|
|
||
| subscribe(handlers) { | ||
| subscribe(handlers, options) { | ||
| // Subscribe to call window (start/end) | ||
| if (handlers.start || handlers.end) { | ||
| this.#callWindow.subscribe({ | ||
| start: handlers.start, | ||
| end: handlers.end, | ||
| }); | ||
| }, options); | ||
| } | ||
|
|
||
| // Subscribe to continuation window (asyncStart/asyncEnd) | ||
| if (handlers.asyncStart || handlers.asyncEnd) { | ||
| this.#continuationWindow.subscribe({ | ||
| start: handlers.asyncStart, | ||
| end: handlers.asyncEnd, | ||
| }); | ||
| }, options); | ||
| } | ||
|
|
||
| // Subscribe to error channel | ||
| if (handlers.error) { | ||
| this.error.subscribe(handlers.error); | ||
| this.error.subscribe(handlers.error, options); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -633,10 +776,22 @@ function tracingChannel(nameOrChannels) { | |
|
|
||
| dc_binding.linkNativeChannel((name) => channel(name)); | ||
|
|
||
| function suppressed(key, fn, thisArg, ...args) { | ||
| validateFunction(fn, 'fn'); | ||
|
|
||
| validateBypassKey(key, 'key'); | ||
|
|
||
| const currentSet = getSuppressionsStorage().getStore(); | ||
| const next = currentSet ? new SafeSet(currentSet) : new SafeSet(); | ||
| next.add(key); | ||
| return withSuppressionsContext(next, fn, thisArg, args); | ||
| } | ||
|
|
||
| module.exports = { | ||
| channel, | ||
| hasSubscribers, | ||
| subscribe, | ||
| suppressed, | ||
| tracingChannel, | ||
| unsubscribe, | ||
| boundedChannel, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.