The Crystal programming language | http://crystal-lang.org | Fund Crystal's development: http://is.gd/X7PRtI | Docs: http://crystal-lang.org/docs/ | API: http://crystal-lang.org/api/
Unhandled exception in spawn: Error writing to socket: Broken pipe (IO::Error)
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/io/evented.cr:82:13 in 'unbuffered_write'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/io/buffered.cr:217:5 in 'flush'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/http/web_socket/protocol.cr:106:5 in 'send'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/http/web_socket/protocol.cr:103:3 in 'send'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/http/web_socket/protocol.cr:90:5 in 'send'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/http/web_socket.cr:78:5 in 'send'
from lib/amber/src/amber/websockets/channel.cr:80:34 in 'rebroadcast!'
from src/channels/text_channel.cr:18:5 in 'handle_message'
from lib/amber/src/amber/websockets/channel.cr:49:9 in 'on_message'
from lib/amber/src/amber/websockets/channel.cr:88:13 in '->'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/primitives.cr:255:3 in '->'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/primitives.cr:255:3 in 'dispatch_received_message'
from lib/redis/src/redis/strategy/subscription_loop.cr:34:17 in 'enter_message_reception_loop'
from lib/redis/src/redis/strategy/subscription_loop.cr:19:7 in 'command'
from lib/redis/src/redis.cr:311:7 in 'command'
from lib/redis/src/redis/command_execution/value_oriented.cr:81:9 in 'void_command'
from lib/redis/src/redis/commands.cr:1675:7 in 'subscribe'
from lib/redis/src/redis/commands.cr:1665:7 in 'subscribe'
from lib/amber/src/amber/websockets/adapters/redis.cr:24:11 in '->'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/primitives.cr:255:3 in 'run'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/fiber.cr:92:34 in '->'
from ???
module Amber::WebSockets::Adapters
  # Allows websocket connections through redis pub/sub.
  #
  # Two Redis connections are opened: `@subscriber` is used for the
  # subscription loop and `@publisher` for `#publish`.
  class RedisAdapter
    @subscriber : Redis
    @publisher : Redis
    # Listener procs keyed by topic path (the part of a message topic before ":").
    @listener : Hash(String, Proc(String, JSON::Any, Nil)) = Hash(String, Proc(String, JSON::Any, Nil)).new

    # `true` once Redis has confirmed a subscription (set from the
    # `on.subscribe` callback, not merely when the attempt starts).
    getter subscribed : Bool = false

    # Returns the shared, lazily created adapter instance.
    def self.instance
      @@instance ||= new
    end

    # Establish subscribe and publish connections to Redis and start the
    # background subscription fiber.
    def initialize
      @subscriber = Redis.new(url: Amber.settings.redis_url)
      @publisher = Redis.new(url: Amber.settings.redis_url)
      spawn_subscription_loop
    end

    # Publish the *message* to the redis publisher with topic *topic_path*
    def publish(topic_path, client_socket, message)
      @publisher.publish(topic_path, {sender: client_socket.id, msg: message}.to_json)
    end

    # Register listener with topic path after one-time subscribe on initialization
    # So that it will be called when Redis pushes data to the subscription channel
    def on_message(topic_path, listener)
      @listener[topic_path] = listener
      SUBSCRIBE_CHANNEL.send(topic_path)
    end

    # Spawns the fiber that subscribes to CHANNEL_TOPIC_PATHS and then stays in
    # the Redis subscription loop, dispatching every pushed message.
    private def spawn_subscription_loop
      spawn do
        Fiber.yield
        puts "subscribing to #{CHANNEL_TOPIC_PATHS}"
        @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on|
          on.message do |_, raw|
            dispatch(raw)
          end

          on.subscribe do |_, _|
            @subscribed = true
            puts "subscribed to #{CHANNEL_TOPIC_PATHS}"
          end

          # Keep draining SUBSCRIBE_CHANNEL so that every topic registered via
          # `#on_message` gets subscribed, not only the first one.
          spawn do
            Fiber.yield
            while to_subscribe = SUBSCRIBE_CHANNEL.receive?
              @subscriber.subscribe(to_subscribe)
            end
          end
        end
      end
    end

    # Parses one raw pub/sub payload and hands it to the listener registered
    # for its topic path.
    #
    # Any error (malformed payload, a listener writing to an already closed
    # websocket, ...) is reported and swallowed here: letting it propagate
    # would unwind the Redis subscription loop and stop delivery for every
    # client.
    private def dispatch(raw)
      msg = JSON.parse(raw)
      sender_id = msg["sender"].as_s
      message = msg["msg"]
      topic = message["topic"].to_s.split(":").first

      # Topics without a registered listener are ignored instead of raising KeyError.
      if listener = @listener[topic]?
        listener.call(sender_id, message)
      end
    rescue ex
      STDERR.puts "RedisAdapter: failed to dispatch message (#{ex.class}): #{ex.message}"
    end
  end
