Skip to content

Commit 9912a64

Browse files
authored
Merge 67efa0a into 7edf984
2 parents 7edf984 + 67efa0a commit 9912a64

File tree

3 files changed

+160
-12
lines changed

3 files changed

+160
-12
lines changed

plugins/node/opentelemetry-instrumentation-aws-lambda/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
"@opentelemetry/api": "1.0.2",
5050
"@opentelemetry/core": "0.24.0",
5151
"@opentelemetry/node": "0.24.0",
52+
"@opentelemetry/tracing": "^0.24.0",
5253
"@types/mocha": "7.0.2",
5354
"@types/node": "14.17.9",
5455
"codecov": "3.8.3",
@@ -64,7 +65,6 @@
6465
"@opentelemetry/propagator-aws-xray": "^0.25.0",
6566
"@opentelemetry/resources": "^0.24.0",
6667
"@opentelemetry/semantic-conventions": "^0.24.0",
67-
"@opentelemetry/tracing": "^0.24.0",
6868
"@types/aws-lambda": "8.10.81"
6969
}
7070
}

plugins/node/opentelemetry-instrumentation-aws-lambda/src/instrumentation.ts

+31-11
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import {
4444
SemanticAttributes,
4545
SemanticResourceAttributes,
4646
} from '@opentelemetry/semantic-conventions';
47-
import { BasicTracerProvider } from '@opentelemetry/tracing';
4847

