Module: github.com/bitdabbler/fluent
Version: 0.0.0-20231130015539-ae7757c11d69
Repository: https://github.com/bitdabbler/fluent.git
Documentation: pkg.go.dev

# README

fluent

A full Fluent logging stack in Go, including

  • fluent.Handler - serializes structured logs (implements slog.Handler)
  • fluent.Client - manages connections and writing to the Fluent server
  • fluent.Encoder - provides a common encoder/buffer, bridging the Handler and Client
import (
    // ...
    "github.com/bitdabbler/fluent"
)

Purpose

Why write yet another Fluent client and yet another structured log handler? In short, efficiency.

Let's focus on the Fluent Message event mode, with the msgpack form: [tag, time, record, option], where record is equivalent to the Go type map[string]any.

ref: https://github.com/fluent/fluent/wiki/Forward-Protocol-Specification-v1#message-modes

Consider an API such as:

// NOT this library
client.Send(tag string, timestamp time.Time, record map[string]any)

This hypothetical API (similar to those found in existing libraries) demonstrates the inefficiencies we seek to avoid. First, the log attributes (key-value pairs) have to be collected into a map[string]any for the record field. Second, all of the fields for the Message have to be copied across one extra function boundary. Additionally, those fields are often then collected into an object that represents one Message.

For optimal efficiency, the fluent.Client sends directly from the buffers used by the fluent.Handler. This allows log handlers to serialize log values without first marshaling them into intermediate objects, avoiding redundant serialization steps and excess copying.
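
To make the contrast concrete, here is a minimal sketch of writing a Message-mode event, [tag, time, record], straight into a reusable byte buffer without ever building a map[string]any. It is not this library's encoder: the helper names (appendStr, appendUint32, encodeMessage) are hypothetical, the record holds a single string pair, and only short strings and integer-second timestamps are handled.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendStr appends s as a msgpack fixstr. This sketch only handles strings
// shorter than 32 bytes; a real encoder would also emit str8/str16/str32.
func appendStr(b []byte, s string) []byte {
	if len(s) > 31 {
		panic("sketch only supports fixstr (len <= 31)")
	}
	b = append(b, 0xa0|byte(len(s)))
	return append(b, s...)
}

// appendUint32 appends v as a msgpack uint32 (0xce + 4 big-endian bytes).
func appendUint32(b []byte, v uint32) []byte {
	b = append(b, 0xce)
	return binary.BigEndian.AppendUint32(b, v)
}

// encodeMessage writes [tag, time, record] directly into buf, where record
// holds one key-value pair. No intermediate map or message object is built.
func encodeMessage(buf []byte, tag string, unixSec uint32, key, val string) []byte {
	buf = append(buf, 0x93) // fixarray with 3 elements
	buf = appendStr(buf, tag)
	buf = appendUint32(buf, unixSec)
	buf = append(buf, 0x81) // fixmap with 1 entry
	buf = appendStr(buf, key)
	return appendStr(buf, val)
}

func main() {
	buf := make([]byte, 0, 64) // in a pooled design this buffer would be reused
	buf = encodeMessage(buf, "app", 1700000000, "msg", "hi")
	fmt.Printf("% x\n", buf)
}
```

The same bytes can then be handed to the connection as-is, which is the property the shared Handler/Client buffers are meant to preserve.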

Efficiency optimizations:

  • the Handler and the Client directly use the same encoders/buffers
  • comprehensive resource pooling to minimize heap allocations
  • log preludes are encoded only once per pool, and are copied into each Encoder only once, no matter how many times the Encoder is used
  • shared log attributes (WithAttrs) are encoded only once, no matter how many times they are used by the Handler
  • where map values have a length that can change, we overallocate a single byte for the msgpack map header, so that we perform neither look-ahead nor extra copying when the length changes (an example of this occurs when the key of a child group Attr is an empty string, causing its own Attrs to get serialized into the parent's scope)
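
The last point is a form of header back-patching. The sketch below illustrates the general idea only and is not the library's actual layout: it always reserves a fixed-width msgpack map16 header, writes the entries, and patches the count afterwards, so no look-ahead or shifting of bytes is needed when the final count differs from the initial guess. The helper names and the "skip empty keys" rule are assumptions made for illustration.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// beginMap reserves a 3-byte msgpack map16 header (0xde + uint16 count) and
// returns the offset of the count so that it can be patched later.
func beginMap(b []byte) ([]byte, int) {
	b = append(b, 0xde, 0x00, 0x00)
	return b, len(b) - 2
}

// endMap back-patches the entry count once it is known.
func endMap(b []byte, countOffset int, n uint16) {
	binary.BigEndian.PutUint16(b[countOffset:], n)
}

// appendFixStr appends a short string (len <= 31) as a msgpack fixstr.
func appendFixStr(b []byte, s string) []byte {
	b = append(b, 0xa0|byte(len(s)))
	return append(b, s...)
}

// encodeRecord writes the pairs as a msgpack map. The number of entries is
// not known up front because (hypothetically) pairs with empty keys are
// skipped, so the header is patched at the end.
func encodeRecord(pairs [][2]string) []byte {
	buf, off := beginMap(make([]byte, 0, 64))
	var n uint16
	for _, p := range pairs {
		if p[0] == "" {
			continue
		}
		buf = appendFixStr(buf, p[0])
		buf = appendFixStr(buf, p[1])
		n++
	}
	endMap(buf, off, n)
	return buf
}

func main() {
	buf := encodeRecord([][2]string{{"a", "1"}, {"", "x"}, {"b", "2"}})
	fmt.Printf("% x\n", buf)
}
```

The cost is a slightly wider header than strictly necessary for small maps, traded for a single forward pass over the attributes.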

Basic Usage

h, err := fluent.NewHandler(fluentHost, fluentTag, nil)
if err != nil {
    log.Fatal(err)
}
l := slog.New(h)

// use locally
l.Info("the message", slog.Int("key", 42))

// or set it as the slog package-level logger
slog.SetDefault(l)

// and then use it globally
slog.Info("the message", slog.Int("key", 42))

Details

In the above example, we let the Handler deal with setting up the Client and the EncoderPool.

For the Client, it sets the level of concurrency to 2, and the send queue depth to 16, making writes to the server asynchronous. For the EncoderPool, it uses only the default EncoderOptions. However, you can use an alternative constructor to fully customize everything.

// customize the Client
c, err := fluent.NewClient(fluentHost, &fluent.ClientOptions{
    Port: fluentPort,
    DialTimeout: time.Second * 5,
    SkipEagerDial: true,
})
if err != nil {
    log.Fatal(err)
}

// customize the EncoderPool
p, err := fluent.NewEncoderPool(fluentTag, &fluent.EncoderOptions{
    UseCoarseTimestamps: true,
})
if err != nil {
    log.Fatal(err)
}

// customize the Handler
h := fluent.NewHandlerCustom(c, p, &fluent.HandlerOptions{
    AddSource: true,
    TimeFormat: time.RFC1123Z,
})

l := slog.New(h)
slog.SetDefault(l)
slog.Info("another message", slog.String("path", "/enlightenment"))

fluent.Handler

Constructors:

  • NewHandler(host, tag string, opts *HandlerOptions) (*Handler, error)
  • NewHandlerCustom(client Sink, pool *EncoderPool, opts *HandlerOptions) *Handler

Configuration options

Option      Type          Default
AddSource   bool          false
TimeFormat  string        time.RFC3339Nano
Level       slog.Leveler  slog.LevelInfo
Verbose     bool          false

Passing log values through context.Context

A Handler can extract a slog.Attr from a context.Context. You can use a slog.Group to add multiple values.

// use the fluent.ContextKey
ctx := context.WithValue(context.Background(), fluent.ContextKey,
	slog.Group("req",
		slog.String("method", r.Method),
		slog.String("url", r.URL.String()),
	),
)

// log with context, resulting in a payload with the record field:
// {level:INFO,msg:success,req:{method:Get,url:www.example.com}}
slog.InfoContext(ctx, "success") 
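
The mechanism can be sketched with the standard library alone. The wrapper below is an illustration, not this library's Handler: ctxKey and contextHandler are hypothetical names, and a JSONHandler (with the time attribute dropped for stable output) stands in for the Fluent handler.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"
)

// ctxKey is a hypothetical context key, playing the role of fluent.ContextKey.
type ctxKey struct{}

// contextHandler wraps another slog.Handler and appends the slog.Attr found
// in the context (if any) to every record.
type contextHandler struct{ slog.Handler }

func (h contextHandler) Handle(ctx context.Context, r slog.Record) error {
	if a, ok := ctx.Value(ctxKey{}).(slog.Attr); ok {
		r = r.Clone() // avoid mutating state shared with other copies
		r.AddAttrs(a)
	}
	return h.Handler.Handle(ctx, r)
}

func (h contextHandler) WithAttrs(as []slog.Attr) slog.Handler {
	return contextHandler{h.Handler.WithAttrs(as)}
}

func (h contextHandler) WithGroup(name string) slog.Handler {
	return contextHandler{h.Handler.WithGroup(name)}
}

// demo logs one record with a request group carried in the context and
// returns the serialized output.
func demo() string {
	var out bytes.Buffer
	base := slog.NewJSONHandler(&out, &slog.HandlerOptions{
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if len(groups) == 0 && a.Key == slog.TimeKey {
				return slog.Attr{} // drop the timestamp for stable output
			}
			return a
		},
	})
	l := slog.New(contextHandler{base})
	ctx := context.WithValue(context.Background(), ctxKey{},
		slog.Group("req",
			slog.String("method", "GET"),
			slog.String("url", "www.example.com"),
		),
	)
	l.InfoContext(ctx, "success")
	return out.String()
}

func main() {
	fmt.Print(demo())
}
```

Because the value stored in the context is a single slog.Attr, a slog.Group is the natural way to carry several fields at once.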

Graceful shutdown

The Shutdown method calls Client.Shutdown(). That immediately closes the send queue channel, so the caller must guarantee that no more calls to the Handler methods will occur.

Shutdown blocks while the send queue is drained and all workers shut down.

// we: 
//   - are in a higher level graceful shutdown function
//   - used `slog.SetDefault` to ensure it was used globally

// create a new Logger whose Handler only writes locally to stdout
l := slog.New(slog.NewJSONHandler(os.Stdout, nil))

// atomically switch over to that logger, so that no subsequent
// logging calls will use the `Handler` instance
slog.SetDefault(l)

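// now it is safe to shut down the Handler instance's Client
//
// this blocks until either
//   (a) the write queue is completely drained, or
//   (b) the timeout expires (no limit with context.Background())
timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // illustrative timeout
defer cancel()
h.Shutdown(timeoutCtx)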
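
The "close the queue, then wait for the workers or the deadline" behaviour described above can be sketched as follows. This is a standalone illustration under assumed names (queue, newQueue, drainDemo), not the Client's implementation; in particular, whether the real Shutdown returns an error is not stated here.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// queue is a stand-in for a client with a buffered send queue and workers.
type queue struct {
	ch   chan string
	wg   sync.WaitGroup
	mu   sync.Mutex
	sent []string
}

func newQueue(depth, workers int) *queue {
	q := &queue{ch: make(chan string, depth)}
	for i := 0; i < workers; i++ {
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for m := range q.ch { // exits once ch is closed and drained
				q.mu.Lock()
				q.sent = append(q.sent, m)
				q.mu.Unlock()
			}
		}()
	}
	return q
}

// Shutdown closes the queue immediately, so no sends may follow it, and then
// waits until the workers have drained it or ctx is done.
func (q *queue) Shutdown(ctx context.Context) error {
	close(q.ch)
	done := make(chan struct{})
	go func() {
		q.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// drainDemo enqueues ten messages, shuts down, and reports how many were sent.
func drainDemo() (int, error) {
	q := newQueue(16, 2)
	for i := 0; i < 10; i++ {
		q.ch <- fmt.Sprintf("log %d", i)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	err := q.Shutdown(ctx)
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.sent), err
}

func main() {
	n, err := drainDemo()
	fmt.Println(n, err)
}
```

Sending on a closed channel panics in Go, which is why the caller has to switch loggers before calling Shutdown.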

fluent.Client

Constructors:

  • NewClient(host string, opts *ClientOptions) (*Client, error)
  • NewClientContext(ctx context.Context, host string, opts *ClientOptions) (*Client, error)

Configuration options

Option              Type           Default
Port                int            24224
Network             string         tcp
InsecureSkipVerify  bool           false
DialTimeout         time.Duration  30 seconds
SkipEagerDial       bool           false (connect eagerly in NewClient)
MaxEagerDialTries   int            10
Concurrency         int            1
QueueDepth          int            0 (writes are synchronous)
DropIfQueueFull     bool           false (blocks if queue is full)
WriteTimeout        time.Duration  0 (no timeout)
MaxWriteTries       int            3
Verbose             bool           false

Concurrency

Use the concurrency settings to enable the Client to spin up multiple workers internally. The workers maintain completely independent connections to the server, for thread safety with minimal locking. The default concurrency level is 1, ensuring that all logs are written out serially.
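
A rough sketch of how these settings interact, using assumed names throughout (worker, enqueue, run, totalWritten) rather than the Client's internals: a buffered channel plays the role of the send queue (QueueDepth), a non-blocking send models DropIfQueueFull, and each worker owns its own connection (a bytes.Buffer here) so that writes need no locking.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// worker owns its own connection, so writes need no locking.
type worker struct {
	conn bytes.Buffer // stand-in for a dedicated net.Conn to the server
}

// enqueue places msg on the send queue. With dropIfFull set it never blocks
// and reports false when the queue has no free slot.
func enqueue(q chan<- []byte, msg []byte, dropIfFull bool) bool {
	if !dropIfFull {
		q <- msg
		return true
	}
	select {
	case q <- msg:
		return true
	default:
		return false
	}
}

// run starts n workers that drain the queue until it is closed.
func run(q <-chan []byte, n int) []*worker {
	workers := make([]*worker, n)
	var wg sync.WaitGroup
	for i := range workers {
		w := &worker{}
		workers[i] = w
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range q {
				w.conn.Write(msg) // only this goroutine touches w.conn
			}
		}()
	}
	wg.Wait()
	return workers
}

// totalWritten pushes count 2-byte messages through n workers and returns the
// number of bytes that reached the stand-in connections.
func totalWritten(n, count int) int {
	q := make(chan []byte, count)
	for i := 0; i < count; i++ {
		enqueue(q, []byte("x\n"), false)
	}
	close(q)
	total := 0
	for _, w := range run(q, n) {
		total += w.conn.Len()
	}
	return total
}

func main() {
	full := make(chan []byte, 1)
	fmt.Println(enqueue(full, []byte("a"), true)) // queue has room
	fmt.Println(enqueue(full, []byte("b"), true)) // queue is full, message dropped
	fmt.Println(totalWritten(2, 8))
}
```

With more than one worker, messages may reach the server out of order across connections, which is the trade-off behind the serial default.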

fluent.Encoder(Pool)

Constructors

  • NewEncoderPool(tag string, opts *EncoderOptions) (*EncoderPool, error)
  • NewEncoder(bufferCap int) *Encoder

Configuration Options

Option               Type              Default
Mode                 fluent.EventMode  MessageMode
NewBufferCap         int               1KiB
MaxBufferCap         int               8KiB
UseCoarseTimestamps  bool              false
RequestACKs          bool              false
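
One plausible reading of the two buffer settings, offered as an assumption rather than a description of the EncoderPool: new buffers start at NewBufferCap, and a buffer that has grown beyond MaxBufferCap is discarded instead of being returned to the pool, so that one unusually large log does not pin memory indefinitely. A minimal sketch with sync.Pool and hypothetical helpers get and put:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

const (
	newBufferCap = 1 << 10 // 1 KiB, initial capacity of a fresh buffer
	maxBufferCap = 8 << 10 // 8 KiB, largest buffer worth keeping
)

var pool = sync.Pool{
	New: func() any {
		return bytes.NewBuffer(make([]byte, 0, newBufferCap))
	},
}

// get returns an empty buffer, either recycled or freshly allocated.
func get() *bytes.Buffer {
	return pool.Get().(*bytes.Buffer)
}

// put recycles b unless it has grown past maxBufferCap, in which case it is
// left for the garbage collector. It reports whether b was retained.
func put(b *bytes.Buffer) bool {
	if b.Cap() > maxBufferCap {
		return false
	}
	b.Reset()
	pool.Put(b)
	return true
}

func main() {
	small := get()
	small.WriteString("a short log line")
	fmt.Println(put(small))

	big := get()
	big.Write(make([]byte, 16<<10)) // forces growth past maxBufferCap
	fmt.Println(put(big))
}
```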

Design Decisions, Tradeoffs, and Current Limitations

Not Implemented (yet):

  • Handshake messages
  • [Compressed][Packed]Forward event modes (and related options)
  • explicit ACKing

The current structures and interfaces were designed with Forward event mode and explicit ACK support in mind, so the path to implement them should be smooth.

Explicit ACKs

In the Option section, the protocol specification discusses the chunk option, stating:

"chunk: Clients MAY send the chunk option to confirm the server receives event records. The value is a string of Base64 representation of 128 bits unique_id which is an ID of a set of events." (emphasis added).

The chunk option is used for explicit ACKing. Whether it is intended to apply to the Message event mode is ambiguous. The quote refers to a "set of events", which relates to the other event modes, not the Message event mode, where each message includes only a single event. Additionally, prior to the Option section, the spec repeatedly and exclusively uses "chunk" to refer to a binary chunk of a MessagePackEventStream.

On the other hand, the Message specification includes an optional 4th "option" value in the msgpack array, chunk/ACK support is the only option that appears applicable to Message event mode, and other libraries have included ACK support with this mode.
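
For reference, the chunk value quoted above (a Base64 string of a 128-bit unique ID) is cheap to produce. The sketch below shows one way to generate such an ID with the standard library; newChunkID is a hypothetical helper, not part of this package, and the choice of random bytes as the source of uniqueness is an assumption.

```go
package main

import (
	"crypto/rand"
	"encoding/base64"
	"fmt"
)

// newChunkID returns the Base64 representation of 128 random bits, suitable
// as the value of a "chunk" option.
func newChunkID() (string, error) {
	var id [16]byte // 128 bits
	if _, err := rand.Read(id[:]); err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(id[:]), nil
}

func main() {
	id, err := newChunkID()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(id), id)
}
```

A client that requests ACKs would keep this ID until the server acknowledges it, and retry or report an error otherwise.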

JSON serialization

JSON serialization is not implemented, as it is less efficient and offers no functional advantage. The log forwarder and the tools used to review logs are separate concerns. The serialized key-value pairs should appear the same regardless of how they are serialized and transported.

Attr Rewrite Hook

We do not currently provide an Attr rewriting hook analogous to the ReplaceAttr hook provided by the standard library's HandlerOptions, used by the built-in TextHandler and JSONHandler. Omitting it provides only a minimal efficiency gain, and results in the inability to rewrite Attr keys dynamically.

However, the main use case for Attr rewriting is to redact sensitive values or change the logged value, not the key. This functionality is better handled by wrapping the value in a type that implements LogValuer, as seen in the example https://pkg.go.dev/log/slog#example-LogValuer-Secret.
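
A minimal sketch of that approach, modelled on the linked standard library example. The Secret type and logSecret helper are illustrative names, and the standard library's JSONHandler (with the time attribute dropped for stable output) is used as a stand-in for the Fluent handler.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
)

// Secret hides its contents whenever it is logged.
type Secret string

// LogValue implements slog.LogValuer, so handlers see the redacted value.
func (Secret) LogValue() slog.Value {
	return slog.StringValue("REDACTED")
}

// logSecret logs a Secret and returns the serialized record.
func logSecret() string {
	var out bytes.Buffer
	h := slog.NewJSONHandler(&out, &slog.HandlerOptions{
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if len(groups) == 0 && a.Key == slog.TimeKey {
				return slog.Attr{} // drop the timestamp for stable output
			}
			return a
		},
	})
	slog.New(h).Info("login", "token", Secret("s3cr3t"))
	return out.String()
}

func main() {
	fmt.Print(logSecret())
}
```

The redaction travels with the type, so it applies no matter which logger or handler ends up serializing the value.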

# Functions

DefaultClientOptions returns *ClientOptions with all default values.
DefaultEncoderOptions returns *EncoderOptions with all default values.
DefaultHandlerOptions returns *HandlerOptions with all default values.
InternalLogger returns the Logger used to write out internal logs, where logs get written when something goes wrong in the logging stack itself.
NewClient creates a new Fluent client and connects to the Fluent server immediately, returning an error if it is unable to establish the connection.
NewClientContext creates a new Fluent client and connects to the Fluent server immediately, returning an error if it is unable to establish the initial connections.
NewEncoder returns a newly allocated Encoder.
NewEncoderPool creates a shared *Encoder pool that returns Encoders with the log prelude, including the outer msgpack array and the tag, pre-encoded.
NewHandler uses an EncoderPool with default options, and a Client that uses default options except that Concurrency is set to 2 and QueueDepth is set to 16, for asynchronous sending.
NewHandlerCustom creates a Handler that wraps a Client and an EncoderPool that are fully customizable by the caller.
SetInternalLogger makes l the internal logger.
TestHandler tests a [slog.Handler].

# Constants

CompressedForwardMode indicates the Fluent CompressedPackedForward mode.
ForwardMode indicates the Fluent Forward mode.
MessageMode indicates the Fluent Message mode.
PackedForwardMode indicates the Fluent PackedForward mode.

# Variables

ContextKey is used to extract a log value from context.Context.

# Structs

Client represents a Fluent client.
ClientOptions are used to customize the Fluent Client.
Encoder provides a msgpack encoder and its underlying bytes.Buffer.
EncoderOptions are used to customize the Encoders and the Encoder pool.
EncoderPool defines a shared *Encoder pool, used to minimize heap allocations.
Handler is an adapter that serializes Go structured logs out to Fluent msgpack arrays, without first serializing them to intermediate data structures, such as map[string]any.
HandlerOptions are used to customize the Fluent slog.Handler.

# Interfaces

Sink interface defines the Client API.

# Type aliases

No description provided by the author