github.com/purehyperbole/dht
module, package
Version: 0.0.0-20220912080845-a0ce14d7443e
Repository: https://github.com/purehyperbole/dht.git
Documentation: pkg.go.dev

# README

DHT

A Kademlia DHT implementation for Go with a focus on performance and ease of use. It does not seek to conform to any existing standards or implementations.

  • implements a 160 bit keyspace
  • replication factor (K) of 20
  • wire protocol using flatbuffers
  • SO_REUSEPORT to concurrently handle requests on the same port
  • asynchronous api
  • supports values larger than MTU

Usage

To start a cluster of nodes, you first need a bootstrap node that all other nodes can connect to. To start a bootstrap node:

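package main

import (
    "log"
    "time"

    "github.com/purehyperbole/dht"
)

func main() {
    cfg := &dht.Config{
        ListenAddress: "127.0.0.1:9000", // udp address to bind to
        Listeners:     4,                // number of socket listeners, defaults to GOMAXPROCS
        Timeout:       time.Minute / 2,  // request timeout, defaults to 1 minute
    }

    // named node rather than dht so the variable does not shadow the package
    node, err := dht.New(cfg)
    if err != nil {
        log.Fatal(err)
    }
    _ = node // keep the handle for storing and finding values

    log.Println("bootstrap node started!")
}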

Once a bootstrap node is up and running, you can add other nodes to the network:

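package main

import (
    "log"
    "time"

    "github.com/purehyperbole/dht"
)

func main() {
    cfg := &dht.Config{
        ListenAddress: "127.0.0.1:9001", // udp address to bind to
        BootstrapAddresses: []string{
            "127.0.0.1:9000",
        },
        Listeners: 4,               // number of socket listeners, defaults to GOMAXPROCS
        Timeout:   time.Minute / 2, // request timeout, defaults to 1 minute
    }

    // named node rather than dht so the variable does not shadow the package
    node, err := dht.New(cfg)
    if err != nil {
        log.Fatal(err)
    }
    _ = node // keep the handle for storing and finding values

    log.Println("node started!")
}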

From any node you can then store values as follows:

func main() {
    ...

    // helper function to construct a sha1 hash that
    // will be used as the value's key
    myKey := dht.Key("my-awesome-key")
    myValue := []byte("my-even-more-awesome-value")

    // stores a value for a given amount of time;
    // node is the value returned by dht.New
    node.Store(myKey, myValue, time.Hour, func(err error) {
        if err != nil {
            log.Fatal(err)
        }
        // the key is a raw sha1 hash, so print it as hex
        log.Printf("successfully stored key: %x -> %s", myKey, myValue)
    })
}

Once your value is stored, you can retrieve it from the network as follows:

func main() {
    ...

    // finds the value. please note it is not safe to use the value outside
    // of the provided callback unless it is copied;
    // node is the value returned by dht.New
    node.Find(myKey, func(value []byte, err error) {
        if err != nil {
            log.Fatal(err)
        }
        // the key is a raw sha1 hash, so print it as hex
        log.Printf("successfully retrieved key: %x -> %s !\n", myKey, value)
    })
}

OS Tuning

On most Linux distributions, the default socket send and receive buffers are very small. At higher throughput this will almost certainly cause significant packet loss as the buffers overflow.

The right buffer size depends on your workload, so you should experiment to find a suitable value.

You can temporarily increase the size of the read and write buffers via sysctl:

# set the default rmem and wmem buffers to 128 MiB and the maximum to 512 MiB
$ sysctl -w net.core.rmem_max=536870912 && sysctl -w net.core.rmem_default=134217728 && sysctl -w net.core.wmem_max=536870912 && sysctl -w net.core.wmem_default=134217728
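
To check which values are currently in effect before and after tuning, the same settings can be read back from /proc/sys. This is a small Linux-only sketch using the standard library; `sysctlPath` and `parseBytes` are hypothetical helper names and are not part of this package.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// sysctlPath maps a sysctl name such as "net.core.rmem_max" to its
// file under /proc/sys (Linux only).
func sysctlPath(name string) string {
	return "/proc/sys/" + strings.ReplaceAll(name, ".", "/")
}

// parseBytes parses the contents of a single-valued sysctl file.
func parseBytes(raw string) (int64, error) {
	return strconv.ParseInt(strings.TrimSpace(raw), 10, 64)
}

func main() {
	names := []string{
		"net.core.rmem_max", "net.core.rmem_default",
		"net.core.wmem_max", "net.core.wmem_default",
	}
	for _, name := range names {
		raw, err := os.ReadFile(sysctlPath(name))
		if err != nil {
			fmt.Printf("%s: %v\n", name, err)
			continue
		}
		n, err := parseBytes(string(raw))
		if err != nil {
			fmt.Printf("%s: %v\n", name, err)
			continue
		}
		fmt.Printf("%-22s %d bytes (%.1f MiB)\n", name, n, float64(n)/(1<<20))
	}
}
```

Values set with sysctl -w do not survive a reboot, so rerunning this after a restart is a quick way to confirm whether the tuning is still applied.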

Development

To re-generate the flatbuffer definitions for the wire protocol:

$ make generate

To run tests:

$ go test -v -race

To run benchmarks:

$ go test -v -bench=.

Implemented

  • routing
  • storage (custom)
  • storage (in-memory)
  • ping
  • store
  • findNode
  • findValue
  • benchmarks
  • node join/leave
  • user defined storage
  • multiple values per store request
  • handles packets larger than MTU
  • multiple values per key
  • batch socket reads and writes
  • peer refresh
  • key refresh
  • latency based route selection

Future Improvements

  • io_uring socket handler
  • storage (persistent)
  • NAT traversal
  • support SO_REUSEPORT on mac/windows
  • configurable logging
  • ntp time

# Functions

Key creates a new 20 byte key hashed with sha1 from a string, byte slice or int.
New creates a new dht.
ValuesFrom filters results to only those that were created after a given timestamp. This is useful for repeat queries where duplicates ideally should be avoided.

# Constants

K number of nodes in a bucket.
KEY_BITS number of bits in a key.
KEY_BYTES number of bytes in a key.
MaxEventSize the maximum size of an event packet.
MaxPacketSize the size of packets we will send according to MTU, minus 8 bytes for the UDP header.
MaxPayloadSize the maximum payload of our packet.
PacketHeaderSize the size of the header we use to reconstruct data.
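
To illustrate how these size constants relate to values larger than the MTU, here is a rough sketch of splitting a value into payload-sized chunks. The numeric sizes below are assumptions made up for the example (the package's actual constant values are not stated here), the relation between them is also assumed, and `split` is a hypothetical helper rather than the package's own fragmentation code.

```go
package main

import "fmt"

// Hypothetical sizes for illustration only; they are not taken from the package.
const (
	maxPacketSize    = 1472 // assumed: 1500 byte MTU - 20 byte IPv4 header - 8 byte UDP header
	packetHeaderSize = 12   // assumed size of a reassembly header
	maxPayloadSize   = maxPacketSize - packetHeaderSize // assumed relation
)

// split cuts value into chunks of at most size bytes. The chunks alias
// value's backing array; no data is copied.
func split(value []byte, size int) [][]byte {
	if size <= 0 {
		return nil
	}
	chunks := make([][]byte, 0, (len(value)+size-1)/size)
	for len(value) > size {
		chunks = append(chunks, value[:size])
		value = value[size:]
	}
	if len(value) > 0 {
		chunks = append(chunks, value)
	}
	return chunks
}

func main() {
	value := make([]byte, 4000)
	chunks := split(value, maxPayloadSize)
	fmt.Printf("%d bytes -> %d packets of up to %d payload bytes\n",
		len(value), len(chunks), maxPayloadSize)
}
```

Each chunk would then be sent with a header carrying enough information for the receiver to reassemble the original value, which is the role the PacketHeaderSize description points to.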

# Variables

ErrRequestTimeout returned when a pending request has not received a response before the TTL period.

# Structs

Config configuration parameters for the dht.
DHT represents the distributed hash table.
FindOption for configuring find requests.
Value represents the value to be stored.

# Interfaces

Storage defines the storage interface used by the DHT.