package
4.2.1
Repository: https://github.com/datadog/kafka-kit.git
Documentation: pkg.go.dev

# README

Overview

This package provides a ZooKeeper backed, coarse grained distributed lock. The lock path is determined at instantiation time. At request time, locks are enqueued and block until the lock is either acquired or the context deadline is met. Locks can be configured with an optional TTL.

Further implementation notes:

  • Locks are enqueued and granted in order as locks ahead are relinquished or timed out.
  • Session timeouts/disconnects are handled through ZooKeeper sessions with automatic cleanup; locks that fail to acquire before the context timeout are removed from the queue even if the lock session is still active.
  • Setting a ZooKeeperLockConfig.TTL value > 0 enables lock TTLs. Take note that TTL expirations are handled at request time from contending locks; if service A is not using TTLs and service B is, service B can forcibly abort service A locks.

Examples

Example implementation in zookeeper-example:

package main

import (
	"context"
	"flag"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	zklocking "github.com/DataDog/kafka-kit/v4/cluster/zookeeper"
)

func main() {
	timeout := flag.Duration("timeout", 3*time.Second, "lock wait timeout")
	owner := flag.String("owner", "user1", "the lock owner ID")
	flag.Parse()

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	// Init a Lock.
	cfg := zklocking.ZooKeeperLockConfig{
		Address:  "localhost:2181",
		Path:     "/my/locks",
		TTL: 30000,
		OwnerKey: "owner",
	}

	lock, _ := zklocking.NewZooKeeperLock(cfg)
	ctx, c := context.WithTimeout(context.WithValue(context.Background(), "owner", *owner), *timeout)
	defer c()

	// Try the lock.
	if err := lock.Lock(ctx); err != nil {
		log.Println(err)
	} else {
		log.Println("I've got the lock!")
		defer log.Println("I've released the lock")
		defer lock.Unlock(ctx)
	}

	<-sigs
}

Running the test in two terminals:

% ./zookeeper-example
2021/12/08 10:46:31 I've got the lock!
% ./zookeeper-example
2021/12/08 10:46:39 attempt to acquire lock timed out

Same test, exiting the first lock while the second lock is waiting:

% ./zookeeper-example
2021/12/08 10:46:58 I've got the lock!
^C2021/12/08 10:47:00 I've released the lock
% ./zookeeper-example
2021/12/08 10:47:00 I've got the lock!

Ephemeral lock znodes visible at the configured path:

[zk: localhost:2181(CONNECTED) 8] ls /my/locks
[_c_83c1bcf372c265e9ac7ee364e5d3bac5-lock-0000000027, _c_979cb11f40bb3dbc6908edeaac8f2de1-lock-0000000028]
[zk: localhost:2181(CONNECTED) 9] ls /my/locks
[_c_64c30aea1b15839542824a7b47d49ce3-lock-0000000029]

# Packages

No description provided by the author

# Functions

NewZooKeeperLock returns a ZooKeeperLock.
NewZooKeeperLock takes a ZooKeeperLockConfig and ZooKeeperClient and returns a ZooKeeperLock.

# Variables

ErrAlreadyOwnLock is returned if Lock is called with a context holding an OwnerKey equal to that of an active lock.
ErrInvalidSeqNode is returned when sequential znodes are being parsed for a trailing integer ID, but one isn't found.
ErrLockingTimedOut is returned when a lock couldn't be acquired by the context deadline.
ErrNotLockOwner is returned when Unlock is attempting to be called where the requestor's OwnerKey value does not equal the current lock owner.
ErrOwnerAlreadySet is returned when SetOwner is being called on a lock where the owner field is non-nil.

# Structs

ErrExpireLockFailed is returned when a lock with an expired TTL fails to purge.
ErrLockingFailed is a general failure.
ErrUnlockingFailed is a general failure.
LockEntries is a container of locks.
ZooKeeperLock implements a Lock.
ZooKeeperLockConfig holds ZooKeeperLock configurations.

# Interfaces

ZooKeeperClient interface.