end
Unhandled exception in spawn: Error writing to socket: Broken pipe (IO::Error)
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/io/evented.cr:82:13 in 'unbuffered_write'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/io/buffered.cr:217:5 in 'flush'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/http/web_socket/protocol.cr:106:5 in 'send'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/http/web_socket/protocol.cr:103:3 in 'send'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/http/web_socket/protocol.cr:257:5 in 'close'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/http/web_socket.cr:111:5 in 'close'
from lib/amber/src/amber/websockets/client_socket.cr:111:9 in 'disconnect!'
from lib/amber/src/amber/websockets/client_socket.cr:102:9 in 'beat'
from lib/amber/src/amber/websockets/client_sockets.cr:17:13 in '->'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/primitives.cr:255:3 in 'run'
from /home/jonathan/.asdf/installs/crystal/1.0.0/share/crystal/src/fiber.cr:92:34 in '->'
from ???
Spec
that has equivalent functionality. Did I miss something, or is there a shard that does this?
require "../../../spec_helper"
module Amber
  describe Amber::WebSockets::Adapters::RedisAdapter do
    describe "#initialize" do
      it "should subscribe to CHANNEL_TOPIC_PATHS" do
        # Two sockets, each joined to both channels of UserSocket.
        _, first_socket = create_user_socket
        _, second_socket = create_user_socket
        Amber::Server.pubsub_adapter = Amber::WebSockets::Adapters::RedisAdapter

        user_room = UserSocket.channels[0][:channel]
        secondary_room = UserSocket.channels[1][:channel]
        user_room.subscribe_to_channel(first_socket, "{}")
        user_room.subscribe_to_channel(second_socket, "{}")
        secondary_room.subscribe_to_channel(second_socket, "{}")
        secondary_room.subscribe_to_channel(first_socket, "{}")
        # Disabled assertion:
        # user_room.test_field.last.should eq "handle joined #{first_socket.id}"

        Amber::WebSockets::CHANNEL_TOPIC_PATHS.should eq ["user_room", "secondary_room"]

        adapter = Amber::WebSockets::Adapters::RedisAdapter.new
        # Wait for the adapter's background fiber before checking the flag.
        sleep 5.seconds
        adapter.subscribed.should eq true
      end
    end

    describe "#publish" do
      it "should publish the message to the channel" do
        # Same setup as above: two sockets joined to both channels.
        _, first_socket = create_user_socket
        _, second_socket = create_user_socket
        Amber::Server.pubsub_adapter = Amber::WebSockets::Adapters::RedisAdapter

        user_room = UserSocket.channels[0][:channel]
        secondary_room = UserSocket.channels[1][:channel]
        user_room.subscribe_to_channel(first_socket, "{}")
        user_room.subscribe_to_channel(second_socket, "{}")
        secondary_room.subscribe_to_channel(second_socket, "{}")
        secondary_room.subscribe_to_channel(first_socket, "{}")
        # Disabled assertion:
        # user_room.test_field.last.should eq "handle joined #{first_socket.id}"

        Amber::WebSockets::CHANNEL_TOPIC_PATHS.should eq ["user_room", "secondary_room"]

        adapter = Amber::WebSockets::Adapters::RedisAdapter.new
        sleep 3.seconds
        adapter.subscribed.should eq true

        # Look the first channel up again and push a message through it.
        target_channel = UserSocket.channels[0][:channel]
        payload = JSON.parse({"event" => "message", "topic" => "user_room:123", "subject" => "msg:new", "payload" => {"message" => "hey guys"}}.to_json)
        target_channel.on_message("123", payload)
        target_channel.test_field.last.should eq "hey guys"
      end
    end
  end
end
# Every pair yielded by `layout.each_line` must be `[1]` followed by `1`.
# NOTE(review): `layout` is defined outside this snippet; if `each_line`
# yields nothing, no expectation runs and the example passes vacuously.
it "with one item" do
  layout.each_line do |first_value, second_value|
    first_value.should eq [1]
    second_value.should eq 1
  end
end
Layout.new
*
`$~` and `$1`, …, `$n`, representing regex matches