CI GitHub release GitHub

TJ Lavin 🚴

"Keep on peddalin'!" - A Crystal job queue library for background processing

TJ Lavin is a lightweight Crystal wrapper around LavinMQ that makes background job processing simple and fun. Named after the legendary BMX rider and Challenge host, it's designed to keep your application pedaling smoothly even under pressure.

✨ Features

🚀 Quick Start

Installation

Add this to your shard.yml:

dependencies:
  tj_lavin:
    github: russ/tj_lavin

Then run:

shards install

Configuration

Configure TJ Lavin in your application:

TJLavin.configure do |settings|
  settings.amqp_url = ENV["AMQP_URL"]
  settings.routing_key = ENV["AMQP_ROUTING_KEY"]? # defaults to "tjlavin"
  settings.default_exchange = ENV["AMQP_DEFAULT_EXCHANGE"]? # defaults to ""
  settings.delayed_exchange = ENV["AMQP_DELAYED_EXCHANGE"]? # defaults to "tjlavin.delayed"
end

Creating a Job

Define your background jobs by inheriting from QueuedJob:

class BMXWorker < TJLavin::QueuedJob
  param name : String
  param skill_level : Int32 = 5

  def perform
    puts "Keep on peddalin' #{name}! Your skill level is #{skill_level}/10 🚴‍♂️"
    # Your job logic here
  end
end

Enqueuing Jobs

Queue up some work:

# Simple job
BMXWorker.new(name: "BMX Bandit").enqueue

# High priority job (0-255, higher = more priority, default is 0)
BMXWorker.new(name: "Speed Demon").enqueue(priority: 10)

# Delayed job
BMXWorker.new(name: "Future Rider").enqueue(delay: 30.seconds)

Named Queues

By default, all jobs use the "tjlavin" queue. You can assign jobs to specific queues using the queue macro:

class SendEmailWorker < TJLavin::QueuedJob
  # No queue declaration — uses the default "tjlavin" queue
  param user_id : UUID

  def perform
    # ...
  end
end

class MigrateGifMediaWorker < TJLavin::QueuedJob
  queue "media_processing"

  param media_id : UUID

  def perform
    # ...
  end
end

Enqueuing works the same way — the job already knows its queue:

SendEmailWorker.new(user_id: user.id).enqueue
MigrateGifMediaWorker.new(media_id: media.id).enqueue(priority: 100)

Running Workers

You can control which queues a worker process consumes from:

# Consume from ALL registered queues (default)
TJLavin::Runner.start

# Consume only from the default queue (e.g., web server workers)
TJLavin::Runner.start(queues: ["tjlavin"])

# Consume only media processing jobs (e.g., dedicated media workers)
TJLavin::Runner.start(queues: ["media_processing"])

# Consume from multiple specific queues
TJLavin::Runner.start(queues: ["tjlavin", "media_processing"])

For Lucky/Avram applications, set up your worker process:

# worker.cr
require "./app"

Habitat.raise_if_missing_settings!

if LuckyEnv.development?
  Avram::Migrator::Runner.new.ensure_migrated!
  Avram::SchemaEnforcer.ensure_correct_column_mappings!
end

# Disable query cache for better performance in workers
Avram.settings.query_cache_enabled = false

# Start processing jobs
TJLavin::Runner.start

Then run your worker:

crystal worker.cr

Testing with specs

To test your workers, include a spec helper like this:

module TJLavin
  abstract class QueuedJob < Job
    ENQUEUED_JOBS = [] of String

    def enqueue(priority : Int32 = 0, delay : Time::Span = 0.seconds) : JobRun
      ENQUEUED_JOBS << self.class.name
      TJLavin::JobRun.new("nothing")
    end
  end
end

Spec.before_each do
  TJLavin::QueuedJob::ENQUEUED_JOBS.clear
end

Then check to make sure your job was enqueued:

TJLavin::QueuedJob::ENQUEUED_JOBS.should contain("MyWorker")

📚 Documentation

🤝 Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

📄 License

Apache License 2.0 - see LICENSE file for details.


Keep on peddalin'! 🚴‍♂️