class Memo::Service

Overview

Main service class for semantic search operations.

Encapsulates configuration and provides a clean API for indexing and search.

Quick Start

# Initialize with default service (mock, preloaded)
memo = Memo::Service.new(db_path: "/var/data/memo.db")

# Configure a real service
memo.create_service(
  name: "openai",
  format: "openai",
  model: "text-embedding-3-small",
  dimensions: 1536,
  max_tokens: 8191
)
memo.set_default_service("openai")

# Switch to it
memo.use_service("openai", api_key: ENV["OPENAI_API_KEY"])

# Index documents
memo.index(source_type: "event", source_id: 123, text: "Document text...")

# Search
results = memo.search(query: "search query", limit: 10)

# Clean up
memo.close

Service Configuration

Services are named configurations for embedding providers:

memo.create_service(
  name: "azure-prod",
  format: "openai",
  base_url: "https://mycompany.openai.azure.com/",
  model: "text-embedding-ada-002",
  dimensions: 1536,
  max_tokens: 8191
)

# Switch services at runtime
memo.use_service("azure-prod", api_key: ENV["AZURE_API_KEY"])

Database

Memo stores all data in a single SQLite file at the provided db_path. The file contains services, embeddings, chunks, projections, texts, and the queue.

Defined in:

memo/service.cr

Constructor Detail

def self.new(db_path : String, api_key : String | Nil = nil, service : String | Nil = nil, format : String | Nil = nil, base_url : String | Nil = nil, model : String | Nil = nil, dimensions : Int32 | Nil = nil, max_tokens : Int32 | Nil = nil, chunking_max_tokens : Int32 = 2000, store_text : Bool = true, build_vocab : Bool = true, batch_size : Int32 = 100, max_retries : Int32 = 3, table_prefix : String = "memo_")

Initialize service with database path

Use EITHER:

  • service: Name of pre-configured service (from ServiceProvider.create)
  • format: API format ("openai", "mock") to configure inline

Required:

  • db_path: Full path to database file (e.g., "/var/data/memo.db")
  • api_key: API key (not needed for mock format)

Optional:

  • service: Name of pre-configured service to use
  • format: API format for inline configuration (default "openai")
  • base_url: Custom API endpoint (for OpenAI-compatible APIs)
  • model: Embedding model (default depends on format)
  • dimensions: Vector dimensions (auto-detected from model)
  • max_tokens: Token limit (auto-detected from model)
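  • store_text: Enable text storage in texts table (default true)
  • build_vocab: Enable incremental vocabulary building (default true, requires store_text)
  • chunking_max_tokens: Max tokens per chunk (default 2000)
  • batch_size: Number of queued items embedded per batch by process_queue (default 100)
  • max_retries: Maximum number of retries for failed queue items (default 3)
  • table_prefix: Prefix for Memo's table names (default "memo_")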

Example:

memo = Memo::Service.new(
  db_path: "/var/data/memo.db",
  format: "openai",
  api_key: ENV["OPENAI_API_KEY"]
)

def self.new(db : DB::Database, api_key : String | Nil = nil, service : String | Nil = nil, format : String | Nil = nil, base_url : String | Nil = nil, model : String | Nil = nil, dimensions : Int32 | Nil = nil, max_tokens : Int32 | Nil = nil, chunking_max_tokens : Int32 = 2000, store_text : Bool = true, build_vocab : Bool = true, batch_size : Int32 = 100, max_retries : Int32 = 3, db_path : String | Nil = nil, table_prefix : String = "memo_")

Initialize service with existing database connection

Use this when caller manages the connection lifecycle. Caller is responsible for closing the connection.

The db_path is queried from the database for CLI use. Pass db_path explicitly if the pragma query doesn't work for your setup.



Instance Method Detail

def batch_size : Int32

def build_vocab(batch_size : Int32 = 2000, clear_existing : Bool = true) : Int32

Build vocabulary from indexed content

Extracts unique words from all stored texts, embeds them in batches, and stores in the vocab table for word-level similarity search.

Requires text storage to be enabled.

Options:

  • batch_size: Number of words to embed per API call (default 2000)
  • clear_existing: Whether to clear existing vocab first (default true)

Returns the number of words stored.

Example:

memo.build_vocab()
results = memo.like("database")

def build_vocab? : Bool

Whether vocabulary building is enabled.


def chunking_config : Config::Chunking

def clear_completed_queue : Int32

Clear completed items from the queue

Removes successfully processed items (status = 0). Returns number of items removed.


def clear_queue : Int32

Clear all items from the queue

Removes all items regardless of status. Returns number of items removed.


def clear_vocab

Clear vocabulary for current service


def close

Close database connection

Should be called when done with service to free resources. Safe to call multiple times.

Note: If service was initialized with an existing db connection, close is a no-op (caller owns the connection).
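Because close is safe to call more than once and is a no-op for caller-owned connections, it fits naturally in an ensure clause. The following is a minimal sketch: with_memo is a hypothetical helper, not part of Memo, and memo is assumed to be a Memo::Service.

```crystal
# Hypothetical helper (not part of Memo): runs the block with the service and
# always calls close afterwards, even if the block raises.
# `memo` is assumed to be a Memo::Service.
def with_memo(memo, &)
  yield memo
ensure
  memo.close
end

# Usage, assuming the Memo shard is loaded:
#
#   with_memo(Memo::Service.new(db_path: "/var/data/memo.db")) do |memo|
#     memo.index(source_type: "event", source_id: 123, text: "Document text...")
#   end
```

If the service was built from an existing DB::Database, the caller still has to close that connection separately.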


def create_service(name : String, format : String, model : String, dimensions : Int32, max_tokens : Int32, base_url : String | Nil = nil) : ServiceProvider::Info

Create a new service configuration

Creates a named service configuration that can be used later with Service.new(service: "name", ...).

Example:

memo.create_service(
  name: "azure-prod",
  format: "openai",
  base_url: "https://mycompany.openai.azure.com/",
  model: "text-embedding-ada-002",
  dimensions: 1536,
  max_tokens: 8191
)

def db : DB::Database

def db_path : String | Nil

def default_service : ServiceProvider::Info | Nil

Get the default service configuration


def delete(source_id : ExternalId, source_type : String | Nil = nil) : Int32

Delete all chunks for a source

Removes all chunks with the given source_id (and optionally source_type). Orphaned embeddings (not referenced by any chunk) are also cleaned up.

Returns number of chunks deleted.

Options:

  • source_id: External source ID (Int64 or String).
  • source_type: Optional filter to only delete chunks with a matching source_type. If nil and source_id is Int64, searches integer IDs across all types. If nil and source_id is String, searches string IDs across all types.
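As a rough illustration of the two ID kinds, the sketch below deletes a mixed list of integer and string source IDs and adds up the chunk counts that delete returns. purge_sources is a hypothetical helper, and memo is assumed to be a Memo::Service.

```crystal
# Hypothetical helper (not part of Memo): deletes every listed source and
# returns the total number of chunks removed.
# `memo` is assumed to be a Memo::Service.
def purge_sources(memo, ids : Array(Int64 | String), source_type : String? = nil) : Int32
  # With source_type set, only chunks of that type are deleted;
  # with nil, the ID is matched across all source types.
  ids.sum { |id| memo.delete(id, source_type: source_type) }
end

# Usage, assuming the Memo shard is loaded:
#
#   ids = [123_i64, "550e8400-e29b-41d4-a716-446655440000"] of Int64 | String
#   removed = purge_sources(memo, ids, "event")
```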


def delete_service(name : String, force : Bool = false) : Bool

Delete a service configuration

By default, fails if the service has any associated embeddings. Use force: true to delete the service and all associated data.

Returns true if deleted, false if not found.


def dimensions : Int32

def enqueue(source_type : String, source_id : ExternalId, text : String, pair_id : ExternalId | Nil = nil, parent_id : ExternalId | Nil = nil)

Enqueue a document for later embedding

Adds the document to the embed_queue table without embedding it. Use process_queue to embed queued items.

If the source is already in the queue, the text is updated.
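The enqueue-then-process flow can be sketched as follows. index_later is a hypothetical helper, and memo is assumed to be a Memo::Service; only enqueue and process_queue come from this page.

```crystal
# Hypothetical helper (not part of Memo): queues several texts without
# embedding them, then embeds the queue in one pass.
# `memo` is assumed to be a Memo::Service.
def index_later(memo, source_type : String, texts : Hash(Int64, String)) : Int32
  texts.each do |id, text|
    # No embedding call happens here; the text is only stored in the queue.
    memo.enqueue(source_type: source_type, source_id: id, text: text)
  end
  # Embeds the pending items and returns how many were processed.
  memo.process_queue
end

# Usage, assuming the Memo shard is loaded:
#
#   index_later(memo, "event", {1_i64 => "first text", 2_i64 => "second text"})
```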


def enqueue(doc : Document)

Enqueue a document (Document overload)


def enqueue_batch(docs : Array(Document))

Enqueue multiple documents for later embedding

More efficient than calling enqueue() multiple times. Resolves all external IDs to internal IDs in a single transaction.


def file_count : Int64

Count indexed files


def get_file(path : String) : Files::FileRecord | Nil

Get file record by path


def get_file_by_hash(hash : Bytes) : Files::FileRecord | Nil

Get file record by content hash


def get_file_by_source(source_id : Int64) : Files::FileRecord | Nil

Get file record by source ID


def get_service(name : String) : ServiceProvider::Info | Nil

Get a service configuration by name

Returns nil if not found.


def get_source_text(internal_source_id : Int64) : String | Nil

Get source text by internal source_id


def index(source_type : String, source_id : ExternalId | Nil, text : String, pair_id : ExternalId | Nil = nil, parent_id : ExternalId | Nil = nil) : Int32

Index a document

Enqueues the document and processes it immediately with retry support. Returns number of chunks successfully stored.

Supports both integer and string source IDs:

  • Int64: Time-based, sortable IDs (e.g., Unix timestamps)
  • String: UUIDs and other text identifiers

When source_id is nil, memo creates and manages the source internally. Useful for CLI and cases where external ID correlation isn't needed.
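The three source_id forms described above might look like this in practice. index_examples is a hypothetical helper, the source types and texts are made up for illustration, and memo is assumed to be a Memo::Service.

```crystal
# Hypothetical helper (not part of Memo): indexes one document per source_id
# form and returns the chunk counts reported by index.
# `memo` is assumed to be a Memo::Service.
def index_examples(memo)
  # Integer ID, e.g. a Unix timestamp.
  by_int = memo.index(source_type: "event", source_id: 1_700_000_000_i64, text: "Deploy finished")
  # String ID, e.g. a UUID.
  by_str = memo.index(source_type: "note", source_id: "550e8400-e29b-41d4-a716-446655440000", text: "Meeting notes")
  # No external ID: Memo creates and manages the source itself.
  # source_id has no default value, so nil is passed explicitly.
  internal = memo.index(source_type: "scratch", source_id: nil, text: "Ad hoc text")
  {by_int, by_str, internal}
end
```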


def index(doc : Document) : Int32

Index a document (Document overload)

Convenience method that accepts a Document struct.


def index_batch(docs : Array(Document)) : Int32

Index multiple documents in a batch

Enqueues all documents and processes them with retry support. More efficient than calling index() multiple times.

Returns total number of documents successfully processed.


def index_files(root : String, ignore_file : String = ".gitignore", incremental : Bool = true, dry_run : Bool = false, &block : String, Symbol -> ) : Tuple(Int32, Int32, Int32)

Index files from a directory

Walks directory, respects ignore files, skips binary files, and indexes text content. Tracks file metadata for incremental updates.

Options:

  • root: Directory to index
  • ignore_file: Ignore file name (default ".gitignore")
  • incremental: Skip unchanged files based on mtime (default true)
  • dry_run: List files without indexing (default false)

Returns tuple of (indexed_count, skipped_count, total_files)

Example:

indexed, skipped, total = memo.index_files("/path/to/project")
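The block form can be used for progress reporting. The signature only says the block receives a String and a Symbol; the sketch below assumes these are the file path and a status for that file, which this page does not confirm. index_with_progress is a hypothetical helper, and memo is assumed to be a Memo::Service.

```crystal
# Hypothetical helper (not part of Memo): indexes a directory and writes one
# progress line per callback to `io`.
# `memo` is assumed to be a Memo::Service.
# Assumption: the callback arguments are (file path, status symbol).
def index_with_progress(memo, root : String, io : IO = STDOUT)
  memo.index_files(root, incremental: true) do |path, status|
    io.puts "#{status}\t#{path}"
  end
end

# Usage, assuming the Memo shard is loaded:
#
#   indexed, skipped, total = index_with_progress(memo, "/path/to/project")
```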

def index_files(root : String, ignore_file : String = ".gitignore", incremental : Bool = true, dry_run : Bool = false) : Tuple(Int32, Int32, Int32)

Index files without progress callback


def like(query : String, limit : Int32 = 10, min_score : Float64 = 0.5) : Array(Vocab::Result)

Find words semantically similar to the query

Searches the vocabulary table for words with similar embeddings. Requires vocabulary to be built first with build_vocab().

Returns an array of Vocab::Result with word, score, and frequency.

Example:

results = memo.like("database")
results.each do |r|
  puts "#{r.word}: #{r.score}"
end

def list_files(limit : Int32 = 100, offset : Int32 = 0) : Array(Files::FileRecord)

List indexed files


def list_services : Array(ServiceProvider::Info)

List all service configurations

Returns array of service info, ordered by creation time (newest first).


def list_services_by_format(format : String) : Array(ServiceProvider::Info)

List service configurations by format

Returns array of service info for the specified API format.


def mark_as_read(chunk_ids : Array(Int64))

Mark chunks as read (increment read_count)


def process_queue : Int32

Process queued items

Embeds pending items from the queue using the service's batch_size. Returns number of items successfully processed.

Failed items have their status set to the error code and can be retried up to max_retries times.

NOTE: Queue items are not atomically claimed. This is intentional: single-worker processing is the expected use case, since Memo sends batched requests to a single API endpoint. Parallel workers would hit rate limits and add complexity without benefit.
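A single worker can drain the queue with a simple loop. This page does not say whether one process_queue call handles the whole queue or a single batch, so the sketch below keeps calling until a call reports zero processed items, which is harmless in either case. drain_queue and max_rounds are hypothetical names, and memo is assumed to be a Memo::Service.

```crystal
# Hypothetical helper (not part of Memo): calls process_queue repeatedly from
# a single worker and returns the total number of items processed.
# `memo` is assumed to be a Memo::Service.
def drain_queue(memo, max_rounds : Int32 = 10) : Int32
  total = 0
  max_rounds.times do
    processed = memo.process_queue
    # Stop once nothing was processed: the queue is empty, or the remaining
    # items keep failing and are left for a later retry.
    break if processed == 0
    total += processed
  end
  total
end
```

The max_rounds cap keeps the loop bounded if items keep arriving while it runs.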


def process_queue_async

Process queued items asynchronously

Spawns a fiber to process the queue and returns immediately. Use queue_stats to check progress.


def projection_vectors : Array(Array(Float64))

def provider : Providers::Base

def queue_config : Config::Queue

def queue_stats : QueueStats

Get queue statistics

Returns counts of pending and failed items.


def reindex(source_type : String) : Int32

Re-index all content of a given source type

Deletes existing embeddings and queues text for re-embedding. Requires text storage to be enabled.

Returns number of items queued for re-indexing.


def reindex(source_type : String, &block : ExternalId -> String) : Int32

Re-index all content of a given source type using a block to fetch text

Use this when text storage is disabled. The block receives the external source_id and should return the text to embed.

Returns number of items queued for re-indexing.

Example:

memo.reindex("article") do |source_id|
  app.get_article_text(source_id)
end
memo.process_queue

def search(query : String, limit : Int32 = 10, min_score : Float64 = 0.7, source_type : String | Nil = nil, source_id : ExternalId | Nil = nil, pair_id : ExternalId | Nil = nil, parent_id : ExternalId | Nil = nil, like : String | Array(String) | Nil = nil, match : String | Nil = nil, sql_where : String | Nil = nil, include_text : Bool = true) : Array(Search::Result)

Search for semantically similar chunks

Automatically generates query embedding and searches.

Returns array of search results ranked by similarity.

Options:

  • source_id: Filter by external source ID (Int64 or String).
  • like: LIKE pattern(s) to filter by text content. Pass a single string, or an array of strings for AND filtering. Example: like: "%cats%" or like: ["%cats%", "%dogs%"]. Only works when text storage is enabled.
  • match: FTS5 full-text search query. Supports AND, OR, NOT, prefix*, and "phrases". Example: match: "cats OR dogs" or match: "quick brown*". Only works when text storage is enabled.
  • sql_where: Raw SQL fragment for filtering chunks. Used with ATTACH to filter by tables in an external database. Example: "c.source_id IN (SELECT id FROM main.artifact WHERE kind = 'goal')". Note: c.source_id here is the internal ID, not the external one.
  • include_text: If true, includes text content in search results. Only works when text storage is enabled.
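A filtered search combining several of these options might look like the sketch below. search_pets is a hypothetical helper with made-up filter values, and memo is assumed to be a Memo::Service with text storage enabled (required for like).

```crystal
# Hypothetical helper (not part of Memo): semantic search restricted to
# "event" sources whose stored text mentions both cats and dogs.
# `memo` is assumed to be a Memo::Service with text storage enabled.
def search_pets(memo, query : String)
  memo.search(
    query: query,
    limit: 5,                    # at most five results
    min_score: 0.6,              # looser than the 0.7 default
    source_type: "event",        # only chunks of this source type
    like: ["%cats%", "%dogs%"],  # both patterns must match (AND)
    include_text: true
  )
end

# Usage, assuming the Memo shard is loaded:
#
#   results = search_pets(memo, "pets that get along")
#   puts results.size
```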


def service_id : Int64

def service_name : String

def service_stats(name : String) : ServiceProvider::Stats | Nil

Get usage statistics for a service


def set_default_service(name : String) : Bool

Set a service as the default

Returns true if successful, false if service not found.


def stats : Stats

Get statistics about indexed content

Returns counts of embeddings, chunks, and unique sources.


def table_prefix : String

def text_storage? : Bool

Whether text storage is enabled.


def update_service(name : String, base_url : String | Nil = nil, max_tokens : Int32 | Nil = nil) : ServiceProvider::Info | Nil

Update a service configuration

Can update base_url and max_tokens. Returns the updated service info, or nil if not found.
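Since update_service returns nil for an unknown name, callers will usually want to check the result. A minimal sketch, where retarget_service is a hypothetical helper and memo is assumed to be a Memo::Service:

```crystal
# Hypothetical helper (not part of Memo): points an existing service at a new
# endpoint and reports whether the service was found.
# `memo` is assumed to be a Memo::Service.
def retarget_service(memo, name : String, new_url : String) : Bool
  # Only base_url is passed, so max_tokens keeps its nil default.
  info = memo.update_service(name, base_url: new_url)
  !info.nil?
end

# Usage, assuming the Memo shard is loaded:
#
#   unless retarget_service(memo, "azure-prod", "https://mycompany.openai.azure.com/")
#     STDERR.puts "service azure-prod not found"
#   end
```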


def use_service(name : String, api_key : String | Nil = nil)

Switch to a different service

Changes the current provider and service configuration. The api_key is required for non-mock services.

Example:

memo.use_service("azure-prod", api_key: ENV["AZURE_API_KEY"])

def vocab_stats : Int64

Get vocabulary statistics

