Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add isolate module #5861

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108)
- Support for stdoutlog exporter in `go.opentelemetry.io/contrib/config`. (#5850)
- Add `go.opentelemetry.io/contrib/processors/isolate` module.
This module provides an isolating log processor. (#5861)

### Removed

Expand Down
1 change: 1 addition & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ instrumentation/net/http/otelhttp/ @open-te
instrumentation/runtime/ @open-telemetry/go-approvers @MadVikingGod

processors/baggagecopy @open-telemetry/go-approvers @codeboten @MikeGoldsmith
processors/isolate @open-telemetry/go-approvers @pellared
processors/minsev @open-telemetry/go-approvers @MrAlias

propagators/autoprop/ @open-telemetry/go-approvers @MrAlias
Expand Down
25 changes: 25 additions & 0 deletions processors/isolate/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package isolate_test

import (
"go.opentelemetry.io/contrib/processors/isolate"
"go.opentelemetry.io/otel/sdk/log"
)

func Example() {
// Log processing pipelines that process and emit telemetry.
var p1 log.Processor
var p2 log.Processor
var p3 log.Processor

// Register the processors using
// isolate.NewLogProcessor and the log.WithProcessor option
// so that the log records are not shared between pipelines.
_ = log.NewLoggerProvider(
log.WithProcessor(isolate.NewLogProcessor(p1)),
log.WithProcessor(isolate.NewLogProcessor(p2)),
log.WithProcessor(isolate.NewLogProcessor(p3)),
)
}
23 changes: 23 additions & 0 deletions processors/isolate/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
module go.opentelemetry.io/contrib/processors/isolate

go 1.21

require (
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel/log v0.4.0
go.opentelemetry.io/otel/sdk/log v0.4.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/sys v0.22.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
33 changes: 33 additions & 0 deletions processors/isolate/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel/log v0.4.0 h1:/vZ+3Utqh18e8TPjuc3ecg284078KWrR8BRz+PQAj3o=
go.opentelemetry.io/otel/log v0.4.0/go.mod h1:DhGnQvky7pHy82MIRV43iXh3FlKN8UUKftn0KbLOq6I=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
go.opentelemetry.io/otel/sdk/log v0.4.0 h1:1mMI22L82zLqf6KtkjrRy5BbagOTWdJsqMY/HSqILAA=
go.opentelemetry.io/otel/sdk/log v0.4.0/go.mod h1:AYJ9FVF0hNOgAVzUG/ybg/QttnXhUePWAupmCqtdESo=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
60 changes: 60 additions & 0 deletions processors/isolate/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package isolate provides an isolating processor that can be used to
// configure independent processing pipelines.
package isolate // import "go.opentelemetry.io/contrib/processors/isolate"

import (
"context"

"go.opentelemetry.io/otel/sdk/log"
)

// NewLogProcessor returns a new [LogProcessor] that wraps the downstream
// [log.Processor].
//
// If downstream is nil a default No-Op [log.Processor] is used. The returned
// processor will not be enabled for nor emit any records.
func NewLogProcessor(downstream log.Processor) *LogProcessor {
if downstream == nil {
downstream = defaultProcessor

Check warning on line 21 in processors/isolate/processor.go

View check run for this annotation

Codecov / codecov/patch

processors/isolate/processor.go#L21

Added line #L21 was not covered by tests
}
return &LogProcessor{Processor: downstream}
}

// LogProcessor is an [log.Processor] implementation clones the received log
// records in order to no share mutable data with subsequent registered processors.
//
// If the wrapped [log.Processor] is nil, calls to the LogProcessor methods
// will panic.
//
// Use [NewLogProcessor] to create a new LogProcessor that ensures
// no panics.
type LogProcessor struct {
log.Processor
}

// Compile time assertion that LogProcessor implements log.Processor.
var _ log.Processor = (*LogProcessor)(nil)

// OnEmit clones the record and calls the wrapped downstream processor.
func (p *LogProcessor) OnEmit(ctx context.Context, record log.Record) error {
record = record.Clone()
return p.Processor.OnEmit(ctx, record)
}

// Enabled clones the record and calls the wrapped downstream processor.
func (p *LogProcessor) Enabled(ctx context.Context, record log.Record) bool {
record = record.Clone()
return p.Processor.Enabled(ctx, record)
}

var defaultProcessor = noopProcessor{}
Copy link
Member Author

@pellared pellared Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is another usage of an no-op processor in a processor implementation.
I think we can think about adding it the the SDK.

See also: #5817 (comment)

EDIT: I created open-telemetry/opentelemetry-go#5580


type noopProcessor struct{}

func (p noopProcessor) OnEmit(context.Context, log.Record) error { return nil }
func (p noopProcessor) Enabled(context.Context, log.Record) bool { return false }
func (p noopProcessor) Shutdown(context.Context) error { return nil }
func (p noopProcessor) ForceFlush(context.Context) error { return nil }

Check warning on line 60 in processors/isolate/processor.go

View check run for this annotation

Codecov / codecov/patch

processors/isolate/processor.go#L57-L60

Added lines #L57 - L60 were not covered by tests
140 changes: 140 additions & 0 deletions processors/isolate/processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package isolate

import (
"context"
"strconv"
"testing"

"github.com/stretchr/testify/assert"

logapi "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/log"
)

const testAttrCount = 10

var testCtx = context.WithValue(context.Background(), "k", "v") //nolint // Simplify for testing.

func TestLogProcessorOnEmit(t *testing.T) {
wrapped := &processor{ReturnErr: assert.AnError}

p := NewLogProcessor(wrapped)

var r log.Record
for i := 0; i < testAttrCount; i++ {
r.AddAttributes(logapi.Int(strconv.Itoa(i), i))
}

assert.ErrorIs(t, p.OnEmit(testCtx, r), assert.AnError)

// Assert passthrough of the arguments.
if assert.Len(t, wrapped.OnEmitCalls, 1) {
assert.Equal(t, testCtx, wrapped.OnEmitCalls[0].Ctx)
assert.Equal(t, r, wrapped.OnEmitCalls[0].Record)
}

// Assert that the record is not being affected by subsequent modifications.
r.AddAttributes(logapi.String("foo", "bar"))
assert.Equal(t, testAttrCount, wrapped.OnEmitCalls[0].Record.AttributesLen(), "should be isolated from subsequent modifications")
}

func TestLogProcessorEnabled(t *testing.T) {
wrapped := &processor{}

p := NewLogProcessor(wrapped)

var r log.Record
for i := 0; i < testAttrCount; i++ {
r.AddAttributes(logapi.Int(strconv.Itoa(i), i))
}

assert.True(t, p.Enabled(testCtx, r))

// Assert passthrough of the arguments.
if assert.Len(t, wrapped.EnabledCalls, 1) {
assert.Equal(t, testCtx, wrapped.EnabledCalls[0].Ctx)
assert.Equal(t, r, wrapped.EnabledCalls[0].Record)
}

// Assert that the record is not being affected by subsequent modifications.
r.AddAttributes(logapi.String("foo", "bar"))
assert.Equal(t, testAttrCount, wrapped.EnabledCalls[0].Record.AttributesLen(), "should be isolated from subsequent modifications")
}

type args struct {
Ctx context.Context
Record log.Record
}

type processor struct {
ReturnErr error

OnEmitCalls []args
EnabledCalls []args
ForceFlushCalls []context.Context
ShutdownCalls []context.Context
}

func (p *processor) OnEmit(ctx context.Context, r log.Record) error {
p.OnEmitCalls = append(p.OnEmitCalls, args{ctx, r})
return p.ReturnErr
}

func (p *processor) Enabled(ctx context.Context, r log.Record) bool {
p.EnabledCalls = append(p.EnabledCalls, args{ctx, r})
return true
}

func (p *processor) Shutdown(ctx context.Context) error {
p.ShutdownCalls = append(p.ShutdownCalls, ctx)
return p.ReturnErr
}

func (p *processor) ForceFlush(ctx context.Context) error {
p.ForceFlushCalls = append(p.ForceFlushCalls, ctx)
return p.ReturnErr
}

func BenchmarkLogProcessor(b *testing.B) {
var ok bool
var err error

var r log.Record
r.SetBody(logapi.StringValue("message"))

var rWithShared log.Record
for i := 0; i < testAttrCount; i++ {
rWithShared.AddAttributes(logapi.Int(strconv.Itoa(i), i))
}

testCases := []struct {
desc string
r log.Record
}{
{
desc: "Record without shared data",
r: r,
},
{
desc: "Record with shared data",
r: rWithShared,
},
}

p := NewLogProcessor(noopProcessor{})

for _, tc := range testCases {
b.Run(tc.desc, func(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
ok = p.Enabled(testCtx, tc.r)
err = p.OnEmit(testCtx, tc.r)
}
})
}

_, _ = ok, err
}
1 change: 1 addition & 0 deletions versions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ module-sets:
version: v0.1.0
modules:
- go.opentelemetry.io/contrib/processors/baggagecopy
- go.opentelemetry.io/contrib/processors/isolate
- go.opentelemetry.io/contrib/processors/minsev
experimental-detectors:
version: v0.0.1
Expand Down