github.com/go-teal/teal
Version: 0.2.1
Repository: https://github.com/go-teal/teal.git
Documentation: pkg.go.dev

# README

Teal

In the changing field of data engineering, having strong, scalable, and user-friendly tools is essential. We introduce Teal, a new open-source ETL tool designed to improve your data transformation and orchestration.

Teal combines the best features of tools like dbt, Dagster, and Airflow, while solving common problems found in traditional Python-based solutions. Our goal is to provide data engineers and analysts with a powerful, easy-to-use platform that simplifies complex workflows and increases productivity.

Why Choose Teal?

  • Scalable Architecture: Easily scale your data pipelines to handle datasets of any size, ensuring high performance and reliability.
  • Flexible Integration: Integrate smoothly with various data sources and destinations, offering great flexibility and connectivity.
  • Optimized Performance with Go: Teal uses Go's concurrency model with goroutines and channels to maximize performance and efficiency. This ensures your data pipelines run quickly and reliably, making the best use of your system's resources.
  • Go Stack Advantage: Built on the efficient Go stack, Teal offers high performance, low latency, and excellent scalability. The simplicity and power of Go provide a solid foundation for managing complex ETL workflows.

QuickStart

Installation

go install github.com/go-teal/teal/cmd/teal@latest

Creating your project

mkdir my_test_project
cd my_test_project

Init your project from scratch

teal init
❯ ls -al
total 16
drwxr-xr-x@ 6 wwtlf  wwtlf  192 24 Jun 21:23 .
drwxr-xr-x  5 wwtlf  wwtlf  160 24 Jun 21:21 ..
drwxr-xr-x@ 3 wwtlf  wwtlf   96 24 Jun 07:46 assets
-rw-r--r--@ 1 wwtlf  wwtlf  302 24 Jun 07:51 config.yaml
drwxr-xr-x@ 2 wwtlf  wwtlf   64 24 Jun 20:03 docs
-rw-r--r--@ 1 wwtlf  wwtlf  137 24 Jun 07:46 profile.yaml

Update config.yaml

version: '1.0.0'
module: github.com/my_user/my_test_project
connections:
  - name: default
    type: duckdb
    config:
      path: ./store/test.duckdb            
      extensions:
        - postgres
        - httpfs         
      # extraParams: 
      #   - name: "name"
      #     value: "value"
  1. The module parameter is used as the module path in the generated go.mod.
  2. Make sure the directory referenced by path exists (./store in this example).

Update profile.yaml

version: '1.0.0'
name: 'my-test-project'
connection: 'default'
models: 
  stages:
    - name: staging
    - name: dds  
    - name: mart
  1. name is used as the name of the binary file.

Generate go project

teal gen

You'll see the following output:

project-path: .
config-file: ./config.yaml
Building: staging.addresses.sql
Building: staging.transactions.sql
Building: staging.wallets.sql
Building: dds.dim_addresses.sql
Building: dds.fact_transactions.sql
Building: mart.mart_wallet_report.sql
Files 10
./cmd/my-test-project/main._go .................................................. [OK]
./go.mod ........................................................................ [OK]
./internal/assets/staging.addresses.go .......................................... [OK]
./internal/assets/staging.transactions.go ....................................... [OK]
./internal/assets/staging.wallets.go ............................................ [OK]
./internal/assets/dds.dim_addresses.go .......................................... [OK]
./internal/assets/dds.fact_transactions.go ...................................... [OK]
./internal/assets/mart.mart_wallet_report.go .................................... [OK]
./internal/assets/configs.go .................................................... [OK]
./docs/graph.wsd ................................................................ [OK]

Your DAG is depicted in the PlantUML file docs/graph.wsd.

  1. Rename main._go to my-test-project.go
  2. Uncomment the following line: _ "github.com/marcboeker/go-duckdb" in my-test-project.go.
  3. Run go mod tidy
  4. Final project structure:
.
├── assets
│   └── models
│       ├── dds
│       │   ├── dim_addresses.sql
│       │   └── fact_transactions.sql
│       ├── mart
│       │   └── mart_wallet_report.sql
│       └── staging
│           ├── addresses.sql
│           ├── transactions.sql
│           └── wallets.sql
├── cmd
│   └── my-test-project
│       └── main.go
├── config.yaml
├── docs
│   └── graph.wsd
├── go.mod
├── go.sum
├── internal
│   └── assets
│       ├── configs.go
│       ├── dds.dim_addresses.go
│       ├── dds.fact_transactions.go
│       ├── mart.mart_wallet_report.go
│       ├── staging.addresses.go
│       ├── staging.transactions.go
│       └── staging.wallets.go
├── profile.yaml
└── store
    ├── addresses.csv    
    ├── transactions.csv
    └── wallets.csv

Run your project

go run ./cmd/my-test-project

Explore my-test-project.go

package main

import (
  _ "github.com/marcboeker/go-duckdb"

  "fmt"
  "os"

  "github.com/rs/zerolog"
  "github.com/rs/zerolog/log"

  "github.com/go-teal/teal/pkg/core"
  "github.com/go-teal/teal/pkg/dags"
  "github.com/my_user/my_test_project/internal/assets"
)

func main() {
  log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
  fmt.Println("my-test-project")
  core.GetInstance().Init("config.yaml", ".")
  config := core.GetInstance().Config
  dag := dags.InitChannelDag(assets.DAG, assets.ProjectAssets, config, "instance 1")
  wg := dag.Run()
  result := <-dag.Push("TEST", nil, make(chan map[string]interface{}))
  fmt.Println(result)
  dag.Stop()
  wg.Wait()
}

What this code does:

  1. dag.Run() builds a DAG from the Ref calls in your .sql models; each node is an asset and each edge is a Go channel.
  2. result := <-dag.Push("TEST", nil, make(chan map[string]interface{})) triggers the execution of this DAG synchronously: the receive blocks until the result arrives.
  3. dag.Stop() sends the deactivation command.
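To make the "node is an asset, edge is a Go channel" idea concrete, here is a minimal standard-library sketch. It is not Teal's implementation: the node type, the runPipeline function, and the asset names are hypothetical and only illustrate how goroutines connected by channels can enforce upstream-before-downstream execution.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// node is a hypothetical stand-in for an asset: it waits for one message
// from every upstream edge, does its work, then notifies every downstream edge.
type node struct {
	name string
	in   []<-chan string
	out  []chan<- string
}

// run consumes one result per upstream channel, then emits its own result.
func (n node) run(wg *sync.WaitGroup) {
	defer wg.Done()
	var seen []string
	for _, ch := range n.in {
		seen = append(seen, <-ch)
	}
	sort.Strings(seen)
	result := n.name
	if len(seen) > 0 {
		result += "(" + strings.Join(seen, ",") + ")"
	}
	for _, ch := range n.out {
		ch <- result
	}
}

// runPipeline wires staging.a and staging.b into dds.c and returns dds.c's result.
func runPipeline() string {
	ac := make(chan string, 1)   // edge staging.a -> dds.c
	bc := make(chan string, 1)   // edge staging.b -> dds.c
	done := make(chan string, 1) // edge dds.c -> caller

	nodes := []node{
		{name: "staging.a", out: []chan<- string{ac}},
		{name: "staging.b", out: []chan<- string{bc}},
		{name: "dds.c", in: []<-chan string{ac, bc}, out: []chan<- string{done}},
	}
	var wg sync.WaitGroup
	for _, n := range nodes {
		wg.Add(1)
		go n.run(&wg)
	}
	wg.Wait()
	return <-done
}

func main() {
	fmt.Println(runPipeline())
}
```

All three goroutines start at once, but dds.c cannot proceed until both upstream channels deliver, so no explicit scheduler is needed in this sketch.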

Configuration

config.yaml

version: '1.0.0'
module: github.com/my_user/my_test_project
connections:
  - name: default
    type: duckdb
    config:
      path: ./store/test.duckdb            
      extensions:
        - postgres
        - httpfs         
      # extraParams: 
      #   - name: "name"
      #     value: "value"
  1. Teal supports multiple connections.

  2. The following databases are supported at the moment (v0.2.1):

    • DuckDB, see the specific config params.
    • PostgreSQL, see the specific config params.
Param | Type | Description
version | String constant | 1.0.0
module | String | Generated Go module name
connections | Array of objects | Array of database connections
connections.name | String | Name of the connection used in the model profile
connections.type | String | Driver name of the database connection: duckdb, postgres

profile.yaml

version: '1.0.0'
name: 'my-test-project'
connection: 'default'
models: 
  stages:
    - name: staging
      models:
        - name: model1
        # see model profiles
          tests:
            - name: "test.name"
            # see test profiles
    - name: dds  
    - name: mart
      models:
        - name: custom_asset
          materialization: 'raw'
          connection: 'default'
          raw_upstreams:
            - "dds.model1"
            - "dds.model2"
Param | Type | Description
version | String constant | 1.0.0
name | String | Generated folder name for main.go.
connection | String | Connection from config.yaml by default.
models.stages | Array of stages | List of stages for models. For each stage, a folder assets/models/<stage name> must be created in advance.
models.stages | See: Model Profile |
models.stages.name: <stage name>.models.<name: model name>.tests | See: Test Profile | Test cases defined in the model profiles are executed immediately after the execution of the model itself.
models.stages.name: <stage name>.models.<name: model name>.raw_upstreams | See: Raw assets | A list of upstreams that supply data to this raw asset or that must be executed before this asset is run.

Model Profile

The asset profile can be specified in the profile.yaml file or in a Go template inside your SQL model file, using the sub-template {{ define "profile.yaml" }} ... {{ end }}:

{{ define "profile.yaml" }}
    connection: 'default'
    materialization: 'table'  
    is_data_framed: true
    primary_key_fields:
      - "id"
    indexes:
      - name: "wallet"
        unique: false
        fields:
          - "wallet_id"
{{ end }}

select
    id,
    wallet_id,
    wallet_address,
    currency
    from read_csv('store/addresses.csv',
    delim = ',',
    header = true,
    columns = {
        'id': 'INT',
        'wallet_id': 'VARCHAR',
        'wallet_address': 'VARCHAR',
        'currency': 'VARCHAR'}
    )
Param | Type | Default value | Description
name | String | filename | The model name must match the file name, disregarding the system extension (.sql).
connection | String | profile.connection | The connection name from config.yaml.
materialization | String | table | See Materializations.
is_data_framed | boolean | false | See Cross-database references.
persist_inputs | boolean | false | See Cross-database references.
primary_key_fields | Array of string |  | List of fields for the primary unique index
indexes | Array of Indexes |  | List of indexes for the asset (only for the table and incremental materializations)
indexes.<name: IndexName> | String |  | Name of the index
indexes.<name: IndexName>.Unique | boolean | false | Flag indicating whether the index is unique
indexes.<name: IndexName>.fields | Array of string |  | List of fields for the index

Materializations

Materialization | Description
table | The result of an SQL query execution is stored in the table corresponding to the model name. If the table does not exist, it will be created. If the table already exists, it will be cleared using the truncate method.
incremental | The result of the query execution is added to the existing table. If the table does not exist, it will be created.
view | The SQL query is saved as a view.
custom | A custom SQL query is executed; no tables or views are created.
raw | A custom Go function is executed.
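The differences between the materializations can be sketched as a function that picks SQL statements per mode. The statements below are illustrative assumptions only; this README does not show the SQL that Teal actually generates, and the statements helper is hypothetical.

```go
package main

import "fmt"

// statements returns the SQL a runner might issue for one materialization.
// Assumption: the exact statements are invented for illustration and are not
// taken from Teal's generated code.
func statements(materialization, table, query string, exists bool) ([]string, error) {
	switch materialization {
	case "table":
		if !exists {
			return []string{fmt.Sprintf("CREATE TABLE %s AS %s", table, query)}, nil
		}
		// Existing table: clear it, then load the fresh result.
		return []string{
			fmt.Sprintf("TRUNCATE TABLE %s", table),
			fmt.Sprintf("INSERT INTO %s %s", table, query),
		}, nil
	case "incremental":
		if !exists {
			return []string{fmt.Sprintf("CREATE TABLE %s AS %s", table, query)}, nil
		}
		// Existing table: append without clearing.
		return []string{fmt.Sprintf("INSERT INTO %s %s", table, query)}, nil
	case "view":
		return []string{fmt.Sprintf("CREATE OR REPLACE VIEW %s AS %s", table, query)}, nil
	case "custom":
		// The query runs as-is; no table or view is created.
		return []string{query}, nil
	case "raw":
		// A Go function runs instead of SQL.
		return nil, nil
	}
	return nil, fmt.Errorf("unknown materialization %q", materialization)
}

func main() {
	for _, m := range []string{"table", "incremental", "view", "custom", "raw"} {
		s, err := statements(m, "dds.fact_transactions", "select 1 as id", true)
		fmt.Println(m, s, err)
	}
}
```

The only difference between table and incremental in this sketch is the truncate step, which mirrors the descriptions above.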

Template functions

Static and dynamic functions

Functions in double braces, such as {{ Ref "staging.model" }}, are static: their values are substituted when the project is generated. Functions in triple braces, such as {{{ Ref "staging.model" }}}, are dynamic: they are executed when your asset is activated. After project generation, triple braces are replaced by double braces in the source code of the assets.
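The two-phase behaviour can be sketched with Go's text/template package. This is an assumption about one way to get the described effect, not Teal's generator: the placeholder markers, the generate and render helpers, and the gen_/run_ table names are all hypothetical.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// Hypothetical markers that hide triple braces from the static pass.
const (
	openMark  = "@@OPEN@@"
	closeMark = "@@CLOSE@@"
)

// render executes src with a Ref function backed by resolve.
func render(src string, resolve func(string) string) (string, error) {
	tmpl, err := template.New("model").Funcs(template.FuncMap{"Ref": resolve}).Parse(src)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, nil); err != nil {
		return "", err
	}
	return sb.String(), nil
}

// generate is the static pass: double-brace calls are evaluated now, while
// triple-brace calls are kept and rewritten to double braces for later.
func generate(src string) (string, error) {
	protected := strings.NewReplacer("{{{", openMark, "}}}", closeMark).Replace(src)
	out, err := render(protected, func(m string) string {
		return "gen_" + strings.ReplaceAll(m, ".", "_")
	})
	if err != nil {
		return "", err
	}
	return strings.NewReplacer(openMark, "{{", closeMark, "}}").Replace(out), nil
}

func main() {
	src := `select * from {{ Ref "staging.a" }} join {{{ Ref "staging.b" }}}`
	generated, err := generate(src)
	if err != nil {
		panic(err)
	}
	fmt.Println(generated) // the dynamic call survives with double braces

	// The dynamic pass runs later, when the asset is activated.
	final, err := render(generated, func(m string) string {
		return "run_" + strings.ReplaceAll(m, ".", "_")
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(final)
}
```

Because the dynamic call is still a template action after generation, its value can depend on state that only exists at run time.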

List of functions

Native available functions:

Function | Input Parameters | Output data | Description
Ref | "<staging name>.<model name>" | string | Ref is the main function on which the DAG is based. It points to the model that will be replaced by the table name after the template is executed.
this | None | string | The this function returns the name of the current table.
IsIncremental | None | boolean | The IsIncremental function returns a flag indicating whether the model is being executed in incremental mode.
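As a rough illustration of how these three functions could combine in a model, the sketch below registers stand-ins for Ref, this, and IsIncremental in a text/template FuncMap and renders an incremental filter. The renderModel helper, the stand-in function bodies, and the query itself are assumptions for illustration, not Teal's runtime.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// A hypothetical model: only rows newer than the current table are selected
// when the model runs in incremental mode.
const model = `select * from {{ Ref "staging.transactions" }}` +
	`{{ if IsIncremental }} where id > (select max(id) from {{ this }}){{ end }}`

// renderModel renders src with stand-in implementations of the three functions.
func renderModel(src, table string, incremental bool) (string, error) {
	funcs := template.FuncMap{
		"Ref":           func(name string) string { return name }, // stand-in: returns the name unchanged
		"this":          func() string { return table },
		"IsIncremental": func() bool { return incremental },
	}
	tmpl, err := template.New("model").Funcs(funcs).Parse(src)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, nil); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	for _, incremental := range []bool{false, true} {
		sql, err := renderModel(model, "dds.fact_transactions", incremental)
		if err != nil {
			panic(err)
		}
		fmt.Println(sql)
	}
}
```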

Databases

DuckDB

  1. Specific config params:
Param | Type | Description
connections.type | String | duckdb
extensions | Array of strings | List of DuckDB extensions. Extensions will be installed during the creation of the database and loaded before the asset execution.
path | String | Path to the DuckDB database file.
path_env | String | Environment variable that contains the path to the data file. If set, the path setting is ignored.
extraParams | Object | Pairs of name-value parameters for DuckDB configuration.

PostgreSQL

  1. Specific config params:
Param | Type | Description
connections.type | String | postgres
host | String | The hostname or IP address of the PostgreSQL server.
host_env | String | The environment variable name for the PostgreSQL server hostname or IP address.
port | String | The port number on which the PostgreSQL server is running. Default is typically 5432.
port_env | String | The environment variable name for the PostgreSQL server port number.
database | String | The name of the database to connect to on the PostgreSQL server.
database_env | String | The environment variable name for the PostgreSQL database name.
user | String | The username for authenticating to the PostgreSQL server.
user_env | String | The environment variable name for the PostgreSQL username.
password | String | The password for authenticating to the PostgreSQL server.
password_env | String | The environment variable name for the PostgreSQL password.
db_root_cert | String | Path to the root certificate file for SSL connections to the PostgreSQL server.
db_root_cert_env | String | The environment variable name for the path to the root certificate file for SSL connections.
db_cert | String | Path to the client certificate file for SSL connections to the PostgreSQL server.
db_cert_env | String | The environment variable name for the path to the client certificate file for SSL connections.
db_key | String | Path to the client key file for SSL connections to the PostgreSQL server.
db_key_env | String | The environment variable name for the path to the client key file for SSL connections.
db_sslnmode | String | The SSL mode for connections to the PostgreSQL server. Options include disable, require, verify-ca, and verify-full.
db_sslnmode_env | String | The environment variable name for specifying the SSL mode for connections.
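The *_env parameters pair a literal setting with the name of an environment variable. A minimal sketch of that precedence follows, assuming the rule stated for DuckDB's path_env (a configured variable name wins over the literal value) also applies to the PostgreSQL pairs; the resolve helper and the TEAL_SKETCH_DB_PATH variable name are hypothetical.

```go
package main

import (
	"fmt"
	"os"
)

// resolve returns the value of the environment variable named envName when a
// name is configured, and the literal value otherwise.
// Assumption: a configured but unset variable yields an empty string here;
// the README does not say how Teal treats that case.
func resolve(value, envName string) string {
	if envName != "" {
		return os.Getenv(envName)
	}
	return value
}

func main() {
	// TEAL_SKETCH_DB_PATH is a made-up variable name used only in this example.
	os.Setenv("TEAL_SKETCH_DB_PATH", "/data/prod.duckdb")

	fmt.Println(resolve("./store/test.duckdb", ""))                    // literal path is used
	fmt.Println(resolve("./store/test.duckdb", "TEAL_SKETCH_DB_PATH")) // environment value is used
}
```

Keeping secrets such as password in an environment variable this way means they never have to be committed in config.yaml.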

General Architecture

Classes

Cross-database references

Cross-database references allow seamless queries to be executed, enabling the retrieval of results from an asset connected to another database, even if it uses a different database driver.

The following two model profile parameters control cross-database references:

  • is_data_framed: When this flag is set to true, the result of the query execution is saved to a gota.DataFrame structure. This structure is then passed to the next node in your DAG.
  • persist_inputs: When this flag is set to true, all incoming parameters that arrive as gota.DataFrame structures are saved to temporary tables in the database connection configured in the model profile's connection parameter. You don't need to modify the reference to the asset for this to happen.


Raw Assets

Raw assets are custom functions written in Go that can accept and return dataframes and contain any other custom logic.

Raw assets must implement the following function interface:

type ExecutorFunc func(input map[string]interface{}, modelProfile *configs.ModelProfile) (interface{}, error)

Retrieving a dataframe from an upstream is done as follows:

df := input["dds.model1"].(*dataframe.DataFrame)

At the same time, the is_data_framed flag must be set in the upstream asset.
A custom asset can return a dataframe, which can then be seamlessly (see: Cross-database references) used in an SQL query or another custom dataframe.
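A raw asset might look like the sketch below. To stay self-contained it uses local stand-ins: ModelProfile replaces configs.ModelProfile and Frame replaces *dataframe.DataFrame, so both types, their fields, and the filterNonEmpty function are hypothetical. The point is the shape of the function and the checked type assertion on the upstream input.

```go
package main

import (
	"errors"
	"fmt"
)

// ModelProfile is a local stand-in for configs.ModelProfile.
type ModelProfile struct{ Name string }

// Frame is a local stand-in for a dataframe received from an upstream asset.
type Frame struct{ Rows [][]string }

// ExecutorFunc mirrors the signature above, using the stand-in profile type.
type ExecutorFunc func(input map[string]interface{}, modelProfile *ModelProfile) (interface{}, error)

// filterNonEmpty is a hypothetical raw asset: it keeps the rows of dds.model1
// whose first column is not empty and returns them as a new frame.
func filterNonEmpty(input map[string]interface{}, _ *ModelProfile) (interface{}, error) {
	// A checked assertion avoids a panic when the upstream did not pass a frame,
	// for example because is_data_framed is not set on it.
	df, ok := input["dds.model1"].(*Frame)
	if !ok {
		return nil, errors.New("dds.model1: no frame received from upstream")
	}
	out := &Frame{}
	for _, row := range df.Rows {
		if len(row) > 0 && row[0] != "" {
			out.Rows = append(out.Rows, row)
		}
	}
	return out, nil
}

// Compile-time check that the function matches the executor signature.
var _ ExecutorFunc = filterNonEmpty

func main() {
	input := map[string]interface{}{
		"dds.model1": &Frame{Rows: [][]string{{"1", "a"}, {"", "b"}, {"3", "c"}}},
	}
	result, err := filterNonEmpty(input, &ModelProfile{Name: "custom_asset"})
	fmt.Println(result.(*Frame).Rows, err)
}
```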

Registration and declaration of a raw asset

A raw asset must be registered in the main function.

processing.GetExecutors().Execurots["<staging>.<asset name>"] = youPackage.YouRawAssetFunction

Upstream dependencies in a DAG are set through the raw_upstreams parameters in the model profile (see: profile.yaml).
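What upstream declarations imply about execution order can be sketched with a topological sort (Kahn's algorithm). This is an illustration only: Teal's own scheduling is channel-based, and the order function, its input map, and the staging.source name are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// order returns a sequence in which every asset appears after all of its
// upstreams. upstreams maps an asset name to the names it depends on.
func order(upstreams map[string][]string) ([]string, error) {
	indegree := map[string]int{}
	downstream := map[string][]string{}
	for asset, ups := range upstreams {
		if _, ok := indegree[asset]; !ok {
			indegree[asset] = 0
		}
		for _, up := range ups {
			if _, ok := indegree[up]; !ok {
				indegree[up] = 0
			}
			indegree[asset]++
			downstream[up] = append(downstream[up], asset)
		}
	}

	// Start with assets that have no upstreams; sort for a stable result.
	var ready []string
	for name, d := range indegree {
		if d == 0 {
			ready = append(ready, name)
		}
	}
	sort.Strings(ready)

	var out []string
	for len(ready) > 0 {
		name := ready[0]
		ready = ready[1:]
		out = append(out, name)
		next := downstream[name]
		sort.Strings(next)
		for _, d := range next {
			indegree[d]--
			if indegree[d] == 0 {
				ready = append(ready, d)
			}
		}
	}
	if len(out) != len(indegree) {
		return nil, errors.New("cycle detected in upstream declarations")
	}
	return out, nil
}

func main() {
	plan, err := order(map[string][]string{
		"mart.custom_asset": {"dds.model1", "dds.model2"},
		"dds.model1":        {"staging.source"},
		"dds.model2":        {"staging.source"},
	})
	fmt.Println(plan, err)
}
```

A cycle in the declarations leaves some assets with unmet upstreams, which the sketch reports as an error instead of returning a partial plan.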

Data testing

Simple model testing

Simple tests verify data integrity after a model has been processed. Each test is an SQL query that selects the rows violating a rule. If the query returns zero rows, the test is considered passed.

Tests for models should be added to the folder: assets/tests or assets/tests/<stage name>.

Example:

{{- define "profile.yaml" }}
    connection: 'default'          
{{-  end }}
select pk_id, count(pk_id) as c from {{ Ref "dds.fact_transactions" }} group by pk_id having c > 1

The generated source code for testing is located in the modeltests package.
To call all test cases, add the following line to your main.go file: modeltests.TestAll().

Test cases defined in the model profiles are executed immediately after the execution of the model itself.
For the tests to be executed immediately after the models, the DAG must be initialized with the following command:
dag := dags.InitChannelDagWithTests(assets.DAG, assets.ProjectAssets, modeltests.ProjectTests, config, "instance 1")

Test profile

Param | Type | Default value | Description
connection | String | profile.connection | The connection name from config.yaml.

Road Map

see CHANGELOG.md

[0.3.0+]

Features

  • Advanced Tests
  • Seeds
  • Database Sources
  • Pre/Post-hooks
  • Embedded UI Dashboard
  • DataVault

Database support

  • MySQL
  • ClickHouse
  • Snowflake
  • Apache Spark

Workflow

  • Temporal.io
  • Kafka Distributed

Contact