package
0.0.0-20250311023717-5c21e974eed8
Repository: https://github.com/thrasher-corp/gocryptotrader.git
Documentation: pkg.go.dev

# README

GoCryptoTrader Exchange Stream Package

This package is part of the GoCryptoTrader project and is responsible for handling exchange streaming data.

Overview

The stream package is built on Gorilla WebSocket and provides functionality to connect to cryptocurrency exchanges and handle their real-time data streams.

Features

  • Handle real-time market data streams
  • Unified interface for managing data streams
  • Multi-connection management - manages multiple connections to the same exchange
  • Connection monitoring - checks whether each websocket connection is still alive and attempts to reconnect if it is not
  • Traffic monitoring - reconnects if no message is sent for a period of time defined in your config
  • Subscription management - manages subscriptions to the various data streams
  • Rate limiting - limits the number of requests sent to the exchange
  • Message ID generation - generates message IDs for websocket requests
  • Websocket message response matching - matches websocket responses to the requests that were sent
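
To make the last two features concrete, here is a minimal sketch of how message ID generation and response matching can work together. It is not the package's Match implementation: the matcher type and its register and resolve methods are hypothetical names, and only the standard library is used.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNoMatch is returned when a response arrives for an unknown message ID.
var errNoMatch = errors.New("no pending request for message ID")

// matcher is a hypothetical stand-in for request/response matching.
// It is an illustration only, not the stream package's Match type.
type matcher struct {
	mu      sync.Mutex
	nextID  int64
	pending map[int64]chan []byte
}

func newMatcher() *matcher {
	return &matcher{pending: make(map[int64]chan []byte)}
}

// register generates a fresh message ID and returns a channel on which
// the matching response will be delivered.
func (m *matcher) register() (int64, <-chan []byte) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.nextID++
	ch := make(chan []byte, 1) // buffered so resolve never blocks
	m.pending[m.nextID] = ch
	return m.nextID, ch
}

// resolve hands an incoming payload to the request waiting on id.
// Each ID can be resolved at most once.
func (m *matcher) resolve(id int64, payload []byte) error {
	m.mu.Lock()
	ch, ok := m.pending[id]
	delete(m.pending, id)
	m.mu.Unlock()
	if !ok {
		return errNoMatch
	}
	ch <- payload
	return nil
}

func main() {
	m := newMatcher()
	id, wait := m.register()
	// A real reader routine would parse the ID out of the incoming message.
	if err := m.resolve(id, []byte(`{"id":1,"result":"ok"}`)); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Printf("request %d received %s\n", id, <-wait)
}
```

Keeping the pending map behind a single mutex lets one reader routine resolve responses for many concurrent senders, which is the general idea behind matching without tracking which connection a request went out on.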

Usage

Default single websocket connection

Here is a basic example of how to set up the stream package for a websocket connection:

package main

import (
	"time"

	// The config and buffer import paths are assumed from the repository layout.
	"github.com/thrasher-corp/gocryptotrader/config"
	exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
	"github.com/thrasher-corp/gocryptotrader/exchanges/request"
	"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
	"github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer"
)

type Exchange struct {
	exchange.Base
}

// In the exchange wrapper this will set up the initial pointer field provided by exchange.Base
func (e *Exchange) SetDefault() {
	e.Websocket = stream.NewWebsocket()
	e.WebsocketResponseMaxLimit = exchange.DefaultWebsocketResponseMaxLimit
	e.WebsocketResponseCheckTimeout = exchange.DefaultWebsocketResponseCheckTimeout
	e.WebsocketOrderbookBufferLimit = exchange.DefaultWebsocketOrderbookBufferLimit
}

// In the exchange wrapper this is the original setup pattern for the websocket services.
// connectionURLString, privateConnectionURLString and exchangeWebsocketResponseMaxLimit
// are exchange-specific values assumed to be defined elsewhere in the wrapper.
func (e *Exchange) Setup(exch *config.Exchange) error {
	// Sets up the global connector, subscriber, unsubscriber and subscription
	// generator used by each connection defined below.
	if err := e.Websocket.Setup(&stream.WebsocketSetup{
		ExchangeConfig:                         exch,
		DefaultURL:                             connectionURLString,
		RunningURL:                             connectionURLString,
		Connector:                              e.WsConnect,
		Subscriber:                             e.Subscribe,
		Unsubscriber:                           e.Unsubscribe,
		GenerateSubscriptions:                  e.GenerateDefaultSubscriptions,
		Features:                               &e.Features.Supports.WebsocketCapabilities,
		MaxWebsocketSubscriptionsPerConnection: 240,
		OrderbookBufferConfig:                  buffer.Config{Checksum: e.CalculateUpdateOrderbookChecksum},
	}); err != nil {
		return err
	}

	// This is a public websocket connection
	if err := e.Websocket.SetupNewConnection(&stream.ConnectionSetup{
		URL:                  connectionURLString,
		ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
		ResponseMaxLimit:     exchangeWebsocketResponseMaxLimit,
		RateLimit:            request.NewRateLimitWithWeight(time.Second, 2, 1),
	}); err != nil {
		return err
	}

	// This is a private websocket connection
	return e.Websocket.SetupNewConnection(&stream.ConnectionSetup{
		URL:                  privateConnectionURLString,
		ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
		ResponseMaxLimit:     exchangeWebsocketResponseMaxLimit,
		Authenticated:        true,
		RateLimit:            request.NewRateLimitWithWeight(time.Second, 2, 1),
	})
}
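
The RateLimit fields above are built from an interval and an action count. This README does not show how the request package enforces them, so the following is only a sketch of one common approach: spacing sends evenly at interval/actions. The spacer type and the burstDelaysMillis helper are hypothetical names, and the clock is passed in explicitly so the behaviour can be checked without sleeping.

```go
package main

import (
	"fmt"
	"time"
)

// spacer is a hypothetical limiter that spaces sends evenly.
// It illustrates the idea only and is not the request package's limiter.
type spacer struct {
	gap  time.Duration // minimum time between two sends
	next time.Time     // earliest moment the next send may go out
}

// newSpacer allows `actions` sends per `interval`, evenly spaced.
func newSpacer(interval time.Duration, actions int) *spacer {
	return &spacer{gap: interval / time.Duration(actions)}
}

// reserve books the next send slot and returns how long the caller
// should wait, measured from now, before sending.
func (s *spacer) reserve(now time.Time) time.Duration {
	if now.After(s.next) {
		s.next = now // limiter was idle, so the slot starts immediately
	}
	wait := s.next.Sub(now)
	s.next = s.next.Add(s.gap)
	return wait
}

// burstDelaysMillis reserves n sends at the same instant against a limit
// of `actions` per second and reports each required delay in milliseconds.
func burstDelaysMillis(actions, n int) []int64 {
	s := newSpacer(time.Second, actions)
	now := time.Unix(0, 0)
	delays := make([]int64, n)
	for i := range delays {
		delays[i] = s.reserve(now).Milliseconds()
	}
	return delays
}

func main() {
	// Four requests arriving at once under a limit of 2 per second.
	fmt.Println(burstDelaysMillis(2, 4)) // prints [0 500 1000 1500]
}
```

In a live connection the caller would sleep for the returned duration, or use a context-aware timer, before writing to the socket.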

Multiple websocket connections

The example below uses the optional multi-connection management system, which allows multiple connections to be established and maintained based on URL, connection type, asset type, etc.

func (e *Exchange) Setup(exch *config.Exchange) error {
	// Global setup. With UseMultiConnectionManagement enabled, the connector, subscriber,
	// unsubscriber and subscription generator are supplied per connection below.
	err := e.Websocket.Setup(&stream.WebsocketSetup{
		ExchangeConfig:               exch,
		Features:                     &e.Features.Supports.WebsocketCapabilities,
		FillsFeed:                    e.Features.Enabled.FillsFeed,
		TradeFeed:                    e.Features.Enabled.TradeFeed,
		UseMultiConnectionManagement: true,
	})
	if err != nil {
		return err
	}
	// Spot connection
	err = e.Websocket.SetupNewConnection(&stream.ConnectionSetup{
		URL:                  connectionURLStringForSpot,
		RateLimit:            request.NewWeightedRateLimitByDuration(gateioWebsocketRateLimit),
		ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
		ResponseMaxLimit:     exch.WebsocketResponseMaxLimit,
		// Custom handlers for the specific connection:
		Handler:                  e.WsHandleSpotData,
		Subscriber:               e.SpotSubscribe,
		Unsubscriber:             e.SpotUnsubscribe,
		GenerateSubscriptions:    e.GenerateDefaultSubscriptionsSpot,
		Connector:                e.WsConnectSpot,
		BespokeGenerateMessageID: e.GenerateWebsocketMessageID,
	})
	if err != nil {
		return err
	}
	// Futures connection - USDT margined
	return e.Websocket.SetupNewConnection(&stream.ConnectionSetup{
		URL:                  connectionURLStringForSpotForFutures,
		RateLimit:            request.NewWeightedRateLimitByDuration(gateioWebsocketRateLimit),
		ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
		ResponseMaxLimit:     exch.WebsocketResponseMaxLimit,
		// Custom handlers for the specific connection:
		Handler: func(ctx context.Context, incoming []byte) error {
			return e.WsHandleFuturesData(ctx, incoming, asset.Futures)
		},
		Subscriber:   e.FuturesSubscribe,
		Unsubscriber: e.FuturesUnsubscribe,
		GenerateSubscriptions: func() (subscription.List, error) {
			return e.GenerateFuturesDefaultSubscriptions(currency.USDT)
		},
		Connector:                e.WsFuturesConnect,
		BespokeGenerateMessageID: e.GenerateWebsocketMessageID,
	})
}
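
The per-connection Handler fields above imply that incoming data is routed to the handler registered for the connection it arrived on. A minimal, standalone sketch of that routing idea follows. The router type, its add and dispatch methods, the routeDemo helper and the example.invalid URLs are all hypothetical, and the package's own connection management is more involved than this.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handler has the same shape as the Handler closures in the setup example.
type handler func(ctx context.Context, incoming []byte) error

var (
	errDuplicateURL = errors.New("connection already registered for URL")
	errUnknownURL   = errors.New("no connection registered for URL")
)

// router is a hypothetical registry of handlers keyed by connection URL.
type router struct {
	handlers map[string]handler
}

func newRouter() *router {
	return &router{handlers: make(map[string]handler)}
}

// add registers a handler for url and rejects duplicate registrations.
func (r *router) add(url string, h handler) error {
	if _, ok := r.handlers[url]; ok {
		return errDuplicateURL
	}
	r.handlers[url] = h
	return nil
}

// dispatch passes msg to the handler registered for url.
func (r *router) dispatch(ctx context.Context, url string, msg []byte) error {
	h, ok := r.handlers[url]
	if !ok {
		return errUnknownURL
	}
	return h(ctx, msg)
}

// routeDemo registers a spot and a futures handler, routes three messages
// and reports how many messages each handler received.
func routeDemo() (spot, futures int, err error) {
	const spotURL, futuresURL = "wss://example.invalid/spot", "wss://example.invalid/futures"
	r := newRouter()
	if err = r.add(spotURL, func(context.Context, []byte) error { spot++; return nil }); err != nil {
		return spot, futures, err
	}
	if err = r.add(futuresURL, func(context.Context, []byte) error { futures++; return nil }); err != nil {
		return spot, futures, err
	}
	ctx := context.Background()
	for _, url := range []string{spotURL, futuresURL, spotURL} {
		if err = r.dispatch(ctx, url, []byte(`{}`)); err != nil {
			return spot, futures, err
		}
	}
	return spot, futures, nil
}

func main() {
	spot, futures, err := routeDemo()
	fmt.Println(spot, futures, err) // prints 2 1 <nil>
}
```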

# Functions

NewMatch returns a new Match.
NewWebsocket initialises the websocket struct.
SetupGlobalReporter sets a reporter interface to be used for all exchange requests.

# Constants

Websocket functionality list and state consts.

# Variables

Public websocket errors.
ErrSignatureNotMatched is returned when a signature does not match a request.

# Structs

ConnectionSetup defines variables for an individual stream connection.
ConnectionWrapper contains the connection setup details to be used when attempting a new connection.
FundingData defines funding data.
KlineData defines a kline feed.
Match is a distributed subtype that handles the matching of requests and responses in a timely manner, reducing the need to differentiate between connections.
PingHandler is a container for ping handler settings.
Response defines generalised data from the stream connection.
UnhandledMessageWarning defines a container for unhandled message warnings.
Websocket defines a return type for websocket connections via the interface wrapper for routine processing.
WebsocketConnection contains all the data needed to send a message to a WS connection.
WebsocketPositionUpdated reflects a change in orders/contracts on an exchange.
WebsocketSetup defines variables for setting up a websocket connection.

# Interfaces

Connection defines a streaming services connection.
Inspector is used to verify messages via SendMessageReturnResponsesWithInspection. It inspects the []byte websocket message and returns true if the message is the final message in a sequence of expected messages.
Reporter interface groups observability functionality over Websocket request latency.
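
As an illustration of the Inspector description above, the sketch below collects messages until an inspector reports the final one. The local inspector interface, its IsFinal method name, the finalMarker type and the collect helper are assumptions made for this example. They are not taken from the package source, and SendMessageReturnResponsesWithInspection itself is not reproduced.

```go
package main

import (
	"bytes"
	"fmt"
)

// inspector mirrors the behaviour described for Inspector.
// The method name IsFinal is an assumption for this sketch.
type inspector interface {
	IsFinal(msg []byte) bool
}

// finalMarker treats a message as final when it contains marker.
type finalMarker struct {
	marker []byte
}

func (f finalMarker) IsFinal(msg []byte) bool {
	return bytes.Contains(msg, f.marker)
}

// collect gathers messages in order and stops after the first message
// that the inspector reports as final.
func collect(incoming [][]byte, in inspector) [][]byte {
	var out [][]byte
	for _, msg := range incoming {
		out = append(out, msg)
		if in.IsFinal(msg) {
			break
		}
	}
	return out
}

func main() {
	msgs := [][]byte{
		[]byte(`{"status":"pending"}`),
		[]byte(`{"status":"filled"}`),
		[]byte(`{"status":"ignored"}`),
	}
	got := collect(msgs, finalMarker{marker: []byte(`"filled"`)})
	fmt.Println(len(got)) // prints 2
}
```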

# Type aliases

ClosureFrame is a closure function that wraps monitoring variables with an observer; if it returns true, the frame will exit.