Advertisement
Snusmumriken

Pub/sub

Sep 18th, 2017
187
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Lua 5.96 KB | None | 0 0
  1. local socket = require'socket'
  2.  
  3. -- формат передачи - |тип [тема] данные|
  4. local client = {}
  5. client.__index = client
  6.  
  7. function client:new(ip, port)
  8.     local o = {}                 -- прототип объекта клиента
  9.     o.ip = ip or '127.0.0.1'     -- данные сервера
  10.     o.port = port or '7777'      -- куда публикуем всякую дрянь
  11.     o.callbacks = {}             -- колбеки для тем подписок
  12.     o.socket = socket.tcp()
  13.     print('C: connect ', o.ip, o.port, ':', o.socket:connect(o.ip, o.port), 5)
  14.     o.socket:settimeout(0)       -- убираем блокировки (выдаст timeout коль ничего нет в сокете)
  15.     return setmetatable(o, self) -- присобачиваем прототипу нас как класс (благодаря __index)
  16. end
  17.  
  18. function client:close()
  19.     self.socket:close()
  20.     if self.callbacks.close then
  21.         self.callbacks.close(1)
  22.     end
  23. end
  24.  
  25. function client:send(type, ...)
  26.     local data = table.concat({type, ...}, ' '):gsub([[\+n]], [[\\n]])
  27.     print('C: send: ', self.socket:send(data..'\r\n'))
  28. end
  29.  
  30. function client:update()
  31.     local data, err = self.socket:receive'*l' -- принимаем строчку от сервера
  32.     if err == 'closed' then return  nil, 'closed' end
  33.    
  34.     --тип [топик] данные
  35.     while data do
  36.         print('C: RCV', data)
  37.         data = data:gsub([[\\n]], [[\n]]) -- делаем обратную замену, как при отправке
  38.         local type, body = data:match'(%S+)%s(.*)' -- нам нужно выделить тип сообщения и тело сообщения
  39.         if type == 'pub' then -- если это какая-то публикация
  40.             print('C: IS PUB!')
  41.             local topic, message = body:match'(%S+)%s(.*)' -- выделяем топик и сообщение из тела
  42.             if self.callbacks[topic] then
  43.                 self.callbacks[topic](message) -- дёргаем колбек
  44.             end
  45.         elseif type == 'disconnect' then
  46.             self:close()
  47.         end
  48.         data = self.socket:receive'*l'
  49.     end
  50. end
  51.  
  52. -- При подписке, мы пихаем функцию, которая будет дёргаться когда придёт что-то по данному топику
  53. function client:subscribe(topic, callback)
  54.     self:send('sub', topic)
  55.     self.callbacks[topic] = callback
  56. end
  57.  
  58. function client:unsubscribe(topic)
  59.     self:send('unsub', topic)
  60.     self.callbacks[topic] = nil -- удаляем колбек на этот топик, коль отписались
  61. end
  62.  
  63. function client:publish(topic, ...)
  64.     self:send('pub', topic, ...)
  65. end
  66.  
  67. local server = {}
  68. server.__index = server
  69.  
  70. function server:new(port, ip)  -- ip тут совсем не обязателен, поэтому второй (как опция фильтра айпишников)
  71.     local o = {}
  72.     o.ip = ip or '*'             -- при создании слушающего сокета, ip работает как фильтр айпишников, по которым приходят сообщения
  73.     o.port = port or '7777'      -- а порт - как порт который мы слушаем
  74.     o.clients = {}               -- список приконнекченного народа и их подписок как элементы таблицы
  75.     o.socket = socket.tcp()
  76.     o.socket:settimeout(0)       -- убираем блокировки (выдаст timeout коль ничего нет в сокете)
  77.     o.socket:setsockname(o.ip, o.port)
  78.    
  79.     print('S: listen ', o.ip, o.port, ':', o.socket:listen())
  80.     return setmetatable(o, self) -- присобачиваем прототипу нас как класс (благодаря __index)
  81. end
  82.  
  83. function server:send(sock, ...)
  84.     local data = table.concat({...}, ' ') -- конкатенируем всю фигню в строку и балуемся с экранированием переносов строки.
  85.     print('S: send: ', data)
  86.     sock:send(data..'\r\n')
  87. end
  88.  
  89. -- Пересылка сообщения по выбранному топику
  90. function server:repost(topic, data)
  91.     for sock, pubs in pairs(self.clients) do
  92.         if pubs[topic] then     -- Если клиент подписан на наш топик - отправляем ему
  93.             self:send(sock, data) -- Тут - полные данные, без парсинга. Что приняли то и отослали.
  94.         end
  95.     end
  96. end
  97.  
  98. function server:update()
  99.     local sock = self.socket:accept() -- к нам кто-то законнектился? Отлично!
  100.     while sock do
  101.         print('S: New sock!')
  102.         sock:settimeout(0)
  103.         self.clients[sock] = {} -- инициализируем список подписок
  104.         sock = self.socket:accept()
  105.     end
  106.    
  107.     -- Проходим по списку клиентов, принимаем от них данные, реагируем
  108.     for sock, pubs in pairs(self.clients) do
  109.         local data, err = sock:receive'*l'
  110.         if err == 'closed' then  -- если клиент отвалился - удаляем его нафиг, он нам больше не нужен
  111.             print('S: '..(sock:getpeername())..' CLOSED')
  112.             self.clients[sock] = nil
  113.             break
  114.         end
  115.        
  116.         while data do
  117.             local type, body = data:match'(%S+)%s(%S+)'
  118.             print('S: type/body', type, body)
  119.             if type == 'sub' then       -- подписка
  120.                 print('S: IS SUB!')
  121.                 pubs[body] = true         -- body как топик (тип подписки)
  122.             elseif type == 'unsub' then -- отмена подписки
  123.                 print('S: IS UNSUB!')
  124.                 pubs[body] = nil
  125.             elseif type == 'pub' then   -- публикация
  126.                 self:repost(body, data)   -- мы так и не изменяли data, поэтому перешлём в неизменном виде
  127.             elseif type == 'disconnect' then
  128.                 print('S: IS DUSCONNECT!')
  129.                 self.clients[sock] = nil  -- удаляем нафиг, вместе с подписками
  130.             end
  131.             data = sock:receive'*l'
  132.         end
  133.     end
  134. end
  135.  
  136. return {
  137.     client = client,
  138.     server = server
  139. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement