设计

基本的设计思路是,在相应的场合创建 message,通过 Active Job 广播到 Action Cable。

Message

为了在 rails 中实现消息通知系统,简单调研了一些开源项目的实现方式。大致逻辑是,建立一个如下所示的 Message 表:

id title content type recevier actor target path
  标题 内容 消息类型 接收者 操作者 操作的对象 跳转地址

在需要推送消息的场合,比如创建评论,@他人等,在 Comment model 中添加一个after_create动作来创建 message。web socket 的推送只显示 title,消息列表可以显示 title 和 content。点击消息时按照 path 来跳转。

Action Cable

为了实现 web socket 推送,我们还需要开启 action cable1。关于 action cable,请参见DHH 的教学视频

开启cable.jsroutes

// app/assets/javascripts/cable.js
//= require action_cable
//= require_self
//= require_tree ./channels
(function() {
  this.App || (this.App = {});
  App.cable = ActionCable.createConsumer();
}).call(this);
# config/routes.rb
Rails.application.routes.draw do
  mount ActionCable.server => "/cable"
end

创建连接。这里连接的时候会写入 current user,供接下来按照 user 订阅用。

# app/channels/application_cable/connection.rb
module ApplicationCable
  class Connection < ActionCable::Connection::Base
    identified_by :current_user

    def connect
      self.current_user = find_verified_user
    end

    protected
    def find_verified_user
      # 我使用了 devise,可以这样获取 current user
      if current_user = env['warden'].user
        current_user
      else
        reject_unauthorized_connection
      end
    end
  end
end

创建 channel:rails g channel messages

# app/channels/messages_channel.rb
class MessagesChannel < ApplicationCable::Channel
  def subscribed
    # 从 connection 可以拿到 current user
    stream_from "messages_#{current_user.id}"
  end
  def unsubscribed
    # Any cleanup needed when channel is unsubscribed
  end
end
# app/assets/javascripts/channels/messages.coffee
jQuery(document).on 'turbolinks:load', ->
  App.messages = App.cable.subscriptions.create "MessagesChannel",
    connected: ->
      console.log 'connected'
    disconnected: ->
      console.log 'disconnected'  
    received: (data) ->
      console.log 'received'
      # 其他操作

这样,我们便可以在创建 message 的时候推送了,比如,在 message model 中添加after_create_commit动作,触发广播。

ActionCable.server.broadcast "messages_#{message.receiver.id}", data

然而这里更推荐使用 active job。

Active Job

首先创建 job,rails g job message_notification

class MessageNotificationJob < ApplicationJob
  queue_as :default

  def perform(message)
    ActionCable.server.broadcast "messages_#{message.receiver.id}", data # 这里是 message 组装的 data 
  end
end

然后把上面的after_create_commit改为:MessageNotificationJob.perform_later(self)

如此重启服务器即可正常工作。

生产环境配置

在生产环境下,Active Job 使用 sidekiq,Action Cable 使用 redis。(Ruby on Rails 的生产环境配置参见 Ruby on Rails 开发和生产环境搭建)

Action Cable

安装 redis2

# 源码安装
# wget/tar/configure/make/make install

# yum 安装
rpm -Uvh http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
rpm -Uvh http://rpms.famillecollet.com/enterprise/remi-release-6.rpm
yum --enablerepo=remi,remi-test install redis
# 配置
vi /etc/sysctl.conf # vm.overcommit_memory=1
sysctl vm.overcommit_memory=1
sysctl -w fs.file-max=100000
# 添加 service
chkconfig --add redis
chkconfig --level 345 redis on
service redis start/stop/restart

设置 redis 的地址。

# config/cable.yml
development:
  adapter: async
 
test:
  adapter: async
 
production:
  adapter: redis
  url: redis://localhost:12345

环境设置。

# config/environments/production
config.action_cable.allowed_request_origins = ["https://example.com", "http://example.com"]

nginx 服务器配置。

server {
  listen 80;
  server_name example.com;

  passenger_enabled on;
  passenger_ruby your_ruby_path;
  rails_env production;
  root your_project_path/current/public;

  location /cable {
    passenger_app_group_name your_app_name_action_cable;
    passenger_force_max_concurrent_requests_per_process 0;
  }

  error_page 500 502 503 504 /50x.html;
  location = /50x.html {
    root html;
  }
}

Active Job

Job 适配器使用 sidekiq,在Gemfile中添加并bundle install

# config/environments/production.rb
config.active_job.queue_adapter = :sidekiq

初始化。

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.redis = { url: 'redis://localhost:12345' }
end

Sidekiq.configure_client do |config|
  config.redis = { url: 'redis://localhost:12345' }
end

确保 redis 已经正常启动,然后在项目目录下执行sidekiq,此时便可以正常工作了。

如果你的 job 的 queue 不是 default,那么你还需要创建config/sidekiq.yml文件。执行的时候需要sidekiq -C ./config/sidekiq.yml

---
:concurrency: 1
production:
  :concurrency: 25
:queues:
  - [task1, 5]
  - [task2, 1]

使用 mina 部署的时候,需要添加mina_sidekiq这个 gem3

require 'mina_sidekiq/tasks'
# 在需要的地方 invoke
invoke :'sidekiq:quiet'
invoke :'sidekiq:restart'