(実験)ConcurrentLuaでマルチクライアントサーバーをつくってみた

去年のいつごろだったかに作ったやつを、年あけてから載せるという暴挙
ConcurrentLuaでマルチクライアントサーバーみたいなのを実験的に作ってみました。(エラー処理もしてないし、整理できてないしだけど。
考えるままに素直に作れた感じがする。ふしぎ。

想定

LAN内のみ。ファイアウォールなし。NATなし。

こんな流れ

  • サーバーでは「受付」「Broadcaster」が常駐して、クライアントが接続すると「席」をSpawnする。
  • クライアントではクライアント本体が常駐して、「スピーカー」をSpawnする。
接続
  1. クライアントをサーバーの「受付」につなぐ。
  2. 「受付」がクライアントとサーバーの通信をとりもつ「席」を用意する
  3. 「受付」がクライアントを「席」に誘導する
  4. 「席」が「Broadcaster」に、席リストに加えておくよう通知する
メッセージ交換
  1. 接続時に、クライアントは「スピーカー」を設置する。こいつが定期的に他のクライアントにメッセージを送る。
  2. 「スピーカー」が自分の「席」に発言を送る
  3. 「席」が「Broadcaster」に発言を送る
  4. 「Broadcaster」が席リストの全員(発言を送ってきた席以外)に発言を送る
  5. それぞれの「席」がおのおののクライアントに発言を送る
  6. 各クライアントが発言を受け取って表示
切断

クライアントから切断メッセージを受け取ると、「席」は「Broadcaster」に席リストからはずすように言い残して事切れる。

コード

multi_server.lua

require 'concurrent'

-- メッセージは、たいがい{ from, title, body }で。
-- from: どこが送信したか
-- title: 命令
-- body: 文章

-- seatからbroadcasterに送信するメッセージには、client_nameが含まれる
-- 自分以外に送る場合に、seatのpidが必要なため。
-- かつクライアントの名前もいる。

-- クライアントに席(seat)を用意してご案内
function receptionist()
    while true do
        local message = concurrent.receive()
        if message.title == 'connect' then
            local seat_pid = concurrent.spawn( seat, message.body )
            concurrent.send( message.from, {
                from = { concurrent.self(), concurrent.node() },
                title = 'seat assigned',
                body = { seat_pid, concurrent.node() }
                })
        end
    end
end

-- 自分以外の「席」に、クライアントへメッセージ送信してくれるよう頼む
function broadcast( seats, message )
    for key, seat in pairs(seats) do
        if seat ~= message.from then
            concurrent.send( seat, {
                from = message.client_name,
                title = message.title,
                body = message.body
                } )
        end
    end
end

-- サーバーに接続してる人全員(または自分以外)にメッセージを送信
function broadcaster()
    local seats = {}
    
    -- 指定された席を検索して、tableから削除する。
    -- TODO: 高速化(2分探索など、他のアルゴリズムを探す)
    local remove_seat
    remove_seat = function ( seats, target )
            for key, seat in pairs(seats) do
                if seat == target then
                    table.remove( seats, key )
                    return true
                end
            end
            return false
        end

    while true do
        local message = concurrent.receive()
        if message.title == 'connect' then
            table.insert( seats, message.from )
            for key, seat in pairs(seats) do
                print( key, seat )
            end
            concurrent.spawn( broadcast, seats, {
                from = message.from,
                title = 'talk',
                body = message.body,
                client_name = message.client_name
                })
        elseif message.title == 'disconnect' then
            concurrent.spawn( remove_seat, seats, message.from )
            concurrent.spawn( broadcast, seats, {
                from = message.client_name,
                title = 'talk',
                body = message.body,
                client_name = message.client_name
                })
        elseif message.title == 'talk' then
            concurrent.spawn( broadcast, seats, message )
        end
    end                            
end

-- 各々のクライアントに割り当てられる。検証・加工してbroadcasterに送信を任せる
-- client = { name, process }
-- nameとnodenameが別々 <- nameはつなぎながらでも変えたい
function seat( client )
    print( client.name .. 'のための席ができたよ' )
    concurrent.send( 'broadcaster', {
        from = concurrent.self(),
        title = 'connect',
        body = client.name .. ' connected',
        client_name = client.name
        })
    while true do
        local message = concurrent.receive()
        if message.title == 'disconnect' then
            print( client.name .. 'の接続が切れました' )
            concurrent.send( 'broadcaster', {
                from = concurrent.self(),
                title = message.title,
                body = message.body,
                client_name = client.name
                })
            break
        elseif message.broadcast == true then
            concurrent.send( 'broadcaster', {
                from = concurrent.self(),
                title = message.title,
                body = message.body,
                client_name = client.name
                })
        elseif message.title == 'talk' then
            print( client.name .. ' へ:' .. message.body )
            concurrent.send( client.process, message )
        end
    end
end

concurrent.init('server@192.168.0.13')

pid_broadcaster = concurrent.spawn( broadcaster )
pid_receptionist = concurrent.spawn( receptionist )
concurrent.register( 'broadcaster', pid_broadcaster )
concurrent.register( 'receptionist', pid_receptionist )
print( '準備完了' )
concurrent.loop()
concurrent.shutdown()

multi_client.lua

require 'concurrent'

--concurrent.setoption( 'trapexit', true )

function speaker()
    local i
    for i = 1, 10 do
        concurrent.send( 'client', {
            title = 'talk',
            body = 'あいうえお!',
            broadcast = true,
            index = i
            })
        concurrent.sleep( 500 )
    end
    print( 'speakerおわり' )
    concurrent.send( 'client', {
        from = concurrent.self(),
        title = 'close speaker'
        })
end

function client()
    print( '準備完了: ' .. concurrent.self() .. concurrent.node())
    concurrent.send( { 'receptionist', 'server@192.168.0.13' }, {
        title = 'connect',
        from = {concurrent.self(), concurrent.node()},
        body = {
            name = time,
            process = { concurrent.self(), concurrent.node() },
            }
        })
    local seat
    local speaker_pid
    while true do
        local message = concurrent.receive()
        if message.title == 'seat assigned' then
            seat = message.body
            print( '席ができたよ!: ' )
            speaker_pid = concurrent.spawn( speaker )
        elseif message.title == 'close speaker' then
            local err = concurrent.send( seat, {
                from = { concurrent.self(), concurrent.node() },
                title = 'disconnect',
                body = '切断します。',
                })
            print( '切断します。' )
            break
        elseif message.broadcast == true then
            local err = concurrent.send( seat, {
                from = { concurrent.self(), concurrent.node() },
                title = message.title,
                body = message.body,
                broadcast = message.broadcast,
                })
            print( message.index .. message.body .. 'って送ります: ' )
        elseif message.from ~= 'a' .. time and message.title == 'talk' then
            print( message.from .. ': ' .. message.body )
        end
    end
end

time = tostring( os.time() )
concurrent.init( 'a' .. time .. '@192.168.0.13' )
client_pid = concurrent.spawn( client )
concurrent.register( 'client', client_pid )
concurrent.loop()
print( 'quit' )
concurrent.shutdown()