# Simple Shared State Protocol (sssp) Server
# Copyright 2006 Eric Harris-Braun
# This program is made available under the terms of the GNU GPL license
# as specified at http://www.gnu.org/copyleft/gpl.html
# See http://eric.harris-braun.com/sssp for details
#
# Usage: ruby <this file> [-v] [-V] [port]
#   -v    verbose logging to stdout
#   -V    print version information and exit
#   port  TCP port to listen on (default 8080)

$version = "0.0.3"
$sssp_version = "0.02"

require 'socket'
require 'thread' # lower-case: 'Thread' fails to load on case-sensitive file systems

# Separate switches (anything starting with "-") from positional arguments.
args = []
$verbose = false
ARGV.each do |arg|
  if arg =~ /^-/
    $verbose = true if arg == "-v"
    if arg == "-V"
      print "sssp server version #{$version} for sssp protocol version #{$sssp_version}\n"
      print "(c)2006 Harris-Braun Enterprises, LLC. Licensed for use under GPL.\n"
      exit
    end
  else
    args.push(arg)
  end
end

port = (args[0] || 8080).to_i
server = TCPServer.new('', port)

# Shared server state. Everything below is guarded by $m, except
# $outputQueue, which is a thread-safe Queue.
$history = []         # every accepted state action, indexed by state number
$state = {}           # address => current value
$meta = {}            # address => meta field of the action that last set it
$state_changes = {}   # address => state number of the action that last set it
$sessions = {}        # Session => last state number sent to it (-1 if none)
$outputQueue = Queue.new # state numbers waiting to be broadcast
$m = Mutex.new

# Prints a warning on stdout. Note that this replaces Kernel#warn for the
# whole script.
def warn(error)
  puts "WARNING: #{error}"
end

# One connected client: wraps the TCP socket and remembers the state actions
# to apply when the connection goes away.
class Session
  attr_reader :tcpsession, :close_actions

  def initialize(tcpsession)
    @tcpsession = tcpsession
    @close_actions = []
  end

  # Registers a state action to be applied by #cleanup.
  def add_close_action(action)
    puts "adding condition: #{action}" if $verbose
    @close_actions << action
  end

  # Writes str to the client; silently does nothing once the socket is closed.
  def print(str)
    @tcpsession.print(str) unless @tcpsession.closed?
  end

  def addr
    @tcpsession.addr
  end

  def peeraddr
    @tcpsession.peeraddr
  end

  # Closes the socket and applies every registered close action. A failure
  # while closing must not prevent the close actions from running.
  def cleanup
    puts "session close_actions:#{@close_actions.length}" if $verbose
    begin
      @tcpsession.close unless @tcpsession.closed?
    rescue StandardError => err
      warn("error closing socket: #{err.message}")
    end
    @close_actions.each { |action| changeState(action) }
  end
end

# Parses a state action of the form "<meta> <command> <address> [<value>]",
# applies it to the shared state, appends its canonical form to $history and
# queues the new state number for broadcast.
#
# Recognised commands (case-insensitive): S / SET and D / DELETE.
# An action with any other command is still logged and broadcast verbatim,
# but leaves $state, $meta and $state_changes untouched. Empty actions and
# SET/DELETE actions without an address are dropped with a warning.
def changeState(state_action)
  if state_action.nil? || state_action.strip.empty?
    warn("ignoring empty state action")
    return
  end

  (meta, command, address, value) = state_action.split(/\s/, 4)
  # Anchored so that e.g. "DELETES" is not mistaken for SET via its "S".
  is_set = !(command =~ /\As(et)?\z/i).nil?
  is_delete = !(command =~ /\Ad(elete)?\z/i).nil?

  if (is_set || is_delete) && address.nil?
    warn("ignoring state action without an address (#{state_action})")
    return
  end

  $m.synchronize {
    if is_set
      state_action = "#{meta} SET #{address} #{value}" # canonical form
      $state[address] = value
      $meta[address] = meta
    elsif is_delete
      state_action = "#{meta} DELETE #{address}" # canonical form
      # Remove the entries themselves (Hash#delete). The address also leaves
      # the snapshot index so it is no longer reported as a SET.
      $state.delete(address)
      $meta.delete(address)
      $state_changes.delete(address)
    end
    $history.push(state_action)
    state_num = $history.length - 1
    $state_changes[address] = state_num if is_set
    $outputQueue.push(state_num)
  }
end

