class CryBase::CouchBase::Services::KV::Client

Overview

Talks the Couchbase memcached binary protocol against a single KV node over plaintext or TLS according to the endpoint. Composed from RequestWriter, ResponseReader, and Bucket mixins.

On construct: connects to endpoint, performs the TLS handshake when endpoint.tls?, sends HELLO (advertising Constants::FEATURE_SELECT_BUCKET), SASL_AUTH (PLAIN), and SELECT_BUCKET for bucket. Then exposes #get / #set / #delete / #touch / counter operations against that bucket.

endpoint = CryBase::CouchBase::Endpoint.new(
  "node1", 11210, CryBase::CouchBase::Service::KV, false
)
kv = KV::Client.new(endpoint, "user", "pass", "default")
kv.set("hello", "world")
kv.get("hello") # => "world".to_slice
kv.delete("hello")
kv.close

For connection strings, use .from_string:

kv = KV::Client.from_string("couchbase://user:pass@node1/default")

Out of scope (deliberate, can be layered on top later):

Included Modules

Defined in:

crybase/couchbase/services/kv/client.cr

Constructors

Instance Method Summary

Constructor Detail

def self.from_string(uri : String, username : String | Nil = nil, password : String | Nil = nil, bucket : String | Nil = nil, connect_timeout : Time::Span = 5.seconds, *, tls_verify : Bool | Nil = nil, tls_hostname : String | Nil = nil, tls_context : OpenSSL::SSL::Context::Client | Nil = nil) : Client #

Builds a KV endpoint from uri, connects, authenticates, and selects a bucket. The first host in the connection string is used.

username, password, and #bucket may be passed explicitly or embedded as couchbase://user:pass@host/bucket. Query parameters currently supported by this helper: tls_verify and tls_hostname.

kv = KV::Client.from_string("couchbases://user:pass@node1:11217/default?tls_verify=false")

[View source]
def self.new(endpoint : Endpoint, username : String, password : String, bucket : String, connect_timeout : Time::Span = 5.seconds, *, tls_verify : Bool = true, tls_hostname : String | Nil = nil, tls_context : OpenSSL::SSL::Context::Client | Nil = nil) #

Connects, optionally performs TLS, then performs HELLO, SASL PLAIN auth, and SELECT_BUCKET, leaving the client ready for KV operations.

tls_verify and tls_hostname apply only when endpoint.tls?. Provide tls_context to use a custom CA or other OpenSSL settings; when supplied, it is used as-is.

Raises:

  • IO::Error / Socket::Error — socket connect failed
  • OpenSSL::SSL::Error — TLS handshake or certificate verification failed
  • KV::AuthFailed — SASL auth or bucket selection denied
  • KV::Error — server returned any other non-success status

The socket is closed and the exception re-raised if any handshake step fails.


[View source]

Instance Method Detail

def bucket : String #

The bucket selected during the construction handshake.


[View source]
def close : Nil #

Closes the underlying socket. Idempotent — safe to call when already closed.


[View source]
def decrement(key : String, delta : UInt64 = 1_u64, initial : UInt64 = 0_u64, expiry : UInt32 = 0_u32) : UInt64 #

Atomically decrements the unsigned integer document at key by delta. Couchbase counters do not go below zero.


[View source]
def delete(key : String) : Nil #

Deletes the document at key. Raises NotFound if absent.

kv.delete("hello")

[View source]
def endpoint : Endpoint #

The Endpoint this client is connected to.


[View source]
def get(key : String, expiry : UInt32 | Nil = nil) : Bytes #

Fetches the document at key. When expiry is provided, fetches and updates the document expiration atomically. Raises NotFound if absent.

bytes = kv.get("user:42")
JSON.parse(String.new(bytes))

[View source]
def get(key : String, type : T.class, expiry : UInt32 | Nil = nil) : T forall T #

Compatibility alias for #get_as(key, type, expiry).


[View source]
def get_as(key : String, type : T.class, expiry : UInt32 | Nil = nil) : T forall T #

Fetches the document at key and decodes it as type.

Use this for values written with #set from a type that includes JSON::Serializable. String and Bytes are decoded without JSON. When expiry is provided, Couchbase fetches the document and updates its expiration atomically.

struct Profile
  include JSON::Serializable

  property name : String
  property score : Int32
end

profile = kv.get_as("user:42", Profile)

[View source]
def increment(key : String, delta : UInt64 = 1_u64, initial : UInt64 = 0_u64, expiry : UInt32 = 0_u32) : UInt64 #

Atomically increments the unsigned integer document at key by delta. If the key is missing, Couchbase creates it with initial and applies expiry.


[View source]
def set(key : String, value : String | Bytes, expiry : UInt32 = 0_u32) : UInt64 #

Stores value at key with optional expiry (seconds, or unix timestamp if greater than 30 days). Returns the new CAS token.

cas = kv.set("hello", "world")
cas = kv.set("hello", "world".to_slice, expiry: 60_u32)

[View source]
def set(key : String, value : T, expiry : UInt32 = 0_u32) : UInt64 forall T #

Stores value at key, encoding JSON::Serializable values as JSON and all other non-raw values with to_s. Returns the new CAS token.

Use get_as(key, Type) to load JSON-backed objects back into their original type. String and Bytes are stored unchanged by the raw overload above.

struct Profile
  include JSON::Serializable

  property name : String
  property score : Int32
end

cas = kv.set("user:42", Profile.new("ada", 42))

[View source]
def touch(key : String, expiry : UInt32) : UInt64 #

Updates the document expiration without changing its value. Returns the new CAS token.


[View source]