Wednesday 3 March 2010

Ruby Sockets

I was messing around with Ruby sockets, and came up with a simple chat-server. Testing proved to be rather more complicated... If you run this program, you can connect to it using "telnet 6606".
require 'socket'
require 'thread'

# Boardcaster maintains a list of users.
class Broadcaster
def initialize; @users = []; end
def add user; @users << socket =" TCPServer.open(6606)" broadcaster =" Broadcaster.new" lock =" Mutex.new">")
user = {:name => s.gets.strip, :socket => s }
b.add user
print("#{user[:name]} is accepted\n")
s.write("Hello #{user[:name]}\n\rUsers on-line: #{b.list}\n\r>")
while true
st = s.gets.strip
#p st
break if st == 'bye'
lock.synchronize do
b.broadcast "#{user[:name]} says \"#{st}\"\n\r>"
end
end
lock.synchronize do
b.broadcast "#{user[:name]} has left\n\r>"
end
s.close
b.remove user
print("#{user[:name]} is gone\n")
end

end

This was my first experience of both threads and sockets on Ruby, and with regards to threads, I have to admit to being pretty clueless! However, it does seem worthwhile locking the shared resource, b, when used on a thread.

Sockets seems straightforward enough. A new socket is opened using the TCPServer class. Data is collected with gets, and sent with write. At the end it is closed. I suspect there should be some exception handling in there, but it certainly proves the concept.


Testing Stream-Handling Methods

Okay, so now I want to test my methods that handle streams. Let us suppose that you have a method that accepts data from some stream and outputs to another, like the broadcast method above, and you want to test it. How do you do it?

First, let me simplify, and instead consider this method:
def get_data source, sink
print "\n>"
name = source.gets.strip
print "\n>"
age = source.gets.strip
sink.print "Name: #{name}, age: #{age}"
end

This could be invoked for use with the keyboard like this
get_data $stdin, $stdout

Or across a network, like this:
require 'socket'
socket = TCPServer.open(6606)
get_data socket, socket

If I want to test that method the trick is to use StringIO objects.
def test_get_data1
StringIO.open { |sink|
get_data(StringIO.new("Boris\n32\n"), sink)
assert_equal "Name: Boris, age: 32", sink.string
}
end

Actually, Ruby would happily let you use the same StringIO object for both input and output, but the output would be appended to the input string, so your assertion would need to check for both the input and the output.
def test_get_data2
StringIO.open("Boris\n32\n") { |io|
get_data(io, io)
assert_equal "Boris\n32\nName: Boris, age: 32", io.string
}
end

That is bad; if we change the get_data method to accept different input, we would need to change the test in two places, and that is clearly a bad thing. Well, okay, we change it so the input gets inserted into what we expect. The problem now is that Ruby is modifying that string during the test, so we need instead to give Ruby a duplicate of the input string for it to play with, so we still have the orignal for comparison at the end.
def test_get_data3
input = "Boris\n32\n"
StringIO.open(input.clone) { |io|
get_data(io, io)
assert_equal "#{input}Name: Boris, age: 32", io.string
}
end

Then again, perhaps we need to rethink. The whole thing can be generalised into a new method, which can test any method against any input. The test itself can then be reduced to a single line.
def test_get_data4
stream_test("Boris\n32\n", "Name: Boris, age: 32") do |io|
get_data(io, io)
end
end

def stream_test input, output
StringIO.open(input.clone) { |io|
yield io
assert_equal "#{input}#{output}", io.string
}
end

A serious problem with all of these is that errors do not get caught by the test regime. A message is sent to the output, but the error is not counted in the totals (failures, on the other hand, are). I guess this is because the redirect is capturing the exception. A way around this is to capture the exception inside the block, and then flag this as a failure:
def stream_test input, output
StringIO.open(input.clone) { |io|
begin
yield io
rescue Exception => ex
assert false, "ERROR: #{ex.inspect}\n#{$!.backtrace[0..12] * "\n"}"
end
assert_equal "#{input}#{output}", io.string
}
end



Testing Multiple Threads

Those tests are all very well, but my Broadcast object sends messages to multiple users. How do I test that? Now I need threads in my tests!

Here is a test method that worked for me:
def test_broadcast
# SETTING UP

# The number of threads to spawn
number = 100
test_string = 'teststring'
b = Broadcaster.new
# Define strings outside the blocks so we can access them
# later on
string_ary = Array.new(number, '')
thread_ary = Array.new(number)
main_s = nil

# LISTENING THREADS
# A number of threads are spawned, they listen for
# messages for 0.2 seconds, then write their StringIO
# string to string_ary, before terminating.

number.times do |i|
# Spawn a new thread
thread_ary[i] = Thread.start(b, i) do
# Create a StringIO object to collect the string
StringIO.open do |sink|
# Create a new user, and add it to the Broadcaster
user = {:name => 'test1', :socket => sink }
b.add user
# Wait a short time for the message to be broadcast
# Choose wisely, 0.2 on my system led to failures
sleep(0.3)
# Set s1 to a copy of the sink string, so it
# is still around outside the block
string_ary[i] = sink.string.clone
end
end
end

# SENDING THREAD
# On the main thread, this sends the message, then waits
# for all the other threads to terminate.

# Create a StringIO object to collect the string
StringIO.open do |sink|
begin
# Create a new user, and add it to the Broadcaster
user = {:name => 'test2', :socket => sink }
b.add user
# Broadcast the test string
b.broadcast(test_string)
# Wait for the other threads to finish
# by which time the broadcast should have been received.
number.times { |i| thread_ary[i].join }
main_s = sink.string.clone
rescue Exception => ex
# Flag any exceptions as a failure
assert false, ex.inspect
end
end

# TESTING
# Test all the threads received the test_string
assert_equal test_string, main_s
number.times { |i| assert_equal test_string, string_ary[i] }
end



Struggling with Ruby: Contents Page

3 comments:

sadfuzzy said...
This comment has been removed by the author.
sadfuzzy said...

Please, can you correct code errors? For example,
def add user; @users << socket =" TCPServer.open(6606)" broadcaster =" Broadcaster.new" lock =" Mutex.new">")
user = {:name => s.gets.strip, :socket => s }
b.add user
print("#{user[:name]} is accepted\n")
s.write("Hello #{user[:name]}\n\rUsers on-line: #{b.list}\n\r>")

s is not defined :(

sadfuzzy said...
This comment has been removed by the author.