Advertisement
Guest User

Untitled

a guest
Dec 6th, 2016
56
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 20.00 KB | None | 0 0
  1. require 'socket'
  2. require 'fileutils'
  3.  
  4. $port = nil
  5. $hostname = nil
  6.  
  7. # From config file.
  8. $updateInterval = nil
  9. $maxPayload = nil
  10. $pingTimeout = nil
  11.  
  12. # Internal clock.
  13. $time = nil
  14. $clockDelay = 0.001
  15.  
  16. # Keep track of ping times.
  17. $startTimes = Hash.new
  18. $startTimesSem = Mutex.new
  19.  
  20. # All nodes' ports, with nodes => ports..
  21. $ports = Hash.new
  22.  
  23. # All $hostname's sockets, with neighbors => sockets.
  24. $sockets = Hash.new
  25. $socketsSem = Mutex.new
  26.  
  27. # All connected nodes, with sources => neighbors => costs.
  28. $neighbors = Hash.new { |h, k| h[k] = Hash.new }
  29. $neighborsSem = Mutex.new
  30.  
  31. # All $hostname's shortest paths, with destinations => nextHops and distances.
  32. $paths = Hash.new
  33. $pathsSem = Mutex.new
  34.  
  35. # All pending messages to/from $hostname, with UIDs => fragment arrays.
  36. $messages = Hash.new
  37. $messagesSem = Mutex.new
  38.  
  39. # All filenames, with UIDs => filenames.
  40. $files = Hash.new
  41. $filesSem = Mutex.new
  42.  
  43. class Path
  44. attr_accessor :nextHop, :distance
  45.  
  46. def initialize(nextHop, distance)
  47. self.nextHop = nextHop
  48. self.distance = distance
  49. end
  50. end
  51.  
  52. # --------------------- Part 0 --------------------- #
  53. def edgeb(cmd)
  54. srcIp = cmd[0]
  55. dstIp = cmd[1]
  56. dst = cmd[2]
  57.  
  58. unless $sockets.key?(dst)
  59. socket = TCPSocket.new(dstIp, $ports[dst])
  60. link(dst, socket)
  61.  
  62. $socketsSem.synchronize { socket.write("LINK #{$hostname}\0") }
  63. Thread.new { handleCommands(socket) }
  64. end
  65. end
  66.  
  67. def dumptable(cmd)
  68. filename = cmd[0]
  69.  
  70. File.open(filename, "w") do |f|
  71. $pathsSem.synchronize do
  72. $paths.keys.sort.each do |dst|
  73. f.puts("#{$hostname},#{dst},#{$paths[dst].nextHop},#{$paths[dst].distance}") unless dst == $hostname
  74. end
  75. end
  76. f.close
  77. end
  78. end
  79.  
  80. def shutdown(cmd)
  81. $socketsSem.synchronize do
  82. $sockets.values.each do |sock|
  83. sock.write("EDGED #{$hostname}\0")
  84. sock.close
  85. end
  86. end
  87. STDOUT.flush
  88. exit(0)
  89. end
  90.  
  91. # --------------------- Part 1 --------------------- #
  92. def edged(cmd)
  93. dst = cmd[0]
  94.  
  95. exists = false
  96. $socketsSem.synchronize do
  97. if $sockets.key?(dst)
  98. exists = true
  99. $sockets[dst].close
  100. $sockets.delete(dst)
  101. end
  102. end
  103. if exists
  104. dEdge([$hostname, dst])
  105.  
  106. # Find and delete all now-unreachable nodes.
  107. reachable = [$hostname]
  108. unreachable = []
  109. i = 0
  110. $neighborsSem.synchronize do
  111. while i < reachable.length
  112. reachable |= $neighbors[reachable[i]].keys
  113. i += 1
  114. end
  115. unreachable = $neighbors.keys - reachable
  116. end
  117. dNodes(unreachable)
  118. end
  119. end
  120.  
  121. def edgeu(cmd)
  122. dst = cmd[0]
  123. cost = cmd[1]
  124.  
  125. aEdges(["#{$hostname},#{dst},#{cost}"])
  126. end
  127.  
  128. def status()
  129. STDOUT.write("Name: #{$hostname} Port: #{$port} Neighbors: #{$neighbors[$hostname].keys.sort.join(',')}\0")
  130. end
  131.  
  132. # --------------------- Part 2 --------------------- #
  133. def sendmsg(cmd)
  134. dst = cmd[0]
  135. msg = cmd[1..-1].join(' ')
  136.  
  137. uniq = 999
  138. seqId = 99999
  139. header = "M #{$hostname} #{dst} #{uniq} #{uniq} #{seqId} \0"
  140. messageSize = $maxPayload - header.bytesize
  141.  
  142. numFrags = 0
  143. $startTimesSem.synchronize do
  144. loop do
  145. uniq = rand(999).to_s
  146. break unless $startTimes.key?(uniq)
  147. end
  148. $messagesSem.synchronize do
  149. $messages[uniq] = msg.scan(/.{1,#{messageSize}}/)
  150. numFrags = $messages[uniq].size
  151. end
  152. $startTimes[uniq] = $time
  153. end
  154. sendms([$hostname, dst, numFrags, uniq])
  155.  
  156. Thread.new do
  157. sleep($pingTimeout)
  158.  
  159. $startTimesSem.synchronize do
  160. unless $startTimes[uniq] == nil
  161. STDOUT.puts("SENDMSG ERROR: HOST UNREACHABLE")
  162. $messagesSem.synchronize { $messages.delete(uniq) }
  163. end
  164. $startTimes.delete(uniq)
  165. end
  166. end
  167. end
  168.  
  169. def ping(cmd)
  170. dst = cmd[0]
  171. numPings = cmd[1].to_i
  172. delay = cmd[2].to_f
  173.  
  174. Thread.new do
  175. numPings.times do |seqId|
  176. uniq = 0
  177. $startTimesSem.synchronize do
  178. loop do
  179. uniq = rand.to_s
  180. break unless $startTimes.key?(uniq)
  181. end
  182. $startTimes[uniq] = $time
  183. end
  184. sendp([$hostname, dst, seqId, uniq])
  185.  
  186. Thread.new do
  187. sleep($pingTimeout)
  188.  
  189. $startTimesSem.synchronize do
  190. STDOUT.puts("PING ERROR: HOST UNREACHABLE") unless $startTimes[uniq] == nil
  191. $startTimes.delete(uniq)
  192. end
  193. end
  194. sleep(delay) unless seqId == numPings - 1
  195. end
  196. end
  197. end
  198.  
  199. def traceroute(cmd)
  200. dst = cmd[0]
  201.  
  202. uniq = ""
  203. $startTimesSem.synchronize { $startTimes[uniq] = $time }
  204. ackt([$hostname, dst, $hostname, 0, uniq])
  205. end
  206.  
  207. def ftp(cmd)
  208. dst = cmd[0]
  209. file = cmd[1]
  210. fpath = cmd[2]
  211.  
  212. uniq = 999
  213. seqId = 9999999
  214. header = "M #{$hostname} #{dst} #{uniq} #{uniq} #{seqId} \0"
  215.  
  216. messageSize = $maxPayload - header.bytesize
  217. $startTimesSem.synchronize do
  218. loop do
  219. uniq = rand(999).to_s
  220. break unless $startTimes.key?(uniq)
  221. end
  222. $startTimes[uniq] = nil
  223. end
  224. Thread.new do
  225. seqId = 0
  226. File.open(file, "r") do |f|
  227. $messagesSem.synchronize { $messages[uniq] = Array.new }
  228. while chunk = f.read(messageSize)
  229. $messagesSem.synchronize { $messages[uniq][seqId] = chunk }
  230. seqId += 1
  231. end
  232. f.close
  233. end
  234. numFrags = seqId
  235.  
  236. $startTimesSem.synchronize { $startTimes[uniq] = $time }
  237. sendfs([$hostname, dst, numFrags, uniq, "#{fpath}/#{file}"])
  238.  
  239. Thread.new do
  240. sleep($pingTimeout)
  241.  
  242. $startTimesSem.synchronize do
  243. unless $startTimes[uniq] == nil
  244. STDOUT.puts("SENDMSG ERROR: HOST UNREACHABLE")
  245. $messagesSem.synchronize { $messages.delete(uniq) }
  246. end
  247. $startTimes.delete(uniq)
  248. end
  249. end
  250. end
  251. end
  252.  
  253. # --------------------- Part 3 --------------------- #
  254. def circuit(cmd)
  255. STDOUT.puts("CIRCUIT: not implemented")
  256. end
  257.  
  258. # ----------------- Custom Methods ----------------- #
  259. def link(node, socket)
  260. $socketsSem.synchronize { $sockets[node] = socket }
  261. edges = "AEDGES #{$hostname},#{node},1"
  262. $neighborsSem.synchronize { $neighbors.each { |src, dsts| dsts.each { |dst, cost| edges << " #{src},#{dst},#{cost}" } } }
  263. $socketsSem.synchronize { socket.write("#{edges}\0") }
  264. end
  265.  
  266. def allSocketsPuts(line)
  267. $socketsSem.synchronize { $sockets.each_value { |sock| sock.write(line) } }
  268. end
  269.  
  270. def aEdges(edges)
  271. edgesAdded = ""
  272. edges.each do |edge|
  273. arr = edge.split(',')
  274. src = arr[0]
  275. dst = arr[1]
  276. cost = arr[2].to_i
  277.  
  278. $neighborsSem.synchronize do
  279. unless $neighbors.key?(src) && $neighbors[src].key?(dst) && $neighbors[src][dst] == cost
  280. $neighbors[src][dst] = $neighbors[dst][src] = cost
  281. edgesAdded << " #{edge}"
  282. end
  283. end
  284. end
  285. allSocketsPuts("AEDGES#{edgesAdded}\0") unless edgesAdded == ""
  286. end
  287.  
  288. def dEdge(edge)
  289. src = edge[0]
  290. dst = edge[1]
  291.  
  292. $neighborsSem.synchronize do
  293. if $neighbors.key?(src) && $neighbors[src].key?(dst)
  294. $neighbors[src].delete(dst)
  295. $neighbors.delete(src) if $neighbors[src].empty?
  296. $neighbors[dst].delete(src)
  297. $neighbors.delete(dst) if $neighbors[dst].empty?
  298.  
  299. allSocketsPuts("DEDGE #{edge.join(' ')}\0")
  300. end
  301. end
  302. end
  303.  
  304. def dNodes(nodes)
  305. nodesDeleted = ""
  306. nodes.each do |node|
  307. if $neighbors.key?(node)
  308. $neighborsSem.synchronize do
  309. $neighbors.each_value { |dsts| dsts.delete(node) }
  310. $neighbors.delete(node)
  311. end
  312. nodesDeleted << " #{node}"
  313. end
  314. end
  315. allSocketsPuts("DNODES#{nodesDeleted}\0") unless nodesDeleted == ""
  316. end
  317.  
  318. def nextHopSocketPuts(dst, line)
  319. nextHop = ""
  320. $pathsSem.synchronize { nextHop = $paths[dst].nextHop if $paths.key?(dst) }
  321. $socketsSem.synchronize { $sockets[nextHop].write(line) if $sockets.key?(nextHop) }
  322. end
  323.  
  324. def sendp(ping)
  325. src = ping[0]
  326. dst = ping[1]
  327. seqId = ping[2].to_i
  328. uniq = ping[3]
  329.  
  330. dst == $hostname ? ackp(ping) : nextHopSocketPuts(dst, "SENDP #{ping.join(' ')}\0")
  331. end
  332.  
  333. def ackp(ping)
  334. src = ping[0]
  335. dst = ping[1]
  336. seqId = ping[2].to_i
  337. uniq = ping[3]
  338.  
  339. if src == $hostname
  340. $startTimesSem.synchronize do
  341. if $startTimes.key?(uniq)
  342. roundTripTime = $time - $startTimes[uniq]
  343.  
  344. if roundTripTime < $pingTimeout
  345. $startTimes[uniq] = nil
  346. STDOUT.puts("#{seqId} #{dst} #{roundTripTime.round(3)}")
  347. end
  348. end
  349. end
  350. else
  351. nextHopSocketPuts(src, "ACKP #{ping.join(' ')}\0")
  352. end
  353. end
  354.  
  355. def sendt(trace)
  356. src = trace[0]
  357. dst = trace[1]
  358. hops = trace[2].to_i
  359. uniq = trace[3]
  360.  
  361. if hops == 0
  362. ackt([src, dst, $hostname, hops, uniq])
  363. else
  364. trace[2] = hops - 1
  365. nextHopSocketPuts(dst, "SENDT #{trace.join(' ')}\0")
  366. end
  367. end
  368.  
  369. def ackt(trace)
  370. src = trace[0]
  371. dst = trace[1]
  372. hostId = trace[2]
  373. hopCount = trace[3].to_i
  374. uniq = trace[4]
  375.  
  376. if src == $hostname
  377. sendTrace = false
  378. $startTimesSem.synchronize do
  379. if $startTimes.key?(uniq)
  380. roundTripTime = hostId == src ? 0.0 : ($time - $startTimes[uniq])
  381.  
  382. if roundTripTime < $pingTimeout
  383. $startTimes[uniq] = nil
  384. STDOUT.puts("#{hopCount} #{hostId} #{(roundTripTime / 2).round(3)}")
  385.  
  386. unless hostId == dst
  387. sendTrace = true
  388. loop do
  389. uniq = rand.to_s
  390. break unless $startTimes.key?(uniq)
  391. end
  392. $startTimes[uniq] = $time
  393. end
  394. end
  395. end
  396. end
  397. if sendTrace
  398. sendt([src, dst, hopCount + 1, uniq])
  399.  
  400. Thread.new do
  401. sleep($pingTimeout)
  402.  
  403. $startTimesSem.synchronize do
  404. STDOUT.puts("TIMEOUT ON #{hopCount + 1}") unless $startTimes[uniq] == nil
  405. $startTimes.delete(uniq)
  406. end
  407. end
  408. end
  409. else
  410. trace[3] = hopCount + 1
  411. nextHopSocketPuts(src, "ACKT #{trace.join(' ')}\0")
  412. end
  413. end
  414.  
  415. def sendms(args)
  416. src = args[0]
  417. dst = args[1]
  418. numFrags = args[2].to_i
  419. srcUniq = args[3]
  420.  
  421. if dst == $hostname
  422. uniq = 0
  423. $messagesSem.synchronize do
  424. loop do
  425. uniq = rand(999).to_s
  426. break unless $messages.key?(uniq)
  427. end
  428. $messages[uniq] = Array.new(numFrags)
  429. $startTimesSem.synchronize { $startTimes[uniq] = $time }
  430. end
  431. ackms([src, dst, srcUniq, uniq])
  432. STDERR.puts("out of acks")
  433. Thread.new do
  434. sleep($pingTimeout)
  435. STDERR.puts("after pingTimeout")
  436. $startTimesSem.synchronize do
  437. STDERR.puts("$startTimes.key?(uniq) = #{$startTimes.key?(uniq)}")
  438. STDERR.puts("($time - $startTimes[uniq]) = #{$time - $startTimes[uniq]}")
  439. unless $startTimes.key?(uniq) && ($time - $startTimes[uniq]) < $pingTimeout
  440. STDERR.puts("deleting")
  441. $startTimes.delete(uniq)
  442. $messagesSem.synchronize { $messages.delete(uniq) }
  443. end
  444. end
  445. end
  446. else
  447. nextHopSocketPuts(dst, "SENDMS #{args.join(' ')}\0")
  448. end
  449. end
  450.  
  451. def ackms(args)
  452. src = args[0]
  453. dst = args[1]
  454. srcUniq = args[2]
  455. dstUniq = args[3]
  456.  
  457. if src == $hostname
  458. $startTimesSem.synchronize { $startTimes.key?(srcUniq) && ($time - $startTimes[srcUniq]) < $pingTimeout ? $startTimes[srcUniq] = nil : return }
  459. numFrags = 0
  460. $messagesSem.synchronize { numFrags = $messages[srcUniq].size }
  461.  
  462. numFrags.times do |seqId|
  463. uniq = 0
  464. $messagesSem.synchronize do
  465. $startTimesSem.synchronize do
  466. loop do
  467. uniq = rand(999).to_s
  468. break unless $startTimes.key?(uniq)
  469. end
  470. $startTimes[uniq] = $time
  471. end
  472. m([src, dst, uniq, dstUniq, seqId, $messages[srcUniq][seqId]])
  473. end
  474. Thread.new do
  475. sleep($pingTimeout)
  476.  
  477. $startTimesSem.synchronize do
  478. $messagesSem.synchronize do
  479. if $startTimes[uniq] != nil && $messages.key?(srcUniq)
  480. STDOUT.puts("SNDMSG ERROR: HOST UNREACHABLE")
  481. $messages.delete(srcUniq)
  482. end
  483. $startTimes.delete(uniq)
  484. end
  485. end
  486. end
  487. end
  488. Thread.new do
  489. sleep($pingTimeout)
  490.  
  491. $messagesSem.synchronize { $messages.delete(srcUniq) }
  492. end
  493. else
  494. nextHopSocketPuts(src, "ACKMS #{args.join(' ')}\0")
  495. end
  496. end
  497.  
  498. def m(args)
  499. src = args[0]
  500. dst = args[1]
  501. srcUniq = args[2]
  502. uniq = args[3]
  503. seqId = args[4].to_i
  504. msg = args[5]
  505.  
  506. if dst == $hostname
  507. stop = true
  508. $startTimesSem.synchronize do
  509. if $startTimes.key?(uniq)
  510. if $time - $startTimes[uniq] < $pingTimeout
  511. $startTimes[uniq] = $time
  512. stop = false
  513. end
  514. end
  515. end
  516. ackm([src, dst, srcUniq])
  517.  
  518. $messagesSem.synchronize do
  519. unless stop
  520. $messages[uniq][seqId] = msg
  521.  
  522. unless $messages[uniq].include?(nil)
  523. STDOUT.puts("SENDMSG: #{src} --> #{$messages[uniq].join('')}")
  524. stop = true
  525. end
  526. end
  527. if stop
  528. $messages.delete(uniq)
  529. $startTimesSem.synchronize { $startTimes.delete(uniq) }
  530. end
  531. end
  532. else
  533. nextHopSocketPuts(dst, "M #{args.join(' ')}\0")
  534. end
  535. end
  536.  
  537. def ackm(args)
  538. src = args[0]
  539. dst = args[1]
  540. uniq = args[2]
  541.  
  542. if src == $hostname
  543. $startTimesSem.synchronize { $startTimes[uniq] = nil if $startTimes.key?(uniq) && $time - $startTimes[uniq] < $pingTimeout }
  544. else
  545. nextHopSocketPuts(src, "ACKM #{args.join(' ')}\0")
  546. end
  547. end
  548.  
  549. def sendfs(args)
  550. src = args[0]
  551. dst = args[1]
  552. numFrags = args[2].to_i
  553. srcUniq = args[3]
  554. file = args[4]
  555.  
  556. if dst == $hostname
  557. uniq = 0
  558. $messagesSem.synchronize do
  559. loop do
  560. uniq = rand(999).to_s
  561. break unless $messages.key?(uniq)
  562. end
  563. $messages[uniq] = Array.new(numFrags)
  564. $filesSem.synchronize { $files[uniq] = file }
  565. $startTimesSem.synchronize { $startTimes[uniq] = $time }
  566. end
  567. ackfs([src, dst, srcUniq, uniq])
  568. STDERR.puts("out of acks")
  569. Thread.new do
  570. sleep($pingTimeout)
  571. STDERR.puts("after pingTimeout")
  572. $startTimesSem.synchronize do
  573. STDERR.puts("$startTimes.key?(uniq) = #{$startTimes.key?(uniq)}")
  574. STDERR.puts("($time - $startTimes[uniq]) = #{$time - $startTimes[uniq]}")
  575. unless $startTimes.key?(uniq) && ($time - $startTimes[uniq]) < $pingTimeout
  576. STDERR.puts("deleting")
  577. $startTimes.delete(uniq)
  578. $messagesSem.synchronize { $messages.delete(uniq) }
  579. $filesSem.synchronize { $files.delete(uniq) }
  580. end
  581. end
  582. end
  583. else
  584. nextHopSocketPuts(dst, "SENDFS #{args.join(' ')}\0")
  585. end
  586. end
  587.  
  588. def ackfs(args)
  589. src = args[0]
  590. dst = args[1]
  591. srcUniq = args[2]
  592. dstUniq = args[3]
  593.  
  594. if src == $hostname
  595. $startTimesSem.synchronize { $startTimes.key?(srcUniq) && ($time - $startTimes[srcUniq]) < $pingTimeout ? $startTimes[srcUniq] = nil : return }
  596. numFrags = 0
  597. $messagesSem.synchronize { numFrags = $messages[srcUniq].size }
  598.  
  599. numFrags.times do |seqId|
  600. uniq = 0
  601. $messagesSem.synchronize do
  602. $startTimesSem.synchronize do
  603. loop do
  604. uniq = rand(999).to_s
  605. break unless $startTimes.key?(uniq)
  606. end
  607. $startTimes[uniq] = $time
  608. end
  609. f([src, dst, uniq, dstUniq, seqId, $messages[srcUniq][seqId]])
  610. end
  611. Thread.new do
  612. sleep($pingTimeout)
  613.  
  614. $startTimesSem.synchronize do
  615. $messagesSem.synchronize do
  616. if $startTimes[uniq] != nil && $messages.key?(srcUniq)
  617. STDOUT.puts("FTP ERROR: HOST UNREACHABLE")
  618. $messages.delete(srcUniq)
  619. end
  620. $startTimes.delete(uniq)
  621. end
  622. end
  623. end
  624. end
  625. Thread.new do
  626. sleep($pingTimeout)
  627.  
  628. $messagesSem.synchronize { $messages.delete(srcUniq) }
  629. end
  630. else
  631. nextHopSocketPuts(src, "ACKFS #{args.join(' ')}\0")
  632. end
  633. end
  634.  
  635. def f(args)
  636. src = args[0]
  637. dst = args[1]
  638. srcUniq = args[2]
  639. uniq = args[3]
  640. seqId = args[4].to_i
  641. msg = args[5]
  642. STDERR.puts("in f")
  643. if dst == $hostname
  644. stop = true
  645. $startTimesSem.synchronize do
  646. if $startTimes.key?(uniq)
  647. if $time - $startTimes[uniq] < $pingTimeout
  648. $startTimes[uniq] = $time
  649. stop = false
  650. end
  651. end
  652. end
  653. ackf([src, dst, srcUniq])
  654.  
  655. success = false
  656. $messagesSem.synchronize do
  657. unless stop
  658. $messages[uniq][seqId] = msg
  659. success = !$messages[uniq].include?(nil)
  660. end
  661. if stop
  662. $messages.delete(uniq)
  663. $filesSem.synchronize { $files.delete(uniq) }
  664. $startTimesSem.synchronize { $startTimes.delete(uniq) }
  665. end
  666. end
  667. if success
  668. file = ""
  669. $filesSem.synchronize { file = $files[uniq] }
  670. STDERR.puts(file)
  671. STDERR.puts(File.dirname(file))
  672. FileUtils.mkdir_p(File.dirname(file))
  673. Thread.new do
  674. File.open(file, "w") do |f|
  675. $messagesSem.synchronize { $messages[uniq].each { |chunk| f.print(chunk) } }
  676. f.close
  677. STDERR.puts("after close")
  678. end
  679. STDOUT.puts("FTP: #{src} --> #{file}")
  680. end
  681. end
  682. else
  683. nextHopSocketPuts(dst, "F #{args.join(' ')}\0")
  684. end
  685. end
  686.  
  687. def ackf(args)
  688. src = args[0]
  689. dst = args[1]
  690. uniq = args[2]
  691.  
  692. if src == $hostname
  693. $startTimesSem.synchronize { $startTimes[uniq] = nil if $startTimes.key?(uniq) && ($time - $startTimes[uniq]) < $pingTimeout }
  694. else
  695. nextHopSocketPuts(src, "ACKF #{args.join(' ')}\0")
  696. end
  697. end
  698.  
  699. def handleCommands(socket)
  700. while line = socket.gets("\0").chomp("\0")
  701. #STDERR.puts(line)
  702. line.chomp!
  703. arr = line.split(' ')
  704. cmd = arr[0]
  705. args = arr[1..-1]
  706. case cmd
  707. when "LINK"; link(args[0], socket)
  708. when "AEDGES"; aEdges(args)
  709. when "DEDGE"; dEdge(args)
  710. when "DNODES"; dNodes(args)
  711. when "EDGED"; edged(args)
  712. when "SENDP"; sendp(args)
  713. when "ACKP"; ackp(args)
  714. when "SENDT"; sendt(args)
  715. when "ACKT"; ackt(args)
  716. when "SENDMS"; sendms(args)
  717. when "ACKMS"; ackms(args)
  718. when "M"; m(line.split(/ /, 7)[1..-1])
  719. when "ACKM"; ackm(args)
  720. when "SENDFS"; sendfs(args)
  721. when "ACKFS"; ackfs(args)
  722. when "F"; f(line.split(/ /, 7)[1..-1])
  723. when "ACKF"; ackf(args)
  724. end
  725. end
  726. end
  727.  
  728. def dijkstra(src, dst)
  729. max = 2**(0.size * 8 - 2) - 1
  730. vertex = $neighbors.keys
  731. dists = Hash.new
  732. previous = Hash.new
  733. nodes = {}
  734. visited = []
  735.  
  736. return if src == dst
  737.  
  738. vertex.each do |k|
  739. if k == src
  740. dists[k] = 0
  741. nodes[k] = 0
  742. else
  743. dists[k] = max
  744. nodes[k] = max
  745. end
  746. previous[k] = nil
  747. end
  748. while nodes
  749. small = nodes.min_by { |k, v| v }
  750. name = small[0]
  751.  
  752. if dists[name] == max || name == nil || nodes.empty?
  753. break
  754. end
  755. if visited.include?(name)
  756. nodes.delete(name)
  757. next
  758. end
  759. nodes.delete(name)
  760.  
  761. $neighbors[name].each do |nei, c|
  762. tmp = dists[name] + c
  763.  
  764. if tmp < dists[nei]
  765. dists[nei] = tmp
  766. previous[nei] = name
  767. nodes[nei] = tmp
  768. end
  769. unless previous[nei] == src
  770. prev = previous[nei]
  771. while previous[prev] != src && prev != nil
  772. prev = previous[prev]
  773. end
  774. else
  775. prev = nei
  776. end
  777. $paths[nei] = Path.new(prev, dists[nei])
  778.  
  779. visited << name
  780. end
  781. if name == dst
  782. path = Array.new
  783. x = name
  784.  
  785. while previous[name] != src && previous[name]
  786. name = previous[name]
  787. end
  788. if previous[x] == src
  789. name = x
  790. end
  791. $paths[x] = Path.new(name, dists[x])
  792.  
  793. return
  794. end
  795. end
  796. end
  797.  
  798. def server()
  799. server = TCPServer.new $port
  800. loop { Thread.new (server.accept) { |sock| handleCommands(sock) } }
  801. end
  802.  
  803. def updateTime(delay)
  804. sleep(delay)
  805. $time += delay
  806. end
  807.  
  808. def synchronizeTime()
  809. $time = Time.now.to_f
  810. updateTime($updateInterval - $time % $updateInterval)
  811. end
  812.  
  813. def clock()
  814. synchronizeTime()
  815. Thread.new { pathsUpdater() }
  816. loop { updateTime($clockDelay) }
  817. end
  818.  
  819. def pathsUpdater()
  820. loop do
  821. Thread.new do
  822. $pathsSem.synchronize do
  823. $paths.clear
  824. $neighborsSem.synchronize { $neighbors.each_key { |dst| dijkstra($hostname, dst) unless dst == $hostname } }
  825. end
  826. end
  827. sleep($updateInterval)
  828. end
  829. end
  830.  
  831. # do main loop here....
  832. def main()
  833. while line = STDIN.gets
  834. #STDERR.puts("#{$hostname}: #{line}")
  835. line = line.strip
  836. arr = line.split(' ')
  837. cmd = arr[0]
  838. args = arr[1..-1]
  839. case cmd
  840. when "EDGEB"; edgeb(args)
  841. when "EDGED"; edged(args)
  842. when "EDGEU"; edgeu(args)
  843. when "DUMPTABLE"; dumptable(args)
  844. when "SHUTDOWN"; shutdown(args)
  845. when "STATUS"; status()
  846. when "SENDMSG"; sendmsg(args)
  847. when "PING"; ping(args)
  848. when "TRACEROUTE"; traceroute(args)
  849. when "FTP"; ftp(args)
  850. when "CIRCUIT"; circuit(args)
  851. else STDERR.puts "ERROR: INVALID COMMAND \"#{cmd}\""
  852. end
  853. end
  854. end
  855.  
  856. def setup(hostname, port, nodes, config)
  857. $hostname = hostname
  858. $port = port
  859.  
  860. # Set up ports, server, buffers.
  861. File.open(nodes, "r") do |f|
  862. f.each_line do |line|
  863. arr = line.split(',')
  864. n = arr[0]
  865. p = arr[1].to_i
  866. $ports[n] = p
  867. end
  868. f.close
  869. end
  870. File.open(config, "r") do |f|
  871. f.each_line do |line|
  872. arr = line.split('=')
  873. var = arr[0]
  874. val = arr[1].to_i
  875. case var
  876. when "updateInterval"; $updateInterval = val
  877. when "maxPayload"; $maxPayload = val
  878. when "pingTimeout"; $pingTimeout = val
  879. end
  880. end
  881. f.close
  882. end
  883. Thread.new { server() }
  884. Thread.new do
  885. synchronizeTime()
  886. Thread.new { clock() }
  887. pathsUpdater()
  888. end
  889. Thread.new do
  890. loop do
  891. $startTimesSem.synchronize { STDOUT.puts("#{$startTimes}") }
  892. sleep(0.01)
  893. end
  894. end
  895. main()
  896. end
  897.  
  898. setup(ARGV[0], ARGV[1], ARGV[2], ARGV[3])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement