package
2.5.0+incompatible
Repository: https://github.com/grafana/tempo.git
Documentation: pkg.go.dev

# README

pipeline

This package contains elements for building discrete/http and streaming/grpc request pipelines. The elements are designed as building blocks that are assembled in frontend.go, along the lines of the following:

	searchPipeline := pipeline.Build(
		asyncPipeline(cfg, newAsyncSearchSharder(reader, o, cfg.Search.Sharder, logger), logger),
		[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
		next)

	http := newSearchHTTPHandler(cfg, searchPipeline, logger)
	grpc := newSearchStreamingGRPCHandler(cfg, searchPipeline, apiPrefix, logger)

standard pipeline

The standard pipeline looks like the following:

collector -> pipeline -> http.RoundTripper

The final http.RoundTripper is the old cortex.frontend which then farms requests out to the queriers. This can also easily be replaced with a mock to write integration tests.
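
As an illustration of the mock mentioned above, the following is a minimal sketch of a stand-in for the final http.RoundTripper. The names roundTripperFunc, newMockQuerier and mockStatus are hypothetical helpers written for this example and are not part of the package.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// roundTripperFunc is a hypothetical adapter (not part of this package) that
// lets a plain function satisfy http.RoundTripper.
type roundTripperFunc func(*http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	return f(req)
}

// newMockQuerier returns a RoundTripper that answers every request with the
// given status and body instead of contacting a querier.
func newMockQuerier(status int, body string) http.RoundTripper {
	return roundTripperFunc(func(req *http.Request) (*http.Response, error) {
		return &http.Response{
			StatusCode: status,
			Header:     http.Header{},
			Body:       io.NopCloser(strings.NewReader(body)),
			Request:    req,
		}, nil
	})
}

// mockStatus sends one request through the mock and reports the status code.
func mockStatus(status int) (int, error) {
	req, err := http.NewRequest(http.MethodGet, "http://example.invalid/api/search", nil)
	if err != nil {
		return 0, err
	}
	resp, err := newMockQuerier(status, "{}").RoundTrip(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	code, err := mockStatus(http.StatusOK)
	fmt.Println(code, err)
}
```

A mock like this could be passed wherever the examples in this README use next, so a test can exercise the pipeline without any queriers running.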

collector

There are two collectors that are designed to be used directly with a desired endpoint: GRPC and HTTP. Both types of collectors require a combiner from the frontend/combiner package. These type-aware combiners handle the specifics of unmarshalling and combining the type, while the collectors handle the accumulation of responses and the ergonomics of streaming/discrete responses.

The combiners are responsible for aggregating and combining the results as well as translating the results into either an HTTP or GRPC response.
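
The division of labour between collector and combiner can be sketched roughly as follows. This is a simplified illustration only: the combiner interface, limitCombiner and collect are hypothetical and much smaller than the real types in frontend/combiner, and results are plain strings rather than typed responses.

```go
package main

import "fmt"

// combiner is a hypothetical, simplified stand-in for the real combiners:
// it merges partial results and knows when enough have been gathered.
type combiner interface {
	Add(partial []string)
	Done() bool
	Result() []string
}

// limitCombiner keeps at most limit results.
type limitCombiner struct {
	limit   int
	results []string
}

func (c *limitCombiner) Add(partial []string) {
	for _, r := range partial {
		if len(c.results) >= c.limit {
			return
		}
		c.results = append(c.results, r)
	}
}

func (c *limitCombiner) Done() bool       { return len(c.results) >= c.limit }
func (c *limitCombiner) Result() []string { return c.results }

// collect plays the collector role: it accumulates partial responses and
// stops as soon as the combiner reports that it is done.
func collect(responses <-chan []string, c combiner) []string {
	for partial := range responses {
		c.Add(partial)
		if c.Done() {
			break
		}
	}
	return c.Result()
}

// collectExample feeds three partial responses into a combiner with limit 3.
func collectExample() []string {
	responses := make(chan []string, 3)
	responses <- []string{"a", "b"}
	responses <- []string{"c", "d"}
	responses <- []string{"e"}
	close(responses)
	return collect(responses, &limitCombiner{limit: 3})
}

func main() {
	fmt.Println(collectExample())
}
```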

GRPC

The GRPC collector is designed to return a series of streaming diffs which can then be returned to a client from a GRPC server.

func NewGRPCCollector[T combiner.TResponse](next AsyncRoundTripper[*http.Response], combiner combiner.GRPCCombiner[T], send func(T) error) *GRPCCollector[T]

It takes a callback that happens to nicely match up with the server.Send function which allows a user to write something like:

  collector := pipeline.NewGRPCCollector[*tempopb.SearchResponse](next, c, srv.Send)
  err := collector.RoundTrip(req)
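
To show why a send callback is convenient, here is a minimal sketch of the callback pattern on its own. It assumes each partial result is already a diff and simply forwards it; streamDiffs and fakeStream are hypothetical names, and the real collector does considerably more (combining, limits, cancellation).

```go
package main

import "fmt"

// streamDiffs forwards each partial result to send as it arrives and stops
// on the first send error. It is a hypothetical helper for illustration.
func streamDiffs[T any](partials <-chan T, send func(T) error) error {
	for p := range partials {
		if err := send(p); err != nil {
			return err
		}
	}
	return nil
}

// fakeStream mimics the Send method of a generated gRPC server stream.
type fakeStream struct {
	sent []string
}

func (s *fakeStream) Send(msg string) error {
	s.sent = append(s.sent, msg)
	return nil
}

// streamExample pushes two diffs through the callback and reports how many
// messages the fake stream received.
func streamExample() (int, error) {
	partials := make(chan string, 2)
	partials <- "diff-1"
	partials <- "diff-2"
	close(partials)

	srv := &fakeStream{}
	// The method value srv.Send already has the shape func(string) error.
	err := streamDiffs[string](partials, srv.Send)
	return len(srv.sent), err
}

func main() {
	n, err := streamExample()
	fmt.Println(n, err)
}
```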

HTTP

The HTTP collector is designed to return a single, discrete response like a traditional http endpoint:

func NewHTTPCollector(next AsyncRoundTripper[*http.Response], combiner combiner.Combiner) http.RoundTripper

Because it returns an http.RoundTripper, it is easy to use with existing HTTP code. Something like:

		// build the combiner and the round tripper, then issue the request
		// (the variable is named c so it does not shadow the combiner package)
		c := combiner.NewTypedSearch(int(limit))
		rt := pipeline.NewHTTPCollector(next, c)

		return rt.RoundTrip(req)

pipeline

The pipeline referenced above can be built in many different ways, but a standard pipeline will look like:

| ------------ async ------------- |   | ------------- sync -------------- |
async multitenant -> async sharding -> cache -> status code rewrite -> retry

The first two elements are part of the asynchronous pipeline. These elements often create many jobs for one request and asynchronously pipe responses back to the collector level for recombination.

The last three elements are part of the synchronous pipeline. This part of the pipeline always returns exactly one response for each request.
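
The synchronous half can be pictured as ordinary http.RoundTripper wrapping. The sketch below is an assumption about the general shape, not the package's Build function: middleware, chain and tag are hypothetical names, and each element only records that a request passed through it.

```go
package main

import (
	"fmt"
	"net/http"
)

// middleware is a hypothetical, simplified sync pipeline element: it wraps
// the next RoundTripper and returns exactly one response per request.
type middleware func(next http.RoundTripper) http.RoundTripper

type roundTripperFunc func(*http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// chain wraps next so that the first middleware in the slice runs first.
func chain(mws []middleware, next http.RoundTripper) http.RoundTripper {
	for i := len(mws) - 1; i >= 0; i-- {
		next = mws[i](next)
	}
	return next
}

// tag returns a middleware that records its name when a request passes.
func tag(name string, trace *[]string) middleware {
	return func(next http.RoundTripper) http.RoundTripper {
		return roundTripperFunc(func(r *http.Request) (*http.Response, error) {
			*trace = append(*trace, name)
			return next.RoundTrip(r)
		})
	}
}

// order sends one request through cache -> status code rewrite -> retry and
// returns the order in which the elements were visited.
func order() []string {
	var trace []string
	final := roundTripperFunc(func(r *http.Request) (*http.Response, error) {
		trace = append(trace, "querier")
		return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody}, nil
	})
	rt := chain([]middleware{
		tag("cache", &trace),
		tag("status code rewrite", &trace),
		tag("retry", &trace),
	}, final)

	req, err := http.NewRequest(http.MethodGet, "http://example.invalid", nil)
	if err != nil {
		return nil
	}
	resp, err := rt.RoundTrip(req)
	if err != nil {
		return nil
	}
	resp.Body.Close()
	return trace
}

func main() {
	fmt.Println(order())
}
```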

async multitenant

Creates one job for every tenant in the tenant header and passes them forward.
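
A rough sketch of the fan-out: the helper below splits a tenant header value into one entry per tenant. The "|" separator is an assumption made for this example, and tenantJobs is a hypothetical name, not a function in this package.

```go
package main

import (
	"fmt"
	"strings"
)

// tenantJobs splits a tenant header value into one job per tenant.
// Assumption: tenants are separated by "|"; empty entries are skipped.
func tenantJobs(header string) []string {
	var jobs []string
	for _, t := range strings.Split(header, "|") {
		if t = strings.TrimSpace(t); t != "" {
			jobs = append(jobs, t)
		}
	}
	return jobs
}

func main() {
	for _, tenant := range tenantJobs("team-a|team-b|team-c") {
		fmt.Println("job for tenant", tenant)
	}
}
```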

async sharding

Most pipelines include a "job sharding" step that breaks the request into many smaller requests to be farmed out to queriers. "async sharding" is this step. It is not required for all endpoints.
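
The idea of breaking one request into smaller jobs can be sketched with a time range split into fixed-size windows. This is an illustration under stated assumptions only: shard and shardRange are hypothetical, and the real sharders decide job boundaries with their own, endpoint-specific logic.

```go
package main

import "fmt"

// shard is a hypothetical sub-request covering the half-open range [start, end).
type shard struct {
	start, end int64
}

// shardRange splits [start, end) into windows of at most size units each.
func shardRange(start, end, size int64) []shard {
	if size <= 0 || end <= start {
		return nil
	}
	var out []shard
	for s := start; s < end; s += size {
		e := s + size
		if e > end {
			e = end // the last window may be shorter
		}
		out = append(out, shard{start: s, end: e})
	}
	return out
}

func main() {
	for _, s := range shardRange(0, 100, 30) {
		fmt.Printf("job [%d, %d)\n", s.start, s.end)
	}
}
```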

cache

If the context includes a cache key, the cache layer automatically attempts to retrieve the response from cache and short-circuit it back up the pipeline.
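
The short circuit can be sketched as follows. Everything here is hypothetical: the context key type, the fetch signature and the in-memory map are stand-ins chosen for the example, and storing the result on a miss is an assumption that the text above does not state.

```go
package main

import (
	"context"
	"fmt"
)

// cacheKeyType is a hypothetical private context key for the cache key.
type cacheKeyType struct{}

func withCacheKey(ctx context.Context, key string) context.Context {
	return context.WithValue(ctx, cacheKeyType{}, key)
}

// fetch is a simplified stand-in for the next element in the pipeline.
type fetch func(ctx context.Context) (string, error)

// cached returns early on a hit and only calls next on a miss or when the
// context carries no cache key. The map is not safe for concurrent use.
func cached(store map[string]string, next fetch) fetch {
	return func(ctx context.Context) (string, error) {
		key, ok := ctx.Value(cacheKeyType{}).(string)
		if !ok || key == "" {
			return next(ctx) // no cache key: pass straight through
		}
		if v, hit := store[key]; hit {
			return v, nil // short-circuit back up the pipeline
		}
		v, err := next(ctx)
		if err == nil {
			store[key] = v // assumption: successful results are stored
		}
		return v, err
	}
}

// cacheExample issues the same keyed request twice and reports how often
// the wrapped element was actually called.
func cacheExample() (calls int, last string) {
	f := cached(map[string]string{}, func(context.Context) (string, error) {
		calls++
		return "result", nil
	})
	ctx := withCacheKey(context.Background(), "job-1")
	_, _ = f(ctx)
	last, _ = f(ctx)
	return calls, last
}

func main() {
	fmt.Println(cacheExample())
}
```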

status code rewrite

This step maps status codes from the querier into status codes appropriate for the pipeline. In particular, it takes 400s and maps them to 500.
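
A minimal sketch of such a mapping is shown below. It assumes that "400s" means the whole 4xx range; the real middleware may treat individual codes differently, and adjustStatus is a hypothetical name.

```go
package main

import (
	"fmt"
	"net/http"
)

// adjustStatus maps querier status codes to pipeline status codes.
// Assumption: every 4xx from a querier becomes a 500; other codes pass through.
func adjustStatus(code int) int {
	if code >= 400 && code < 500 {
		return http.StatusInternalServerError
	}
	return code
}

func main() {
	for _, code := range []int{200, 400, 404, 503} {
		fmt.Println(code, "->", adjustStatus(code))
	}
}
```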

retry

Retry is a lift and shift of the previous retry middleware. If a

error handling

  • All errors in the pipeline must be propagated back faithfully
  • A pipeline item should turn an error into a context aware response if possible, e.g.
    if err := parse(req.URL); err != nil {
      return http.Response(400, err), nil // pseudocode: a 400 response wrapping err
    }
    
  • The combiner aggregates responses and turns them into a GRPC response/error or an HTTP response/error
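
The second rule above can be sketched in runnable form as follows. The helpers badRequest, handle and statusFor are hypothetical and written only for this example; the "limit" query parameter is likewise an assumption.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
)

// badRequest builds a 400 response whose body carries the error text.
func badRequest(err error) *http.Response {
	return &http.Response{
		StatusCode: http.StatusBadRequest,
		Header:     http.Header{},
		Body:       io.NopCloser(strings.NewReader(err.Error())),
	}
}

// handle is a hypothetical pipeline item. A parse failure becomes a 400
// response with a nil error, so the combiner can report it to the client.
func handle(req *http.Request) (*http.Response, error) {
	if _, err := strconv.Atoi(req.URL.Query().Get("limit")); err != nil {
		return badRequest(err), nil
	}
	return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody}, nil
}

// statusFor runs handle against rawURL and returns the status code,
// or 0 if the request could not be built or handle returned an error.
func statusFor(rawURL string) int {
	req, err := http.NewRequest(http.MethodGet, rawURL, nil)
	if err != nil {
		return 0
	}
	resp, err := handle(req)
	if err != nil {
		return 0
	}
	defer resp.Body.Close()
	return resp.StatusCode
}

func main() {
	fmt.Println(statusFor("http://example.invalid/search?limit=abc"))
	fmt.Println(statusFor("http://example.invalid/search?limit=10"))
}
```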

context cancellation

Currently context is only cancelled in the collectors. A collector watches for any of the following events, each of which means that all subjobs can be cancelled:

  • Error return, i.e. a non-nil err return param from a pipeline item
  • Bad request return, i.e. the combiner receives a 500 or 400
  • Response limit reached
  • All jobs exhausted
  • Context canceled (due to client disconnect)
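
The cancellation behaviour listed above can be sketched with a single producer and a collecting loop. This is a simplified illustration: collect and cancelExample are hypothetical, jobs are represented only by their status codes, and the response limit case is left out.

```go
package main

import (
	"context"
	"fmt"
)

// collect reads job statuses and cancels the shared context as soon as a
// bad status arrives, so the remaining subjobs stop early.
func collect(parent context.Context, statuses []int) (int, error) {
	ctx, cancel := context.WithCancel(parent)
	defer cancel() // also releases the context once all jobs are exhausted

	results := make(chan int)
	go func() {
		defer close(results)
		for _, s := range statuses {
			select {
			case results <- s:
			case <-ctx.Done():
				return // collector cancelled or client disconnected
			}
		}
	}()

	seen := 0
	for s := range results {
		seen++
		if s >= 400 {
			cancel() // bad request: the remaining subjobs are pointless
			return seen, fmt.Errorf("job returned status %d", s)
		}
	}
	// Nil when all jobs were exhausted, non-nil when the parent was cancelled.
	return seen, ctx.Err()
}

// cancelExample runs four jobs where the third one fails.
func cancelExample() (int, bool) {
	seen, err := collect(context.Background(), []int{200, 200, 500, 200})
	return seen, err != nil
}

func main() {
	fmt.Println(cancelExample())
}
```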

# Functions

Build takes a slice of async middleware, a slice of sync middleware, and an http.RoundTripper and builds a request pipeline.
NewAsyncSharderChan creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse] using a limited number of goroutines.
NewAsyncSharderFunc creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse].
NewBadRequest creates a new AsyncResponse that wraps a single http.Response with a 400 status code and the provided error message.
NewHTTPCollector returns a new http collector.
NewHTTPToAsyncResponse creates a new AsyncResponse that wraps a single http.Response.
NewMultiTenantMiddleware returns a middleware that takes a request and fans it out to each tenant. It currently accepts a success and failure counter, to prevent metrics collisions with.
NewNoopMiddleware returns a middleware that is a passthrough only.
NewStatusCodeAdjustWare exists to adjust status codes from the queriers for the rest of the pipeline to consume.
NewSuccessfulResponse creates a new AsyncResponse that wraps a single http.Response with a 200 status code and the provided body.

# Interfaces

AsyncMiddleware is used to build pipelines of pipeline.Roundtrippers.
Middleware is used to build pipelines of pipeline.Roundtrippers.

# Type aliases

AsyncMiddlewareFunc is like http.HandlerFunc, but for Middleware.
MiddlewareFunc is like http.HandlerFunc, but for Middleware.