4948
import {
5049
APIGatewayProxyEventHeaders,
@@ -73,7 +72,7 @@ const headerGetter: TextMapGetter<APIGatewayProxyEventHeaders> = {
7372
export const traceContextEnvironmentKey = '_X_AMZN_TRACE_ID';
7473

7574
export class AwsLambdaInstrumentation extends InstrumentationBase {
76-
private _tracerProvider: TracerProvider | undefined;
75+
private _forceFlush?: () => Promise<void>;
7776

7877
constructor(protected override _config: AwsLambdaInstrumentationConfig = {}) {
7978
super('@opentelemetry/instrumentation-aws-lambda', VERSION, _config);
@@ -228,7 +227,27 @@ export class AwsLambdaInstrumentation extends InstrumentationBase {
228227

229228
override setTracerProvider(tracerProvider: TracerProvider) {
230229
super.setTracerProvider(tracerProvider);
231-
this._tracerProvider = tracerProvider;
230+
this._forceFlush = this._getForceFlush(tracerProvider);
231+
}
232+
233+
private _getForceFlush(tracerProvider: TracerProvider) {
234+
if (!tracerProvider) return undefined;
235+
236+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
237+
let currentProvider: any = tracerProvider;
238+
239+
if (typeof currentProvider.getDelegate === 'function') {
240+
currentProvider = currentProvider.getDelegate();
241+
}
242+
243+
if (typeof currentProvider.getActiveSpanProcessor === 'function') {
244+
const activeSpanProcessor = currentProvider.getActiveSpanProcessor();
245+
if (typeof activeSpanProcessor.forceFlush === 'function') {
246+
return activeSpanProcessor.forceFlush.bind(activeSpanProcessor);
247+
}
248+
}
249+
250+
return undefined;
232251
}
233252

234253
private _wrapCallback(original: Callback, span: Span): Callback {
@@ -267,15 +286,16 @@ export class AwsLambdaInstrumentation extends InstrumentationBase {
267286
}
268287

269288
span.end();
270-
if (this._tracerProvider instanceof BasicTracerProvider) {
271-
this._tracerProvider
272-
.getActiveSpanProcessor()
273-
.forceFlush()
274-
.then(
275-
() => callback(),
276-
() => callback()
277-
);
289+
290+
if (this._forceFlush) {
291+
this._forceFlush().then(
292+
() => callback(),
293+
() => callback()
294+
);
278295
} else {
296+
diag.error(
297+
'Spans may not be exported for the lambda function because we are not force flushing before callback.'
298+
);
279299
callback();
280300
}
281301
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// We access through node_modules to allow it to be patched.
18+
/* eslint-disable node/no-extraneous-require */
19+
20+
import * as path from 'path';
21+
22+
import { AwsLambdaInstrumentation } from '../../src';
23+
import {
24+
BatchSpanProcessor,
25+
InMemorySpanExporter,
26+
} from '@opentelemetry/tracing';
27+
import { NodeTracerProvider } from '@opentelemetry/node';
28+
import { Context } from 'aws-lambda';
29+
import * as assert from 'assert';
30+
import { ProxyTracerProvider, TracerProvider } from '@opentelemetry/api';
31+
32+
const memoryExporter = new InMemorySpanExporter();
33+
34+
describe('force flush', () => {
35+
let instrumentation: AwsLambdaInstrumentation;
36+
37+
let oldEnv: NodeJS.ProcessEnv;
38+
39+
const ctx = {
40+
functionName: 'my_function',
41+
invokedFunctionArn: 'my_arn',
42+
awsRequestId: 'aws_request_id',
43+
} as Context;
44+
45+
const initializeHandler = (handler: string, provider: TracerProvider) => {
46+
process.env._HANDLER = handler;
47+
48+
instrumentation = new AwsLambdaInstrumentation();
49+
instrumentation.setTracerProvider(provider);
50+
};
51+
52+
const lambdaRequire = (module: string) =>
53+
require(path.resolve(__dirname, '..', module));
54+
55+
beforeEach(() => {
56+
oldEnv = { ...process.env };
57+
process.env.LAMBDA_TASK_ROOT = path.resolve(__dirname, '..');
58+
});
59+
60+
afterEach(() => {
61+
process.env = oldEnv;
62+
instrumentation.disable();
63+
64+
memoryExporter.reset();
65+
});
66+
67+
it('should force flush NodeTracerProvider', async () => {
68+
const provider = new NodeTracerProvider();
69+
provider.addSpanProcessor(new BatchSpanProcessor(memoryExporter));
70+
provider.register();
71+
let forceFlushed = false;
72+
const forceFlush = () =>
73+
new Promise<void>(resolve => {
74+
forceFlushed = true;
75+
resolve();
76+
});
77+
provider.activeSpanProcessor.forceFlush = forceFlush;
78+
initializeHandler('lambda-test/sync.handler', provider);
79+
80+
await new Promise((resolve, reject) => {
81+
lambdaRequire('lambda-test/sync').handler(
82+
'arg',
83+
ctx,
84+
(err: Error, res: any) => {
85+
if (err) {
86+
reject(err);
87+
} else {
88+
resolve(res);
89+
}
90+
}
91+
);
92+
});
93+
94+
assert.strictEqual(forceFlushed, true);
95+
});
96+
97+
it('should force flush ProxyTracerProvider with NodeTracerProvider', async () => {
98+
const nodeTracerProvider = new NodeTracerProvider();
99+
nodeTracerProvider.addSpanProcessor(new BatchSpanProcessor(memoryExporter));
100+
nodeTracerProvider.register();
101+
const provider = new ProxyTracerProvider();
102+
provider.setDelegate(nodeTracerProvider);
103+
let forceFlushed = false;
104+
const forceFlush = () =>
105+
new Promise<void>(resolve => {
106+
forceFlushed = true;
107+
resolve();
108+
});
109+
nodeTracerProvider.activeSpanProcessor.forceFlush = forceFlush;
110+
initializeHandler('lambda-test/sync.handler', provider);
111+
112+
await new Promise((resolve, reject) => {
113+
lambdaRequire('lambda-test/sync').handler(
114+
'arg',
115+
ctx,
116+
(err: Error, res: any) => {
117+
if (err) {
118+
reject(err);
119+
} else {
120+
resolve(res);
121+
}
122+
}
123+
);
124+
});
125+
126+
assert.strictEqual(forceFlushed, true);
127+
});
128+
});

0 commit comments

Comments
 (0)