# Handles one "REQ_<req> <body>" request from a session.
#
#   REQ_CHANGE <state action>           apply a state action
#   REQ_IF_CHANGE CLOSE <state action>  apply the action when the session closes
#   REQ_STATE                           send the current value of every address
#   REQ_STATE <start> [<end>]           replay history entries start..end
def processReq(session, req, body)
  if req =~ /^c(hange)*/i
    changeState(body)
  elsif req =~ /^i(f_change)*/i
    (condition, action) = body.split(/\s/, 2)
    if condition != 'CLOSE'
      warn("unknown condition (#{condition}) made by #{session.peeraddr[2]}")
    elsif action.nil? || action.empty?
      warn("CLOSE condition without an action made by #{session.peeraddr[2]}")
    else
      session.add_close_action(action)
    end
  elsif req =~ /^s(tate)*/i
    # Build the reply under the lock, but write to the socket outside it so
    # a slow client cannot stall every other thread.
    lines = []
    if body == ''
      $m.synchronize {
        ordered = $state_changes.sort { |a, b| a[1] <=> b[1] }
        ordered.each do |addr, state_num|
          lines << "STATE #{state_num} #{$meta[addr]} SET #{addr} #{$state[addr]}\r\n"
        end
      }
    else
      body =~ /([0-9]+) *([0-9]+)*/
      start_state = $1.to_i
      requested_end = $2.to_i
      $m.synchronize {
        last_state = $history.length - 1
        # An end of 0 (or none) means "up to the latest"; never run past
        # the end of the history.
        end_state = last_state
        end_state = requested_end if requested_end != 0 && requested_end < last_state
        (start_state..end_state).each do |n|
          lines << "STATE #{n} #{$history[n]}\r\n"
        end
      }
    end
    lines.each { |line| session.print line }
  end
  puts "Request: #{req} body: #{body}" if $verbose
end

threads = []

# Broadcaster: writes every new state action out to all connected sessions.
threads << Thread.new {
  loop do
    state_num = $outputQueue.pop # blocks until the queue has something in it
    state_action = nil
    recipients = nil
    # Work on a snapshot: connection threads add and remove sessions
    # concurrently, and a Hash must not change while being iterated.
    $m.synchronize {
      state_action = $history[state_num]
      recipients = $sessions.keys
    }
    recipients.each do |session|
      begin
        session.print "STATE #{state_num} #{state_action}\r\n"
        $m.synchronize {
          # Do not resurrect a session that was removed in the meantime.
          $sessions[session] = state_num if $sessions.has_key?(session)
        }
        puts "sending to #{session.peeraddr[2]} STATE #{state_num} {#{state_action}}" if $verbose
      rescue StandardError => err
        # One broken client must not stop broadcasts to everybody else.
        warn("could not send state #{state_num} to a session: #{err.message}")
      end
    end
  end
}

# Loop forever waiting for incoming connections and hand each one to its own
# thread.
while (theSession = server.accept)
  # Forget threads of connections that have already finished so the list
  # does not grow without bound.
  threads.delete_if { |t| !t.alive? }
  threads << Thread.new(theSession) { |tcpsession|
    session = nil
    sessionkey = 'unknown'
    begin
      session = Session.new(tcpsession)
      sessionkey = session.peeraddr.join(",")
      puts "new connection from: #{sessionkey}"
      $m.synchronize { $sessions[session] = -1 }
      while (req = session.tcpsession.gets)
        req = req.chomp # strip only the line terminator, never payload
        puts "Raw Request: '#{req}' length:#{req.length}" if $verbose
        if req =~ /^req_([a-z_]*) *(.*)/i
          processReq(session, $1, $2)
        elsif req =~ /^end/i
          break
        elsif req == 't' # debugging aid: list the server threads
          session.print threads.inspect + "\n"
        elsif req == 'h' # debugging aid: dump the current state
          session.print $state.inspect + "\n"
        end
      end
    rescue StandardError => err
      puts "Error: #{err.message}"
    ensure
      # The thread is closing down, which probably means that the connection
      # has closed, so clean up.
      puts "connection from: #{sessionkey} closing"
      if session
        $m.synchronize { $sessions.delete(session) }
        # Outside the lock: cleanup applies close actions through
        # changeState, which takes $m itself.
        session.cleanup
      else
        tcpsession.close unless tcpsession.closed?
      end
      puts "connection from: #{sessionkey} closed"
    end
  }
end

# cleanup threads
threads.each { |aThread| aThread.join }