Advertisement
Guest User

Untitled

a guest
Dec 5th, 2016
65
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 14.80 KB | None | 0 0
  1. require 'socket'
  2.  
  # Listening port and name of this node; both are assigned by setup.
  $port = nil
  $hostname = nil

  # Values read from the config file.
  $updateInterval = nil
  $maxPayload = nil
  $pingTimeout = nil

  # Internal clock: the current time and the length of one tick in seconds.
  $time = nil
  $clockDelay = 0.001

  # Start times of outstanding requests, keyed by their unique id.
  $startTimes = {}
  $startTimesSem = Mutex.new

  # Port of every node, keyed by node name.
  $ports = {}

  # Sockets owned by $hostname: neighbor => socket.
  $sockets = {}
  $socketsSem = Mutex.new

  # Known topology: source => neighbor => cost.
  # Looking up an unknown source creates an empty entry for it.
  $neighbors = Hash.new { |table, node| table[node] = {} }
  $neighborsSem = Mutex.new

  # Shortest paths from $hostname: destination => Path (next hop and distance).
  $paths = {}
  $pathsSem = Mutex.new

  # Pending messages to/from $hostname, keyed by unique id.
  $messages = {}
  $messagesSem = Mutex.new
  37.  
  # One routing-table entry: the neighbor to forward through and the total
  # distance to the destination.
  class Path
    attr_accessor :nextHop, :distance

    def initialize(nextHop, distance)
      @nextHop = nextHop
      @distance = distance
    end
  end
  46.  
  47. # --------------------- Part 0 --------------------- #
  # EDGEB srcIp dstIp dst: open a TCP link to neighbor +dst+ unless a socket
  # for it is already registered.
  #
  # cmd - [srcIp, dstIp, dst]; srcIp is not needed here.
  #
  # Returns the reader thread started for the new socket, or nil when the
  # neighbor was already linked.
  def edgeb(cmd)
    dstIp = cmd[1]
    dst = cmd[2]

    # The lookup is done under the lock and the lock is released right away:
    # link takes $socketsSem itself, so it must not be held while calling it.
    alreadyLinked = $socketsSem.synchronize { $sockets.key?(dst) }
    return if alreadyLinked

    socket = TCPSocket.new(dstIp, $ports[dst])
    link(dst, socket)

    $socketsSem.synchronize { socket.puts("LINK #{$hostname}") }
    Thread.new { handleCommands(socket) }
  end
  61.  
  # DUMPTABLE filename: write the routing table to +filename+, one
  # "source,destination,nextHop,distance" line per destination, sorted by
  # destination. The entry for $hostname itself is left out.
  #
  # cmd - [filename]
  def dumptable(cmd)
    filename = cmd[0]

    # Build the rows under the lock, then write without holding it.
    rows = $pathsSem.synchronize do
      $paths.keys.sort.reject { |dst| dst == $hostname }.map do |dst|
        "#{$hostname},#{dst},#{$paths[dst].nextHop},#{$paths[dst].distance}"
      end
    end

    # The block form of File.open closes the file on its own.
    File.open(filename, "w") do |f|
      rows.each { |row| f.puts(row) }
    end
    nil
  end
  74.  
  # SHUTDOWN: tell every neighbor that the edge to this node is gone, close
  # all sockets, flush STDOUT and exit with status 0.
  #
  # cmd - unused.
  def shutdown(cmd)
    $socketsSem.synchronize do
      $sockets.each_value do |sock|
        begin
          sock.puts("EDGED #{$hostname}")
        rescue IOError, SystemCallError
          # The peer is already gone; there is nobody left to notify.
        end
        begin
          sock.close
        rescue IOError, SystemCallError
          # Already closed; keep going so the remaining sockets are handled.
        end
      end
    end
    STDOUT.flush
    exit(0)
  end
  85.  
  86. # --------------------- Part 1 --------------------- #
  # EDGED dst: close and forget the socket to neighbor +dst+, remove the edge
  # from the topology, then delete every node that can no longer be reached
  # from this node. Does nothing when no socket for +dst+ is registered.
  #
  # cmd - [dst]
  def edged(cmd)
    dst = cmd[0]

    dropped = $socketsSem.synchronize do
      present = $sockets.key?(dst)
      if present
        $sockets[dst].close
        $sockets.delete(dst)
      end
      present
    end
    return unless dropped

    dEdge([$hostname, dst])

    # Grow the set of visited nodes outwards from this node; every node that
    # is never visited has become unreachable.
    unreachable = $neighborsSem.synchronize do
      seen = [$hostname]
      cursor = 0
      until cursor >= seen.length
        seen |= $neighbors[seen[cursor]].keys
        cursor += 1
      end
      $neighbors.keys - seen
    end
    dNodes(unreachable)
  end
  115.  
  # EDGEU dst cost: record +cost+ for the edge between this node and +dst+.
  #
  # cmd - [dst, cost]
  def edgeu(cmd)
    dst, cost = cmd
    aEdges([[$hostname, dst, cost].join(',')])
  end
  122.  
  # STATUS: print this node's name, port and its neighbors in sorted order.
  def status
    neighborList = $neighbors[$hostname].keys.sort.join(',')
    STDOUT.puts("Name: #{$hostname} Port: #{$port} Neighbors: #{neighborList}")
  end
  126.  
  127. # --------------------- Part 2 --------------------- #
  # SNDMSG dst msg...: send a text message to +dst+.
  #
  # The text is cut into fragments sized so that a fragment plus the widest
  # SENDM header fits in $maxPayload. The fragments are stored in $messages
  # under a fresh id and announced to the destination with a send request.
  # A watchdog thread reports an error and drops the message when the request
  # has not been acknowledged after $pingTimeout seconds.
  #
  # cmd - [dst, word, word, ...]
  #
  # Returns the watchdog thread.
  def sndmsg(cmd)
    dst = cmd[0]
    msg = cmd[1..-1].join(' ')

    # Widest header a fragment can carry; only used to size the fragments.
    header = "SENDM #{$hostname} #{dst} 999 999 655335"
    # Keep room for at least one character so the quantifier below stays valid.
    messageSize = [$maxPayload - header.bytesize, 1].max
    fragments = msg.scan(/.{1,#{messageSize}}/)

    # The two mutexes are taken one after the other and never nested, because
    # the handlers for incoming transfers take them in the opposite order.
    uniq = $startTimesSem.synchronize do
      id = rand(999).to_s
      id = rand(999).to_s while $startTimes.key?(id)
      $startTimes[id] = $time
      id
    end
    $messagesSem.synchronize { $messages[uniq] = fragments }
    sends([$hostname, dst, fragments.size, uniq])

    Thread.new do
      sleep($pingTimeout)

      # An acknowledged request has had its start time reset to nil.
      timedOut = $startTimesSem.synchronize do
        pending = !$startTimes[uniq].nil?
        $startTimes.delete(uniq)
        pending
      end
      if timedOut
        STDOUT.puts("SNDMSG ERROR: HOST UNREACHABLE")
        $messagesSem.synchronize { $messages.delete(uniq) }
      end
    end
  end
  161.  
  # PING dst numPings delay: send +numPings+ pings to +dst+, waiting +delay+
  # seconds between two pings. Each ping gets a watchdog thread that prints an
  # error when no reply has been recorded after $pingTimeout seconds.
  #
  # cmd - [dst, numPings, delay]
  #
  # Returns the thread that sends the pings.
  def ping(cmd)
    dst = cmd[0]
    numPings = cmd[1].to_i
    delay = cmd[2].to_f

    Thread.new do
      numPings.times do |seqId|
        uniq = $startTimesSem.synchronize do
          token = rand.to_s
          token = rand.to_s while $startTimes.key?(token)
          $startTimes[token] = $time
          token
        end
        sendp([$hostname, dst, seqId, uniq])

        Thread.new do
          sleep($pingTimeout)

          $startTimesSem.synchronize do
            # A reply resets the start time to nil.
            STDOUT.puts("PING ERROR: HOST UNREACHABLE") unless $startTimes[uniq].nil?
            $startTimes.delete(uniq)
          end
        end
        sleep(delay) if seqId < numPings - 1
      end
    end
  end
  191.  
  # TRACEROUTE dst: start a trace towards +dst+. The start time is stored
  # under the empty id and this node is reported as hop 0 through ackt.
  #
  # cmd - [dst]
  def traceroute(cmd)
    dst = cmd[0]
    uniq = ""

    $startTimesSem.synchronize do
      $startTimes[uniq] = $time
    end
    ackt([$hostname, dst, $hostname, 0, uniq])
  end
  199.  
  # FTP: placeholder that only reports that the command is not implemented.
  #
  # cmd - ignored.
  def ftp(cmd)
    STDOUT.puts "FTP: not implemented"
  end
  203.  
  204. # --------------------- Part 3 --------------------- #
  # CIRCUIT: placeholder that only reports that the command is not implemented.
  #
  # cmd - ignored.
  def circuit(cmd)
    STDOUT.puts "CIRCUIT: not implemented"
  end
  208.  
  209. # ----------------- Custom Methods ----------------- #
  # Register +socket+ as the connection to +node+ and send that peer one
  # AEDGES line: the new edge (cost 1) followed by every edge in $neighbors.
  def link(node, socket)
    $socketsSem.synchronize { $sockets[node] = socket }

    knownEdges = $neighborsSem.synchronize do
      $neighbors.flat_map do |src, dsts|
        dsts.map { |dst, cost| " #{src},#{dst},#{cost}" }
      end
    end
    announcement = "AEDGES #{$hostname},#{node},1" + knownEdges.join

    $socketsSem.synchronize { socket.puts(announcement) }
  end
  216.  
  # Send +line+ to every socket in $sockets while holding $socketsSem.
  def allSocketsPuts(line)
    $socketsSem.synchronize do
      $sockets.each { |_node, sock| sock.puts(line) }
    end
  end
  220.  
  # Merge "src,dst,cost" edge strings into $neighbors (in both directions) and
  # forward the edges that changed the table to every connected socket in a
  # single AEDGES line. Nothing is sent when no edge changed anything.
  def aEdges(edges)
    changed = edges.select do |edge|
      src, dst, rawCost = edge.split(',')
      cost = rawCost.to_i

      $neighborsSem.synchronize do
        known = $neighbors.key?(src) && $neighbors[src].key?(dst) && $neighbors[src][dst] == cost
        $neighbors[src][dst] = $neighbors[dst][src] = cost unless known
        !known
      end
    end
    allSocketsPuts("AEDGES" + changed.map { |edge| " #{edge}" }.join) unless changed.empty?
  end
  238.  
  # Remove the edge [src, dst] from $neighbors in both directions, drop either
  # endpoint if it is left without neighbors, and forward a DEDGE line to every
  # connected socket. Does nothing when the edge is not in the table.
  def dEdge(edge)
    src, dst = edge[0], edge[1]

    $neighborsSem.synchronize do
      next unless $neighbors.key?(src) && $neighbors[src].key?(dst)

      [[src, dst], [dst, src]].each do |from, to|
        $neighbors[from].delete(to)
        $neighbors.delete(from) if $neighbors[from].empty?
      end

      allSocketsPuts("DEDGE #{edge.join(' ')}")
    end
  end
  254.  
  # Remove each node in +nodes+ that is present in $neighbors, both as a
  # source and as anybody's neighbor, and forward the names that were removed
  # to every connected socket in a single DNODES line. Nothing is sent when no
  # listed node was known.
  def dNodes(nodes)
    # The membership test and the removal happen under one lock so another
    # thread cannot change the table between the two.
    removed = $neighborsSem.synchronize do
      nodes.select do |node|
        next false unless $neighbors.key?(node)

        $neighbors.each_value { |dsts| dsts.delete(node) }
        $neighbors.delete(node)
        true
      end
    end
    allSocketsPuts("DNODES" + removed.map { |node| " #{node}" }.join) unless removed.empty?
  end
  268.  
  # Send +line+ to the next hop on the path to +dst+. Nothing is sent when
  # $paths has no entry for +dst+ or $sockets has no socket for that hop.
  def nextHopSocketPuts(dst, line)
    nextHop = $pathsSem.synchronize do
      $paths.key?(dst) ? $paths[dst].nextHop : ""
    end
    $socketsSem.synchronize do
      $sockets[nextHop].puts(line) if $sockets.key?(nextHop)
    end
  end
  274.  
  # Handle a ping request [src, dst, seqId, uniq]: answer it through ackp when
  # it is addressed to this node, otherwise forward it towards +dst+.
  def sendp(ping)
    dst = ping[1]

    if dst == $hostname
      ackp(ping)
    else
      nextHopSocketPuts(dst, "SENDP #{ping.join(' ')}")
    end
  end
  283.  
  # Handle a ping reply [src, dst, seqId, uniq]. On the node that sent the
  # ping, a reply that arrives within $pingTimeout of the recorded start time
  # is printed as "seqId dst roundTripTime" and its start time is reset to nil.
  # On any other node the reply is forwarded towards +src+.
  def ackp(ping)
    src, dst, uniq = ping[0], ping[1], ping[3]
    seqId = ping[2].to_i

    return nextHopSocketPuts(src, "ACKP #{ping.join(' ')}") unless src == $hostname

    $startTimesSem.synchronize do
      next unless $startTimes.key?(uniq)

      roundTripTime = $time - $startTimes[uniq]
      next unless roundTripTime < $pingTimeout

      $startTimes[uniq] = nil
      STDOUT.puts("#{seqId} #{dst} #{roundTripTime.round(3)}")
    end
  end
  305.  
  # Handle a trace probe [src, dst, hops, uniq]: answer it from this node
  # through ackt once its hop budget is 0, otherwise use up one hop (updating
  # +trace+ in place) and forward the probe towards +dst+.
  def sendt(trace)
    src, dst, uniq = trace[0], trace[1], trace[3]
    remaining = trace[2].to_i

    return ackt([src, dst, $hostname, remaining, uniq]) if remaining == 0

    trace[2] = remaining - 1
    nextHopSocketPuts(dst, "SENDT #{trace.join(' ')}")
  end
  319.  
  # Handle a trace reply [src, dst, hostId, hopCount, uniq].
  #
  # On the node that started the trace: a reply that arrives within
  # $pingTimeout is printed as "hopCount hostId halfRoundTrip" and its start
  # time is reset to nil. Unless the replying host is the destination, a probe
  # with one more hop is sent under a fresh id, watched by a thread that
  # prints a timeout when no reply has been recorded after $pingTimeout.
  #
  # On any other node: the hop count is incremented in place and the reply is
  # forwarded towards +src+.
  def ackt(trace)
    src, dst, hostId, uniq = trace[0], trace[1], trace[2], trace[4]
    hopCount = trace[3].to_i

    unless src == $hostname
      trace[3] = hopCount + 1
      return nextHopSocketPuts(src, "ACKT #{trace.join(' ')}")
    end

    # Id of the follow-up probe, or nil when the trace stops here.
    nextUniq = $startTimesSem.synchronize do
      next unless $startTimes.key?(uniq)

      # The reply this node produces for itself counts as zero delay.
      roundTripTime = hostId == src ? 0.0 : ($time - $startTimes[uniq])
      next unless roundTripTime < $pingTimeout

      $startTimes[uniq] = nil
      STDOUT.puts("#{hopCount} #{hostId} #{(roundTripTime / 2).round(3)}")
      next if hostId == dst

      token = rand.to_s
      token = rand.to_s while $startTimes.key?(token)
      $startTimes[token] = $time
      token
    end
    return unless nextUniq

    sendt([src, dst, hopCount + 1, nextUniq])

    Thread.new do
      sleep($pingTimeout)

      $startTimesSem.synchronize do
        STDOUT.puts("TIMEOUT ON #{hopCount + 1}") unless $startTimes[nextUniq].nil?
        $startTimes.delete(nextUniq)
      end
    end
  end
  365.  
  # Handle a send request [src, dst, numFrags, srcUniq].
  #
  # At the destination: reserve +numFrags+ empty slots in $messages under a
  # fresh id, record the start time, answer through acks and start a watchdog
  # that discards the slots when the transfer is no longer active after
  # $pingTimeout seconds. On any other node: forward the request towards +dst+.
  #
  # Returns the watchdog thread at the destination.
  def sends(args)
    src = args[0]
    dst = args[1]
    numFrags = args[2].to_i
    srcUniq = args[3]

    return nextHopSocketPuts(dst, "SENDS #{args.join(' ')}") unless dst == $hostname

    uniq = $messagesSem.synchronize do
      id = rand(999).to_s
      id = rand(999).to_s while $messages.key?(id)
      $messages[id] = Array.new(numFrags)
      id
    end
    # Taken only after $messagesSem has been released, so the two mutexes are
    # never held together and cannot be acquired in conflicting orders.
    $startTimesSem.synchronize { $startTimes[uniq] = $time }
    acks([src, dst, srcUniq, uniq])

    Thread.new do
      sleep($pingTimeout)

      # sendm refreshes the start time for every fragment and deletes it once
      # the transfer is finished, so the entry may be missing by now.
      expired = $startTimesSem.synchronize do
        started = $startTimes[uniq]
        active = !started.nil? && ($time - started) < $pingTimeout
        $startTimes.delete(uniq) unless active
        !active
      end
      $messagesSem.synchronize { $messages.delete(uniq) } if expired
    end
  end
  401.  
  # Handle the reply [src, dst, srcUniq, dstUniq] to a send request.
  #
  # On the sending node: if the reply arrived within $pingTimeout, mark the
  # request as acknowledged (start time reset to nil) and send every stored
  # fragment of message +srcUniq+ through sendm, each under its own id and
  # with its own watchdog. A watchdog that finds its fragment unacknowledged
  # after $pingTimeout reports an error and drops the message. A final thread
  # removes the message after $pingTimeout. On any other node: forward the
  # reply towards +src+.
  #
  # Returns the clean-up thread on the sending node, nil when the reply is
  # ignored.
  def acks(args)
    src = args[0]
    dst = args[1]
    srcUniq = args[2]
    dstUniq = args[3]

    return nextHopSocketPuts(src, "ACKS #{args.join(' ')}") unless src == $hostname

    # A missing or already acknowledged (nil) start time means the reply is
    # late or a duplicate; it is ignored.
    accepted = $startTimesSem.synchronize do
      started = $startTimes[srcUniq]
      inTime = !started.nil? && ($time - started) < $pingTimeout
      $startTimes[srcUniq] = nil if inTime
      inTime
    end
    return unless accepted

    # Work on a snapshot so sending does not depend on the stored message
    # staying in place.
    fragments = $messagesSem.synchronize do
      stored = $messages[srcUniq]
      stored ? stored.dup : nil
    end
    return if fragments.nil?

    fragments.each_with_index do |fragment, seqId|
      uniq = $startTimesSem.synchronize do
        id = rand(999).to_s
        id = rand(999).to_s while $startTimes.key?(id)
        $startTimes[id] = $time
        id
      end
      # Called with no mutex held: sendm takes $messagesSem itself when the
      # destination is this node.
      sendm([src, dst, uniq, dstUniq, seqId, fragment])

      Thread.new do
        sleep($pingTimeout)

        # ackm resets the start time to nil when the fragment is acknowledged.
        unacked = $startTimesSem.synchronize do
          pending = !$startTimes[uniq].nil?
          $startTimes.delete(uniq)
          pending
        end
        $messagesSem.synchronize do
          if unacked && $messages.key?(srcUniq)
            STDOUT.puts("SNDMSG ERROR: HOST UNREACHABLE")
            $messages.delete(srcUniq)
          end
        end
      end
    end

    Thread.new do
      sleep($pingTimeout)

      $messagesSem.synchronize { $messages.delete(srcUniq) }
    end
  end
  448.  
  # Handle a message fragment [src, dst, srcUniq, uniq, seqId, word, ...].
  #
  # At the destination: if transfer +uniq+ is still active (its start time is
  # younger than $pingTimeout) the timer is refreshed and the fragment is
  # stored in slot +seqId+. The fragment is acknowledged through ackm in every
  # case. When all slots are filled the message is printed. A finished or
  # inactive transfer is removed from $messages and $startTimes.
  # On any other node: forward the fragment towards +dst+.
  def sendm(args)
    src = args[0]
    dst = args[1]
    srcUniq = args[2]
    uniq = args[3]
    seqId = args[4].to_i
    msg = args[5..-1].join(' ')

    return nextHopSocketPuts(dst, "SENDM #{args.join(' ')}") unless dst == $hostname

    active = $startTimesSem.synchronize do
      started = $startTimes[uniq]
      fresh = !started.nil? && ($time - started) < $pingTimeout
      $startTimes[uniq] = $time if fresh
      fresh
    end
    ackm([src, dst, srcUniq])

    # $messagesSem and $startTimesSem are taken one after the other, never
    # nested.
    finished = $messagesSem.synchronize do
      # The slots can be gone even though the timer is still there; treat that
      # like an inactive transfer.
      slots = active ? $messages[uniq] : nil
      complete = false
      if slots
        slots[seqId] = msg
        complete = !slots.include?(nil)
        STDOUT.puts("SNDMSG: #{src} --> #{slots.join('')}") if complete
      end
      done = slots.nil? || complete
      $messages.delete(uniq) if done
      done
    end
    $startTimesSem.synchronize { $startTimes.delete(uniq) } if finished
  end
  488.  
  # Handle a fragment acknowledgement [src, dst, uniq]. On the sending node
  # the fragment's start time is reset to nil when the acknowledgement arrives
  # within $pingTimeout; unknown or already acknowledged ids are ignored.
  # On any other node the acknowledgement is forwarded towards +src+.
  def ackm(args)
    src = args[0]
    uniq = args[2]

    return nextHopSocketPuts(src, "ACKM #{args.join(' ')}") unless src == $hostname

    $startTimesSem.synchronize do
      started = $startTimes[uniq]
      # started is nil for an unknown id and for one acknowledged before.
      $startTimes[uniq] = nil if started && $time - started < $pingTimeout
    end
  end
  500.  
  # Read protocol lines from +socket+ and dispatch each one to its handler
  # until the connection ends. Lines with an unknown command are ignored.
  #
  # The loop also ends quietly when reading fails: edged closes the socket of
  # the neighbor it removes, which can be the very socket being read here, and
  # a peer may reset the connection.
  def handleCommands(socket)
    loop do
      line = begin
        socket.gets
      rescue IOError, SystemCallError
        nil
      end
      break if line.nil?

      arr = line.split(' ')
      cmd = arr[0]
      args = arr[1..-1]
      case cmd
      when "LINK" then link(args[0], socket)
      when "AEDGES" then aEdges(args)
      when "DEDGE" then dEdge(args)
      when "DNODES" then dNodes(args)
      when "EDGED" then edged(args)
      when "SENDP" then sendp(args)
      when "ACKP" then ackp(args)
      when "SENDT" then sendt(args)
      when "ACKT" then ackt(args)
      when "SENDS" then sends(args)
      when "ACKS" then acks(args)
      when "SENDM" then sendm(args)
      when "ACKM" then ackm(args)
      end
    end
  end
  523.  
  # Compute the shortest path from +src+ to +dst+ over $neighbors and store it
  # as $paths[dst] (first hop after +src+ and total distance).
  #
  # Only the entry for +dst+ is written, and only once its distance is final,
  # so a call for one destination never replaces another destination's entry
  # with a provisional value. Nothing is stored when +dst+ equals +src+ or
  # cannot be reached.
  #
  # Neither $neighbors nor $paths is locked here.
  # NOTE(review): assumes edge costs are non-negative integers — confirm.
  #
  # Returns nil.
  def dijkstra(src, dst)
    return if src == dst

    dists = { src => 0 }
    firstHops = {}
    # Nodes with a known but not yet final distance: node => distance.
    pending = { src => 0 }
    settled = {}

    until pending.empty?
      name, dist = pending.min_by { |_node, d| d }
      pending.delete(name)
      settled[name] = true

      if name == dst
        $paths[dst] = Path.new(firstHops[dst], dist)
        return
      end

      # fetch does not trigger the table's default block, so no entry is
      # created for a node without neighbors.
      $neighbors.fetch(name, {}).each do |nei, cost|
        next if settled.key?(nei)

        candidate = dist + cost
        next if dists.key?(nei) && dists[nei] <= candidate

        dists[nei] = candidate
        pending[nei] = candidate
        # A direct neighbor of src is its own first hop; every other node
        # inherits the first hop of the node it is reached through.
        firstHops[nei] = name == src ? nei : firstHops[name]
      end
    end
    nil
  end
  593.  
  # Listen on $port and hand every accepted connection to handleCommands in a
  # thread of its own. Never returns.
  def server
    listener = TCPServer.new($port)
    loop do
      Thread.new(listener.accept) { |sock| handleCommands(sock) }
    end
  end
  598.  
  # Sleep for +delay+ seconds, then move the internal clock forward by +delay+.
  #
  # Returns the new value of $time.
  def updateTime(delay)
    sleep(delay)
    $time = $time + delay
  end
  603.  
  # Set the internal clock from the wall clock, then wait (through updateTime)
  # for the time remaining until the next multiple of $updateInterval.
  def synchronizeTime
    $time = Time.now.to_f
    untilBoundary = $updateInterval - $time % $updateInterval
    updateTime(untilBoundary)
  end
  608.  
  # Run the internal clock: synchronize it, start pathsUpdater in a thread of
  # its own, then advance the clock in $clockDelay steps forever.
  def clock
    synchronizeTime
    Thread.new { pathsUpdater }
    loop do
      updateTime($clockDelay)
    end
  end
  614.  
  # Every $updateInterval seconds start a thread that, holding $pathsSem and
  # then $neighborsSem, clears $paths and runs dijkstra from $hostname to
  # every other node in $neighbors. Never returns.
  def pathsUpdater
    loop do
      Thread.new do
        $pathsSem.synchronize do
          $paths.clear
          $neighborsSem.synchronize do
            $neighbors.each_key do |dst|
              next if dst == $hostname

              dijkstra($hostname, dst)
            end
          end
        end
      end
      sleep($updateInterval)
    end
  end
  626.  
  627. # do main loop here....
  # Read commands from STDIN until end of input and dispatch each one to its
  # handler with the remaining words of the line as arguments. Unknown
  # commands are reported on STDERR.
  def main
    until (line = STDIN.gets).nil?
      cmd, *args = line.strip.split(' ')
      case cmd
      when "EDGEB" then edgeb(args)
      when "EDGED" then edged(args)
      when "EDGEU" then edgeu(args)
      when "DUMPTABLE" then dumptable(args)
      when "SHUTDOWN" then shutdown(args)
      when "STATUS" then status()
      when "SNDMSG" then sndmsg(args)
      when "PING" then ping(args)
      when "TRACEROUTE" then traceroute(args)
      when "FTP" then ftp(args)
      when "CIRCUIT" then circuit(args)
      else
        STDERR.puts "ERROR: INVALID COMMAND \"#{cmd}\""
      end
    end
  end
  650.  
  # Initialise the node and run it.
  #
  # hostname - name of this node.
  # port     - port the server listens on.
  # nodes    - path of a file with one "name,port" line per node.
  # config   - path of a file with "key=value" lines; the keys
  #            updateInterval, maxPayload and pingTimeout are read as
  #            integers, other keys are ignored.
  #
  # Starts the server and the clock in threads of their own, then processes
  # commands from STDIN in the calling thread until end of input.
  def setup(hostname, port, nodes, config)
    $hostname = hostname
    $port = port

    # Set up ports. File.foreach closes the file on its own.
    File.foreach(nodes) do |line|
      next if line.strip.empty?

      name, nodePort = line.split(',')
      $ports[name] = nodePort.to_i
    end

    File.foreach(config) do |line|
      var, val = line.split('=')
      case var
      when "updateInterval" then $updateInterval = val.to_i
      when "maxPayload" then $maxPayload = val.to_i
      when "pingTimeout" then $pingTimeout = val.to_i
      end
    end

    Thread.new { server() }
    # clock synchronizes the internal clock and starts pathsUpdater itself, so
    # starting it once gives exactly one clock and one path updater.
    Thread.new { clock() }
    main()
  end
  686.  
  # Entry point: hostname, port, nodes file and config file come from the
  # first four command-line arguments.
  setup(*ARGV.values_at(0, 1, 2, 3))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement