Module: github.com/rivian/delta-go
Version: 0.0.0-20240823013242-0a4c7ae810f6
Repository: https://github.com/rivian/delta-go.git
Documentation: pkg.go.dev

# README

Delta Go


A native implementation of the Delta protocol in Go. This library started as a port of delta-rs.

This project is in alpha and the API is under development.

The current implementation is designed for highly concurrent writes; reads are not yet supported.

Our use case is to ingest data, write it to the Delta table folder as a Parquet file using a Go Parquet library, and then add that Parquet file to the Delta table using delta-go.

Features

Cloud Integrations

| Storage | Status | Comment |
| --- | --- | --- |
| Local | done | |
| S3 - AWS | done | Requires lock for concurrent writes |

Supported Operations

| Operation | Status | Description |
| --- | --- | --- |
| Create | done | Create a new table |
| Append | done | Append data in a Parquet file to a table |
| Checkpoint | done | Create a V1 checkpoint for a table. Note that the optional log cleanup has not been fully tested. |
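In the Delta protocol, a classic (V1) checkpoint for version n lives in `_delta_log/` as a Parquet file named after n zero-padded to 20 digits; a multi-part checkpoint also encodes the part number and the total part count, each zero-padded to 10 digits. The sketch below only formats those names with the standard library. `checkpointName` and `multipartCheckpointNames` are hypothetical helpers, not delta-go API.

```go
package main

import "fmt"

// checkpointName returns the single-file checkpoint name for a table version,
// e.g. 00000000000000000010.checkpoint.parquet (hypothetical helper).
func checkpointName(version int64) string {
	return fmt.Sprintf("%020d.checkpoint.parquet", version)
}

// multipartCheckpointNames returns the file names of a checkpoint split into
// the given number of parts; part numbers start at 1 (hypothetical helper).
func multipartCheckpointNames(version int64, parts int) []string {
	names := make([]string, 0, parts)
	for p := 1; p <= parts; p++ {
		names = append(names, fmt.Sprintf("%020d.checkpoint.%010d.%010d.parquet", version, p, parts))
	}
	return names
}

func main() {
	fmt.Println(checkpointName(10))
	for _, n := range multipartCheckpointNames(10, 2) {
		fmt.Println(n)
	}
}
```

Fixed-width padding keeps lexicographic order equal to version order, which is what lets a reader find the newest checkpoint by listing the log folder.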

Protocol Support Level

| Writer Version | Requirement | Status |
| --- | --- | --- |
| Version 2 | Append Only Tables | done |
| Version 2 | Column Invariants | |
| Version 3 | Enforce delta.checkpoint.writeStatsAsJson | |
| Version 3 | Enforce delta.checkpoint.writeStatsAsStruct | |
| Version 3 | CHECK constraints | |
| Version 4 | Change Data Feed | |
| Version 4 | Generated Columns | |
| Version 5 | Column Mapping | |
| Version 6 | Identity Columns | |
| Version 7 | Table Features | |

| Reader Version | Requirement | Status |
| --- | --- | --- |
| Version 2 | Column Mapping | |
| Version 3 | Table Features (requires writer V7) | |

Usage

Create a table in S3. This table is configured to use DynamoDB LogStore locking to enable multi-cluster S3 support.

	// s3Client, baseURI, dynamoDBClient, deltaLogStoreTableName and schema are defined elsewhere;
	// check err after each call (omitted here for brevity).
	store, err := s3store.New(s3Client, baseURI)
	logStore, err := dynamodblogstore.New(dynamodblogstore.Options{Client: dynamoDBClient, TableName: deltaLogStoreTableName})
	table := delta.NewTableWithLogStore(store, nillock.New(), logStore)
	metadata := delta.NewTableMetaData("Test Table", "test description", new(delta.Format).Default(), schema, []string{}, make(map[string]string))
	err = table.Create(*metadata, new(delta.Protocol).Default(), delta.CommitInfo{}, []delta.Add{})

Append data to the table. The data is in a Parquet file located at parquetRelativePath, a path relative to baseURI.

	// Build an Add action for a Parquet file that already exists in the store.
	add, _, err := delta.NewAdd(store, storage.NewPath(parquetRelativePath), make(map[string]string))
	transaction := table.CreateTransaction(delta.NewTransactionOptions())
	transaction.AddActions([]delta.Action{add})
	// Mark the commit as a blind append: it adds files without reading existing table data.
	transaction.SetAppMetadata(map[string]any{"isBlindAppend": true})
	transaction.SetOperation(delta.Write{Mode: delta.Append})
	// Commit through the configured log store; v is the committed table version.
	v, err := transaction.CommitLogStore()

There are also some simple examples available in the examples/ folder.

Storage configuration on S3

If delta-go and other clients write to the same Delta table on S3, configure all of them to use a multi-cluster LogStore to avoid write conflicts.

# Packages

Package lock contains the resources required to create a lock.
Package logstore contains the resources required to create a log store.
Package state contains the resources required to create a state store.
Package storage contains the resources required to interact with an object store.

# Functions

ActionsFromLogEntries retrieves all the actions from a log.
BaseCommitURI returns the base path of a commit URI.
CommitOrCheckpointVersionFromURI returns true plus the version if the URI is a valid commit or checkpoint filename.
CommitURIFromVersion returns the URI of the given commit version.
CommitVersionFromURI returns true plus the version if the URI is a valid commit filename.
CreateCheckpoint creates a checkpoint for the table located at the store for the given version. If expired log cleanup is enabled on this table, it runs the cleanup after a successful checkpoint to delete expired logs. It returns whether the checkpoint was created and any error. If the lock cannot be obtained, it does not retry: if other processes are checkpointing, there is no need to duplicate the effort.
DoesCheckpointVersionExist returns true if the given checkpoint version exists, either as a single- or multi-part checkpoint.
GetSchema recursively walks over the given struct interface i and extracts SchemaTypeStruct StructFields using reflection. This is not currently used in production, and results should be inspected before being used.
IsValidCommitOrCheckpointURI returns true if a URI is a valid commit or checkpoint file name.
IsValidCommitURI returns true if a URI is a valid commit filename (not a checkpoint file, and not a temp commit).
LogEntryFromActions retrieves a log entry from a list of actions.
NewAdd returns a new Add action for the given location and partition values. The modification time is set to now, and the size and stats are retrieved from the Parquet file at the given location. It also returns a list of columns that did not have stats set in the Parquet file.
NewCheckpointConfiguration returns the default configuration for creating checkpoints.
NewOptimizeCheckpointConfiguration returns a default enabled optimization configuration with a working folder in the table store's _delta_log/.tmp/ folder but no concurrency enabled.
NewTable creates a new Table struct without loading any data from backing storage.
NewTableMetaData creates a new TableMetaData instance.
NewTableState creates an empty table state for the given version.
NewTableStateFromActions generates table state from a list of actions.
NewTableStateFromCommit reads a specific commit version and returns the contained TableState.
NewTableWithLogStore creates a new Table instance with a log store configured.
NewTransactionOptions sets the default transaction options.
OpenTable loads the latest version of the table. If the table reader or writer version is greater than the client supports, the table is still opened, but an error is also returned.
OpenTableWithConfiguration loads the latest version of the table, using the given configuration for optimization settings.
OpenTableWithVersion loads the table at this specific version. If the table reader or writer version is greater than the client supports, the table is still opened, but an error is also returned.
OpenTableWithVersionAndConfiguration loads the table at this specific version using the given configuration for optimization settings.
StatsFromJSON parses JSON into a Stats object.
StatsFromParquet retrieves stats directly from the Parquet file in the Add action. It does not currently support nested types, or logical types that can't be generated in Spark (UUID, interval, JSON, BSON). It also will not return stats for timestamps stored in int96 columns, because the Parquet file won't have those stats.
UpdateStats computes Stats.NullCount, Stats.MinValues, and Stats.MaxValues for a given k,v struct property. The struct property is passed in as a pointer to ensure that it can be evaluated as nil (NULL). Struct types are not yet handled.
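The commit helpers listed above (CommitURIFromVersion, CommitVersionFromURI, IsValidCommitURI) rely on the protocol's naming rule: version n of a table is recorded in `_delta_log/` as n zero-padded to 20 digits with a `.json` suffix. A stdlib-only sketch of formatting and parsing such names follows; `commitURI` and `commitVersion` are hypothetical, and the real delta-go functions may use different types and signatures.

```go
package main

import (
	"fmt"
	"path"
	"regexp"
	"strconv"
)

// commitFileRe matches a 20-digit version followed by .json, which rejects
// checkpoint files and temporary commit files.
var commitFileRe = regexp.MustCompile(`^(\d{20})\.json$`)

// commitURI returns the log path for a commit version (hypothetical helper).
func commitURI(version int64) string {
	return path.Join("_delta_log", fmt.Sprintf("%020d.json", version))
}

// commitVersion extracts the version from a commit path; ok is false when
// the base name is not a valid commit file name.
func commitVersion(uri string) (version int64, ok bool) {
	m := commitFileRe.FindStringSubmatch(path.Base(uri))
	if m == nil {
		return 0, false
	}
	v, err := strconv.ParseInt(m[1], 10, 64)
	if err != nil {
		return 0, false // more digits than an int64 can hold
	}
	return v, true
}

func main() {
	uri := commitURI(42)
	v, ok := commitVersion(uri)
	fmt.Println(uri, v, ok)
}
```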

# Constants

AddActionKey represents an Add action.
Append causes files to be appended to the target location.
AppendOnlyDeltaConfigKey represents the Delta configuration to specify whether a table is append-only.
AppendOutputMode causes only new rows to be written when new data is available.
* array: see SchemaTypeArray.
AutoOptimizeAutoCompactDeltaConfigKey represents the Delta configuration to specify whether auto compaction needs to be enabled.
AutoOptimizeOptimizeWriteDeltaConfigKey represents the Delta configuration to specify whether optimized writing needs to be enabled.
* binary: a sequence of binary data.
* boolean: bool.
* byte: i8.
CDCActionKey represents a CDC action.
CheckpointIntervalDeltaConfigKey represents the Delta configuration to specify a checkpoint interval.
CheckpointWriteStatsAsJSONDeltaConfigKey represents the Delta configuration to specify whether stats need to be written as a JSON object in a checkpoint.
CheckpointWriteStatsAsStructDeltaConfigKey represents the Delta configuration to specify whether stats need to be written as a struct in a checkpoint.
ColumnMappingModeDeltaConfigKey represents the Delta configuration to specify whether column mapping needs to be enabled.
CommitInfoActionKey represents a CommitInfo action.
Complete causes the full output (all rows) to be written whenever new data is available.
DataSkippingNumIndexedColsDeltaConfigKey represents the Delta configuration to specify the number of columns for which to collect stats.
* date: A calendar date, represented as a year-month-day triple without a timezone.
DeletedFileRetentionDurationDeltaConfigKey represents the Delta configuration to specify the retention duration of a deleted file.
* double: f64.
EnableChangeDataFeedDeltaConfigKey represents the Delta configuration to specify whether change data feed needs to be enabled.
EnableExpiredLogCleanupDeltaConfigKey represents the Delta configuration to specify whether expired commit logs need to be cleaned up.
ErrorIfExists causes an operation to fail if files exist for the target.
* float: f32.
FormatActionKey represents a Format action.
Ignore causes an operation to not proceed or change any data if files exist for the target.
* integer: i32.
IsolationLevelDeltaConfigKey represents the Delta configuration to specify what isolation level to use.
LogRetentionDurationDeltaConfigKey represents the Delta configuration to specify the retention duration of commit logs.
* long: i64.
* map: see SchemaTypeMap.
MetaDataActionKey represents a metaData action.
MinReaderVersionDeltaConfigKey represents the Delta configuration to specify the minimum reader version.
MinWriterVersionDeltaConfigKey represents the Delta configuration to specify the minimum writer version.
Overwrite causes a target location to be overwritten.
ProtocolActionKey represents a Protocol action.
RandomizeFilePrefixesDeltaConfigKey represents the Delta configuration to specify whether file prefixes should be randomized.
RandomPrefixLengthDeltaConfigKey represents the Delta configuration to specify the number of characters generated for random prefixes.
RemoveActionKey represents a Remove action.
SetTransactionRetentionDurationDeltaConfigKey represents the Delta configuration to specify the retention duration of a transaction.
* short: i16.
* string: utf8.
* struct: see SchemaTypeStruct.
TargetFileSizeDeltaConfigKey represents the Delta configuration to specify the target size of a file.
* timestamp: Microsecond precision timestamp without a timezone.
TransactionActionKey represents a Txn action.
TuneFileSizesForRewritesDeltaConfigKey represents the Delta configuration to specify whether file sizes need to be tuned for rewrites.
Unknown is the schema data type representing an unknown type.
Update causes only rows with updates to be written when new or changed data is available.

# Variables

ErrActionJSONFormat is returned when there is an error reading actions from a commit log.
ErrActionUnknown is returned when there is an unknown action in a commit log.
ErrAddGenerateStats is returned when an add action cannot generate stats.
ErrAddZeroSize is returned when an add action has zero size.
ErrCDCNotSupported is returned if a CDC action is seen when generating a checkpoint.
ErrCheckpointAddZeroSize is returned if there is an Add action with size 0 because including this would cause subsequent Optimize operations to fail.
ErrCheckpointAlreadyExists is returned when trying to create a checkpoint but it already exists.
ErrCheckpointEntryMultipleActions is returned if a checkpoint entry has more than one non-null action.
ErrCheckpointIncomplete is returned when trying to read a multi-part checkpoint but not all parts exist.
ErrCheckpointInvalidMultipartFileName is returned when a multi-part checkpoint file has the wrong number of parts in the filename.
ErrCheckpointOptimizationWorkingFolder is returned if there is a problem with the optimization working folder.
ErrCheckpointRowCountMismatch is returned when the checkpoint is generated with a different row count than expected from the table state.
ErrConfigValidation is returned when a Delta configuration cannot be validated.
ErrConvertingCheckpointAdd is returned if there is an error converting an Add action to checkpoint format.
ErrExceededCommitRetryAttempts is returned when the maximum number of commit retry attempts has been exceeded.
ErrFailedToAcknowledgeCommit is returned when a commit fails to be acknowledged.
ErrFailedToCopyTempFile is returned when a temp file fails to be copied into a commit URI.
ErrInvalidVersion is returned when a version is invalid.
ErrLockFailed is returned when a lock fails unexpectedly.
ErrMissingMetadata is returned if trying to create a checkpoint with no metadata.
ErrNotATable is returned when a Delta table is not valid.
ErrNotImplemented is returned when a feature has not been implemented.
ErrParseSchema is returned when parsing the schema from JSON fails.
ErrReadingCheckpoint is returned if there is an error reading a checkpoint.
ErrUnableToLoadVersion is returned when a version cannot be loaded.
ErrUnsupportedReaderVersion is returned when a reader version is unsupported.
ErrUnsupportedWriterVersion is returned when a writer version is unsupported.
ErrVersionOutOfOrder is returned if the versions are out of order when loading the table state. This would indicate an internal logic error.

# Structs

An Add action is typed to allow the stats_parsed and partitionValues_parsed fields to be written to checkpoints with the correct schema without using reflection.
CDC represents a CDC action.
CheckPoint holds the metadata for a checkpoint file.
CheckpointConfiguration contains additional configuration for checkpointing.
CheckpointEntry contains a single entry in the checkpoint Parquet file. All but one of the pointers should be nil.
Create represents a Delta `Create` operation.
Format describes the data format of files in the table.
MetaData represents the action that describes the metadata of the table.
OnDiskTableState contains information about the table state that is stored on disk instead of in memory.
OptimizeCheckpointConfiguration holds settings for optimizing checkpoint read and write operations.
PreparedCommit holds the URI of a temp commit.
Protocol represents the action used to increase the version of the Delta protocol required to read or write to the table.
Remove represents a tombstone (deleted file) in the Delta log.
SchemaField describes a specific field of the Delta table schema.
SchemaTypeArray represents an array field.
SchemaTypeMap represents a map field.
SchemaTypeStruct represents a struct in the schema.
Stats contains statistics about a Parquet file in an Add action.
StreamingUpdate represents a Delta `StreamingUpdate` operation.
Table represents a Delta table.
TableMetaData represents the metadata of a Delta table.
TableState maintains the current known state of a table. It is used when reading and generating checkpoints. If on-disk optimization is enabled, some of this information is empty, because the state is offloaded to disk to reduce memory use.
Transaction represents a Delta transaction.
TransactionOptions customizes the behavior of a transaction.
Txn represents the action used by streaming systems to track progress using application-specific versions to enable idempotency.
Write represents a Delta `Write` operation.
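In the Delta log, an Add action's stats are a JSON string with `numRecords` and per-column `minValues`, `maxValues`, and `nullCount` maps. The sketch below decodes such a string into a hypothetical local `fileStats` struct with `encoding/json`; delta-go's own Stats type and StatsFromJSON do this for real and may use different field types. The JSON is an invented example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fileStats mirrors the per-file statistics fields defined by the Delta protocol.
type fileStats struct {
	NumRecords int64            `json:"numRecords"`
	MinValues  map[string]any   `json:"minValues"`
	MaxValues  map[string]any   `json:"maxValues"`
	NullCount  map[string]int64 `json:"nullCount"`
}

// parseStats decodes the stats JSON stored in an Add action.
func parseStats(raw string) (fileStats, error) {
	var s fileStats
	err := json.Unmarshal([]byte(raw), &s)
	return s, err
}

func main() {
	raw := `{"numRecords":3,"minValues":{"id":1,"name":"a"},"maxValues":{"id":3,"name":"c"},"nullCount":{"id":0,"name":1}}`
	s, err := parseStats(raw)
	if err != nil {
		panic(err)
	}
	// JSON numbers inside map[string]any decode as float64.
	fmt.Println(s.NumRecords, s.MinValues["id"], s.MaxValues["name"], s.NullCount["name"])
}
```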

# Interfaces

Action represents a Delta log action that describes a Parquet data file that is part of the table.
Operation represents the operation performed when creating a new log entry with one or more actions.
SchemaDataType is one of SchemaDataTypeName, SchemaTypeArray, SchemaTypeMap, or SchemaTypeStruct. A union constraint can't be used because the type is recursive.
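Because an array, map, or struct can contain any other data type, the schema type is recursive. A common Go workaround, sketched here with hypothetical local types rather than delta-go's, is an ordinary interface with an unexported marker method: only types in the same package can implement it, yet implementations can nest freely. `describe` then walks the tree with a type switch.

```go
package main

import (
	"fmt"
	"strings"
)

// dataType is implemented only by the types below (marker-method pattern).
type dataType interface{ isDataType() }

type primitive string // e.g. "long", "string"

type arrayType struct {
	Element dataType
}

type mapType struct {
	Key, Value dataType
}

type field struct {
	Name string
	Type dataType
}

type structType struct {
	Fields []field
}

func (primitive) isDataType()  {}
func (arrayType) isDataType()  {}
func (mapType) isDataType()    {}
func (structType) isDataType() {}

// describe renders a data type recursively as a compact string.
func describe(t dataType) string {
	switch v := t.(type) {
	case primitive:
		return string(v)
	case arrayType:
		return "array<" + describe(v.Element) + ">"
	case mapType:
		return "map<" + describe(v.Key) + "," + describe(v.Value) + ">"
	case structType:
		parts := make([]string, len(v.Fields))
		for i, f := range v.Fields {
			parts[i] = f.Name + ":" + describe(f.Type)
		}
		return "struct<" + strings.Join(parts, ",") + ">"
	default:
		return "unknown" // only reachable for a nil dataType
	}
}

func main() {
	schema := structType{Fields: []field{
		{Name: "id", Type: primitive("long")},
		{Name: "tags", Type: arrayType{Element: primitive("string")}},
		{Name: "attrs", Type: mapType{Key: primitive("string"), Value: structType{Fields: []field{{Name: "v", Type: primitive("double")}}}}},
	}}
	fmt.Println(describe(schema)) // struct<id:long,tags:array<string>,attrs:map<string,struct<v:double>>>
}
```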

# Type aliases

ActionKey represents a Delta action.
CommitInfo represents a CommitInfo action.
ConfigKey represents a Delta configuration.
GUID is a type alias for a string expected to match a GUID/UUID format.
OutputMode represents the output mode used in streaming operations.
SaveMode represents the save mode used when performing an Operation.
Schema represents the schema of the Delta table.
SchemaDataTypeName contains the string .
StatsDecimal allows us to store decimal stats as a string and write to JSON without quotes.
StatsFloat32 allows us to marshal and unmarshal inf and -inf as strings.
StatsFloat64 allows us to marshal and unmarshal inf and -inf as strings.