module
0.0.0-20250111101459-8043eea4c44c
Repository: https://github.com/genc-murat/crystalcache.git
# README
CrystalCache
CrystalCache is a high-performance, Redis-compatible in-memory data store and cache written in Go. It is safe for concurrent use and supports replication, ACL management, transactions, AOF persistence, and a range of specialized data structures.
Table of Contents
- Features
- Architecture
- Installation
- Configuration
- Usage Examples
- Data Types and Commands
- Security
- Persistence
- Replication
- Monitoring
- Performance Optimization
- Client Management
- Advanced Features
- Troubleshooting
- Contributing
Features
Core Features
- Master-Slave Replication
  - Asynchronous replication support
  - Automatic failover capabilities
  - Full and incremental synchronization
  - Replica read-only mode
- Access Control Lists (ACL)
  - Fine-grained permission management
  - Command-level access control
  - User authentication and authorization
  - Default user configuration
- Transaction Support
  - MULTI/EXEC/DISCARD commands
  - Optimistic locking with WATCH
  - Atomic operations
  - Pipeline support for batch operations
- Persistence
  - Append-Only File (AOF) persistence
  - Configurable sync intervals
  - Automatic recovery on startup
  - Background saving
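The optimistic locking listed under Transaction Support can be illustrated with a per-key version counter: a write is applied only if the key has not changed since it was watched. This is a minimal sketch of the general technique, not CrystalCache's implementation; `versionedStore` and its methods are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// versionedStore is a hypothetical store that keeps a version per key so a
// caller can detect concurrent writes, similar in spirit to WATCH.
type versionedStore struct {
	mu       sync.Mutex
	values   map[string]string
	versions map[string]uint64
}

func newVersionedStore() *versionedStore {
	return &versionedStore{
		values:   make(map[string]string),
		versions: make(map[string]uint64),
	}
}

// Set writes a value unconditionally and bumps the key's version.
func (s *versionedStore) Set(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.values[key] = value
	s.versions[key]++
}

// Watch returns the key's current version for a later Exec call.
func (s *versionedStore) Watch(key string) uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.versions[key]
}

// Exec applies the write only if the key is still at the watched version.
// It reports whether the write was applied.
func (s *versionedStore) Exec(key string, watched uint64, value string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.versions[key] != watched {
		return false // the key changed after Watch, so the caller should retry
	}
	s.values[key] = value
	s.versions[key]++
	return true
}

func main() {
	s := newVersionedStore()
	s.Set("balance", "100")

	watched := s.Watch("balance")
	s.Set("balance", "90") // another client writes after the watch
	fmt.Println("first attempt applied:", s.Exec("balance", watched, "50"))

	watched = s.Watch("balance") // retry with a fresh version
	fmt.Println("second attempt applied:", s.Exec("balance", watched, "50"))
}
```

The caller is expected to retry when `Exec` returns false, which is the usual pattern for WATCH-based transactions.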
Data Structures
Basic Data Types
- Strings
  - Binary-safe strings
  - Maximum size: 512MB
  - Atomic increment/decrement
  - Bit operations
- Lists
  - Linked lists
  - Both ends insertion/extraction
  - Blocking operations
  - Range operations
- Sets
  - Unordered collections
  - Set operations (union, intersection, difference)
  - Random member selection
  - Membership testing
- Hashes
  - Field-value pairs
  - Incremental operations
  - Multiple field operations
  - Field existence testing
- Sorted Sets
  - Score-based ordering
  - Range operations
  - Lexicographical operations
  - Aggregation operations
Advanced Data Types
- Streams
  - Append-only log structures
  - Consumer groups
  - Message acknowledgment
  - Range queries
- Bitmaps
  - Bit-level operations
  - Space-efficient storage
  - Counting and finding operations
  - Bitfield operations
- JSON
  - Document storage
  - Path-based operations
  - Array manipulations
  - Atomic updates
- Time Series
  - Time-based data storage
  - Aggregation functions
  - Retention policies
  - Downsampling
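To make the Time Series downsampling item concrete, the sketch below averages samples into fixed-width time buckets. It illustrates the general idea only; the `point` type and `downsampleAvg` function are hypothetical and are not taken from the CrystalCache code base. It assumes non-negative timestamps that are already sorted.

```go
package main

import "fmt"

// point is a hypothetical time series sample.
type point struct {
	Timestamp int64 // Unix milliseconds, assumed non-negative
	Value     float64
}

// downsampleAvg groups points into buckets of bucketMs milliseconds and
// returns one averaged point per non-empty bucket, stamped with the bucket
// start. Points must be sorted by timestamp and bucketMs must be positive.
func downsampleAvg(points []point, bucketMs int64) []point {
	var out []point
	var sum float64
	var count int
	var start int64
	for _, p := range points {
		bucket := p.Timestamp - p.Timestamp%bucketMs
		if count > 0 && bucket != start {
			// The previous bucket is complete, so emit its average.
			out = append(out, point{Timestamp: start, Value: sum / float64(count)})
			sum, count = 0, 0
		}
		start = bucket
		sum += p.Value
		count++
	}
	if count > 0 {
		out = append(out, point{Timestamp: start, Value: sum / float64(count)})
	}
	return out
}

func main() {
	raw := []point{{0, 1}, {500, 3}, {1000, 10}, {1999, 20}, {3000, 5}}
	for _, p := range downsampleAvg(raw, 1000) {
		fmt.Printf("t=%d avg=%.1f\n", p.Timestamp, p.Value)
	}
}
```

Other aggregation functions (min, max, sum) would follow the same bucketing loop with a different accumulator.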
Probabilistic Data Structures
- HyperLogLog
  - Cardinality estimation
  - Merge operations
  - Standard error: 0.81%
- Bloom Filters
  - Membership testing
  - False positive rate configuration
  - Space-efficient storage
- Count-Min Sketch
  - Frequency estimation
  - Configurable accuracy
  - Memory-efficient counting
- Cuckoo Filters
  - Item deletion support
  - Better space efficiency than Bloom filters
  - Configurable false positive rate
- Top-K
  - Heavy hitters tracking
  - Count-based filtering
  - Decay support
- T-Digest
  - Quantile approximation
  - Merge operations
  - Configurable compression
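Two of the figures above follow from standard formulas, sketched here for reference. The HyperLogLog standard error is 1.04/sqrt(m) for m registers, and a Bloom filter sized for n items at false positive rate p needs about -n·ln(p)/(ln 2)² bits and (m/n)·ln 2 hash functions. The register count of 16384 is an assumption borrowed from Redis, which matches the 0.81% figure; whether CrystalCache uses the same value is not confirmed here. The function names are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// hllStandardError returns the theoretical relative standard error of a
// HyperLogLog with the given number of registers: 1.04 / sqrt(registers).
func hllStandardError(registers int) float64 {
	return 1.04 / math.Sqrt(float64(registers))
}

// bloomParams returns the number of bits and hash functions that minimise
// memory for n expected items at target false positive rate p (0 < p < 1).
func bloomParams(n int, p float64) (bits, hashes int) {
	m := math.Ceil(-float64(n) * math.Log(p) / (math.Ln2 * math.Ln2))
	k := math.Round(m / float64(n) * math.Ln2)
	if k < 1 {
		k = 1
	}
	return int(m), int(k)
}

func main() {
	// 16384 registers is an assumption (the value Redis uses).
	fmt.Printf("HLL standard error: %.4f%%\n", 100*hllStandardError(16384))

	bits, hashes := bloomParams(1000, 0.01)
	fmt.Printf("Bloom filter for 1000 items at 1%%: %d bits, %d hashes\n", bits, hashes)
}
```

Lowering the target false positive rate increases both the bit count and the number of hash functions, which is the trade-off behind the "False positive rate configuration" item.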
Spatial Data Structures
- Geospatial Indexes
  - Location-based queries
  - Radius searches
  - Distance calculations
  - Geohash support
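Distance calculations and radius searches over longitude/latitude pairs are commonly built on the haversine formula. The sketch below shows that formula; it is not CrystalCache's implementation, the function names are hypothetical, and the Earth radius of 6371 km is an assumed mean value, so results may differ slightly from what the server reports.

```go
package main

import (
	"fmt"
	"math"
)

// earthRadiusKm is an assumed mean Earth radius.
const earthRadiusKm = 6371.0

// haversineKm returns the great-circle distance in kilometres between two
// points given as latitude and longitude in degrees.
func haversineKm(lat1, lon1, lat2, lon2 float64) float64 {
	toRad := func(deg float64) float64 { return deg * math.Pi / 180 }
	dLat := toRad(lat2 - lat1)
	dLon := toRad(lon2 - lon1)
	a := math.Sin(dLat/2)*math.Sin(dLat/2) +
		math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
	return 2 * earthRadiusKm * math.Asin(math.Sqrt(a))
}

// withinRadius reports whether the second point lies within radiusKm of the
// first, which is the test a radius search applies to each candidate.
func withinRadius(lat1, lon1, lat2, lon2, radiusKm float64) bool {
	return haversineKm(lat1, lon1, lat2, lon2) <= radiusKm
}

func main() {
	// The first coordinate pair appears in the geospatial usage example;
	// the second is an arbitrary nearby point chosen for illustration.
	d := haversineKm(37.80574, -122.27652, 37.77493, -122.41942)
	fmt.Printf("distance: %.2f km\n", d)
	fmt.Println("within 5 km:", withinRadius(37.80574, -122.27652, 37.77493, -122.41942, 5))
}
```

Note that GEOADD-style commands take longitude before latitude, while this helper takes latitude first.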
Architecture
Core Components
- Client Handler
  - Connection management
  - Protocol parsing (RESP)
  - Command queuing
- Command Router
  - Command validation
  - ACL enforcement
  - Handler dispatch
- Storage Engine
  - Memory management
  - Data structure implementation
  - Index management
- Persistence Layer
  - AOF writing
  - Recovery management
  - Sync coordination
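The Client Handler parses RESP, the Redis serialization protocol, in which a client sends each command as an array of bulk strings. The sketch below encodes and parses that request format to show what arrives on the wire. It is an illustration of the protocol, not CrystalCache's parser; all function names are hypothetical, and it handles only the array-of-bulk-strings request form.

```go
package main

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// encodeCommand serialises a command as a RESP array of bulk strings.
func encodeCommand(args ...string) []byte {
	var sb strings.Builder
	sb.WriteString("*" + strconv.Itoa(len(args)) + "\r\n")
	for _, a := range args {
		sb.WriteString("$" + strconv.Itoa(len(a)) + "\r\n" + a + "\r\n")
	}
	return []byte(sb.String())
}

// readLine reads one CRLF-terminated line and strips the terminator.
func readLine(r *bufio.Reader) (string, error) {
	line, err := r.ReadString('\n')
	if err != nil {
		return "", err
	}
	if !strings.HasSuffix(line, "\r\n") {
		return "", errors.New("line not terminated by CRLF")
	}
	return line[:len(line)-2], nil
}

// parseCommand reads one RESP array of bulk strings from r.
func parseCommand(r *bufio.Reader) ([]string, error) {
	header, err := readLine(r)
	if err != nil {
		return nil, err
	}
	if !strings.HasPrefix(header, "*") {
		return nil, errors.New("expected array header")
	}
	n, err := strconv.Atoi(header[1:])
	if err != nil || n < 0 {
		return nil, errors.New("invalid array length")
	}
	var args []string
	for i := 0; i < n; i++ {
		sizeLine, err := readLine(r)
		if err != nil {
			return nil, err
		}
		if !strings.HasPrefix(sizeLine, "$") {
			return nil, errors.New("expected bulk string header")
		}
		size, err := strconv.Atoi(sizeLine[1:])
		if err != nil || size < 0 {
			return nil, errors.New("invalid bulk string length")
		}
		buf := make([]byte, size+2) // payload plus trailing CRLF
		if _, err := io.ReadFull(r, buf); err != nil {
			return nil, err
		}
		if buf[size] != '\r' || buf[size+1] != '\n' {
			return nil, errors.New("bulk string not terminated by CRLF")
		}
		args = append(args, string(buf[:size]))
	}
	return args, nil
}

// parseCommandBytes is a convenience wrapper for in-memory input.
func parseCommandBytes(data []byte) ([]string, error) {
	return parseCommand(bufio.NewReader(bytes.NewReader(data)))
}

func main() {
	wire := encodeCommand("SET", "key", "value")
	fmt.Printf("%q\n", wire)
	args, err := parseCommandBytes(wire)
	fmt.Println(args, err)
}
```

Because bulk strings carry an explicit length, values may contain any bytes, including CR and LF, which is what makes strings binary-safe at the protocol level.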
Installation
From Source
# Clone the repository
git clone https://github.com/genc-murat/crystalcache.git
# Change to project directory
cd crystalcache
# Install dependencies
go mod download
# Build
go build -o crystalcache cmd/main.go
# Run tests
go test ./...
Using Go Get
go get github.com/genc-murat/crystalcache
Configuration
Server Configuration
type ServerConfig struct {
	MaxConnections int           // Maximum number of concurrent connections
	ReadTimeout    time.Duration // Read timeout for client connections
	WriteTimeout   time.Duration // Write timeout for client connections
	IdleTimeout    time.Duration // Idle timeout for client connections
}
Connection Pool Configuration
type Config struct {
	InitialSize   int           // Initial number of connections in pool
	MaxSize       int           // Maximum number of connections
	ReadTimeout   time.Duration // Read timeout for pooled connections
	WriteTimeout  time.Duration // Write timeout for pooled connections
	IdleTimeout   time.Duration // How long connections can remain idle
	RetryAttempts int           // Number of retry attempts for failed operations
	RetryDelay    time.Duration // Delay between retry attempts
}
AOF Configuration
type AOFConfig struct {
	Path         string        // Path to AOF file
	SyncInterval time.Duration // Interval between fsync operations
	BufferSize   int           // Write buffer size
	Enabled      bool          // Whether AOF is enabled
}
Complete Server Setup Example
package main

// Note: Go only allows internal/... packages to be imported from inside the
// crystalcache module itself, so this file must live in that module.
import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/genc-murat/crystalcache/internal/cache"
	"github.com/genc-murat/crystalcache/internal/pool"
	"github.com/genc-murat/crystalcache/internal/server"
	"github.com/genc-murat/crystalcache/internal/storage"
)

func main() {
	// Server configuration
	serverConfig := server.ServerConfig{
		MaxConnections: 1000,
		ReadTimeout:    10 * time.Second,
		WriteTimeout:   10 * time.Second,
		IdleTimeout:    60 * time.Second,
	}

	// Initialize cache with defragmentation
	memCache := cache.NewMemoryCache()
	memCache.StartDefragmentation(5*time.Minute, 0.25)

	// Configure AOF persistence
	aofConfig := storage.DefaultAOFConfig()
	aofConfig.Path = "database.aof"
	aofConfig.SyncInterval = 2 * time.Second
	aofStorage, err := storage.NewAOF(aofConfig)
	if err != nil {
		log.Fatal(err)
	}
	defer aofStorage.Close()

	// Initialize server. From here on, the variable shadows the server package.
	server := server.NewServer(memCache, aofStorage, nil, serverConfig)
	server.SetMaster(true)

	// Configure connection pool
	poolConfig := pool.Config{
		InitialSize:   10,
		MaxSize:       100,
		ReadTimeout:   5 * time.Second,
		WriteTimeout:  5 * time.Second,
		IdleTimeout:   60 * time.Second,
		RetryAttempts: 3,
		RetryDelay:    100 * time.Millisecond,
	}
	factory := pool.NewConnFactory("localhost:6379", 5*time.Second)
	connectionPool, err := pool.NewConnectionPool(poolConfig, factory.CreateConnection)
	if err != nil {
		log.Fatal(err)
	}
	defer connectionPool.Close()
	server.SetConnectionPool(connectionPool)

	// Setup metrics endpoint
	go func() {
		http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
			metrics := server.GetMetrics()
			if err := json.NewEncoder(w).Encode(metrics); err != nil {
				log.Printf("Failed to encode metrics: %v", err)
			}
		})
		log.Printf("Metrics server starting on :2112")
		if err := http.ListenAndServe(":2112", nil); err != nil {
			log.Printf("Metrics server error: %v", err)
		}
	}()

	// Start the server
	go server.Start(":6379")

	// Graceful shutdown: wait for SIGINT or SIGTERM, then allow 30 seconds
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	log.Println("Shutting down server...")
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		log.Printf("Error during shutdown: %v", err)
	}
}
Usage Examples
Basic Operations
The snippets in this section write each command in function-call notation to show its arguments; they illustrate command usage rather than a specific client API.
String Operations
// SET command
SET("key", "value")
SET("key", "value", "EX", 60) // With expiration
// GET command
value := GET("key")
// INCR command
newValue := INCR("counter")
List Operations
// LPUSH command
LPUSH("list", "value1", "value2")
// LRANGE command
values := LRANGE("list", 0, -1)
// BRPOP command
value := BRPOP("list", 5) // With timeout
Hash Operations
// HSET command
HSET("hash", "field1", "value1", "field2", "value2")
// HGET command
value := HGET("hash", "field1")
// HINCRBY command
newValue := HINCRBY("hash", "counter", 1)
Advanced Operations
Transaction Example
MULTI()
SET("key1", "value1")
INCR("counter")
result := EXEC()
Pipeline Example
pipeline := client.Pipeline()
pipeline.SET("key1", "value1")
pipeline.INCR("counter")
pipeline.EXEC()
Geospatial Operations
// Add locations
GEOADD("locations", -122.27652, 37.80574, "place1")
GEOADD("locations", -122.27652, 37.80574, "place2")
// Find nearby locations
nearby := GEORADIUS("locations", -122.27652, 37.80574, 5, "km")
JSON Operations
// Set JSON document
JSON.SET("user", ".", {
	"name": "John",
	"age": 30,
	"address": {
		"city": "New York"
	}
})
// Get specific path
city := JSON.GET("user", "$.address.city")
// Array operations
JSON.ARRAPPEND("user", "$.hobbies", "reading")
Security
ACL Configuration
// Create user with specific permissions
server.ACL("SETUSER", "myuser", "on", "allcommands", "allkeys")
// Create user with limited permissions
server.ACL("SETUSER", "readonly", "on", "get", "~keys:*")
// Remove user
server.ACL("DELUSER", "myuser")
Authentication Example
// Authenticate with username and password
AUTH("username", "password")
// Old-style authentication
AUTH("password")
Replication
Master Setup
server.SetMaster(true)
Slave Setup
// Connect to master
server.StartReplication("master-host", "6379")
// Check replication status
info := server.GetReplicationInfo()
Replication Monitoring
// Get replication status
status := server.GetReplicationInfo()
// Check connected slaves
count := server.GetReplicaCount()
Monitoring
Metrics Endpoint
// Setup metrics endpoint
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
	metrics := server.GetMetrics()
	json.NewEncoder(w).Encode(metrics)
})
Available Metrics
- Connection statistics
- Command statistics
- Memory usage
- Replication status
- Cache hit/miss ratio
- Operation latencies
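As an illustration of how a metric such as the cache hit/miss ratio can be tracked without locking, the sketch below keeps two atomic counters and derives the ratio on demand. The `cacheStats` type is hypothetical and does not describe the structure returned by GetMetrics. It requires Go 1.19 or later for `atomic.Uint64`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// cacheStats is a hypothetical pair of lock-free counters.
type cacheStats struct {
	hits   atomic.Uint64
	misses atomic.Uint64
}

// Record counts one lookup as a hit or a miss.
func (s *cacheStats) Record(hit bool) {
	if hit {
		s.hits.Add(1)
	} else {
		s.misses.Add(1)
	}
}

// HitRatio returns hits / (hits + misses), or 0 when nothing was recorded.
func (s *cacheStats) HitRatio() float64 {
	h, m := s.hits.Load(), s.misses.Load()
	if h+m == 0 {
		return 0
	}
	return float64(h) / float64(h+m)
}

func main() {
	var stats cacheStats
	for _, hit := range []bool{true, true, false, true} {
		stats.Record(hit)
	}
	fmt.Printf("hit ratio: %.2f\n", stats.HitRatio())
}
```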
Performance Optimization
Memory Management
- Regular defragmentation
- Memory-efficient data structures
- Lazy deletion of expired keys
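Lazy deletion means an expired key is removed when it is next accessed instead of by a timer at the moment it expires. The sketch below shows the pattern with a plain map; it is not CrystalCache's storage engine, and `lazyCache` and `manualClock` are hypothetical names. The clock is injected so the behaviour can be demonstrated deterministically.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// entry holds a value and its optional expiry time.
type entry struct {
	value     string
	expiresAt time.Time // zero value means the key never expires
}

// lazyCache is a hypothetical cache that checks expiry only on access.
type lazyCache struct {
	mu   sync.Mutex
	data map[string]entry
	now  func() time.Time
}

func newLazyCache(now func() time.Time) *lazyCache {
	return &lazyCache{data: make(map[string]entry), now: now}
}

// Set stores a value; a ttl of zero or less means no expiry.
func (c *lazyCache) Set(key, value string, ttl time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e := entry{value: value}
	if ttl > 0 {
		e.expiresAt = c.now().Add(ttl)
	}
	c.data[key] = e
}

// Get returns the value if present and not expired. An expired entry is
// deleted here, at access time, which is the lazy part.
func (c *lazyCache) Get(key string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.data[key]
	if !ok {
		return "", false
	}
	if !e.expiresAt.IsZero() && !c.now().Before(e.expiresAt) {
		delete(c.data, key)
		return "", false
	}
	return e.value, true
}

// Len returns the number of stored entries, including expired ones that
// have not been accessed yet.
func (c *lazyCache) Len() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.data)
}

// manualClock is a clock that only moves when Advance is called.
type manualClock struct{ t time.Time }

func newManualClock() *manualClock { return &manualClock{t: time.Unix(0, 0)} }

func (m *manualClock) Now() time.Time { return m.t }

func (m *manualClock) Advance(d time.Duration) { m.t = m.t.Add(d) }

func main() {
	clock := newManualClock()
	c := newLazyCache(clock.Now)
	c.Set("session", "abc", 5*time.Second)

	clock.Advance(6 * time.Second)
	fmt.Println("entries before access:", c.Len())
	_, ok := c.Get("session")
	fmt.Println("found after expiry:", ok)
	fmt.Println("entries after access:", c.Len())
}
```

A key that is never read again stays in memory under this scheme, so lazy deletion is usually paired with a periodic sweep.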
Connection Management
- Connection pooling
- Keep-alive settings
- Timeout configurations
Command Processing
- Pipeline support
- Batch operations
- Optimized data structures
Troubleshooting
Common Issues
- Connection Issues
  ERROR: Connection refused
  SOLUTION: Check that the server is running and that the client is using the correct host and port
- Memory Issues
  ERROR: Out of memory
  SOLUTION: Configure an appropriate maxmemory limit and eviction policy
- Replication Issues
  ERROR: Replication sync failed
  SOLUTION: Check network connectivity to the master and the authentication settings
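For the "Connection refused" case, a quick way to separate network problems from application problems is to test whether the server port accepts TCP connections at all. The sketch below does only that, using the standard library; `checkReachable` is a hypothetical helper, and `localhost:6379` is the address used in the server setup example and may differ in a given deployment.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// checkReachable tries to open and immediately close a TCP connection to
// addr. A nil result means something is listening on that host and port.
func checkReachable(addr string, timeout time.Duration) error {
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return fmt.Errorf("cannot reach %s: %w", addr, err)
	}
	return conn.Close()
}

func main() {
	// Adjust the address to match the server's listen address.
	if err := checkReachable("localhost:6379", 2*time.Second); err != nil {
		fmt.Println("ERROR:", err)
		return
	}
	fmt.Println("server port is reachable")
}
```

A successful check only shows that the port is open; authentication or ACL failures would surface later, at the command level.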
License
This project is licensed under the MIT License - see the LICENSE file for details.
Contact
For questions and feedback, please:
- Open an issue on GitHub