Advertisement
Guest User

Untitled

a guest
Dec 8th, 2016
75
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 28.33 KB | None | 0 0
  1. require 'socket'
  2. require 'fileutils'
  3. require 'zlib'
  4.  
  5. $port = nil
  6. $hostname = nil
  7.  
  8. # From config file.
  9. $updateInterval = nil
  10. $maxPayload = nil
  11. $pingTimeout = nil
  12.  
  13. # Internal clock.
  14. $time = nil
  15. $clockDelay = 0.001
  16.  
  17. # Keep track of ping times.
  18. $startTimes = Hash.new
  19. $startTimesSem = Mutex.new
  20.  
  21. # All nodes' ports, with nodes => ports..
  22. $ports = Hash.new
  23.  
  24. # All $hostname's sockets, with neighbors => sockets.
  25. $sockets = Hash.new
  26. $socketsSem = Mutex.new
  27.  
  28. # All connected nodes, with sources => neighbors => costs.
  29. $neighbors = Hash.new { |h, k| h[k] = Hash.new }
  30. $neighborsSem = Mutex.new
  31.  
  32. #circuits circuit id -> array of nodes
  33. $circuits= Hash.new
  34. $circuitSem= Mutex.new
  35. $circuitInfo=Hash.new{ |h, k| h[k] = Array.new }
  36. $circuitInfoSem=Mutex.new
  37.  
  38. $washere=Hash.new
  39. $washerSem=Mutex.new
  40.  
  41. $circuitPath= Hash.new { |h, k| h[k] = Hash.new }
  42. $circuitPathSem= Mutex.new
  43.  
  44. # All $hostname's shortest paths, with destinations => nextHops and distances.
  45. $paths = Hash.new
  46. $pathsSem = Mutex.new
  47.  
  48. # All pending messages to/from $hostname, with UIDs => fragment arrays.
  49. $messages = Hash.new
  50. $messagesSem = Mutex.new
  51.  
  52. # All filenames, with UIDs => filenames.
  53. $files = Hash.new
  54. $filesSem = Mutex.new
  55.  
  56. class Path
  57. attr_accessor :nextHop, :distance
  58.  
  59. def initialize(nextHop, distance)
  60. self.nextHop = nextHop
  61. self.distance = distance
  62. end
  63. end
  64.  
  65. # --------------------- Part 0 --------------------- #
  66. def edgeb(cmd)
  67. srcIp = cmd[0]
  68. dstIp = cmd[1]
  69. dst = cmd[2]
  70.  
  71. unless $sockets.key?(dst)
  72. socket = TCPSocket.new(dstIp, $ports[dst])
  73. link(dst, socket)
  74.  
  75. $socketsSem.synchronize { socket.write("LINK #{$hostname}\0") }
  76. Thread.new { handleCommands(socket) }
  77. end
  78. end
  79.  
  80. def dumptable(cmd)
  81. filename = cmd[0]
  82.  
  83. File.open(filename, "w") do |f|
  84. $pathsSem.synchronize do
  85. $paths.keys.sort.each do |dst|
  86. f.puts("#{$hostname},#{dst},#{$paths[dst].nextHop},#{$paths[dst].distance}") unless dst == $hostname
  87. end
  88. end
  89. f.close
  90. end
  91. end
  92.  
  93. def shutdown(cmd)
  94. $socketsSem.synchronize do
  95. $sockets.values.each do |sock|
  96. sock.write("EDGED #{$hostname}\0")
  97. sock.close
  98. end
  99. end
  100. STDOUT.flush
  101. exit(0)
  102. end
  103.  
  104. # --------------------- Part 1 --------------------- #
  105. def edged(cmd)
  106. dst = cmd[0]
  107.  
  108. exists = false
  109. $socketsSem.synchronize do
  110. if $sockets.key?(dst)
  111. exists = true
  112. $sockets[dst].close
  113. $sockets.delete(dst)
  114. end
  115. end
  116. if exists
  117. dEdge([$hostname, dst])
  118.  
  119. # Find and delete all now-unreachable nodes.
  120. reachable = [$hostname]
  121. unreachable = []
  122. i = 0
  123. $neighborsSem.synchronize do
  124. while i < reachable.length
  125. reachable |= $neighbors[reachable[i]].keys
  126. i += 1
  127. end
  128. unreachable = $neighbors.keys - reachable
  129. end
  130. dNodes(unreachable)
  131. end
  132. end
  133.  
  134. def edgeu(cmd)
  135. dst = cmd[0]
  136. cost = cmd[1]
  137.  
  138. aEdges(["#{$hostname},#{dst},#{cost}"])
  139. end
  140.  
  141. def status()
  142. STDOUT.write("Name: #{$hostname} Port: #{$port} Neighbors: #{$neighbors[$hostname].keys.sort.join(',')}\0")
  143. end
  144.  
  145. # --------------------- Part 2 --------------------- #
  146. def sendmsg(cmd)
  147. dst = cmd[0]
  148. msg = cmd[1..-1].join(' ')
  149.  
  150. uniq = 999
  151. seqId = 99999
  152. header = "M #{$hostname} #{dst} #{uniq} #{uniq} #{seqId} \0"
  153. messageSize = $maxPayload - header.bytesize
  154.  
  155. numFrags = 0
  156. $startTimesSem.synchronize do
  157. loop do
  158. uniq = rand(999).to_s
  159. break unless $startTimes.key?(uniq)
  160. end
  161. $messagesSem.synchronize do
  162. $messages[uniq] = msg.scan(/.{1,#{messageSize}}/)
  163. numFrags = $messages[uniq].size
  164. end
  165. $startTimes[uniq] = $time
  166. end
  167. sendms([$hostname, dst, numFrags, uniq])
  168.  
  169. Thread.new do
  170. sleep($pingTimeout)
  171.  
  172. $startTimesSem.synchronize do
  173. unless $startTimes[uniq] == nil
  174. STDOUT.puts("SENDMSG ERROR: HOST UNREACHABLE")
  175. $messagesSem.synchronize { $messages.delete(uniq) }
  176. end
  177. $startTimes.delete(uniq)
  178. end
  179. end
  180. end
  181.  
  182. def ping(cmd)
  183. dst = cmd[0]
  184. numPings = cmd[1].to_i
  185. delay = cmd[2].to_f
  186.  
  187. Thread.new do
  188. numPings.times do |seqId|
  189. uniq = 0
  190. $startTimesSem.synchronize do
  191. loop do
  192. uniq = rand.to_s
  193. break unless $startTimes.key?(uniq)
  194. end
  195. $startTimes[uniq] = $time
  196. end
  197. sendp([$hostname, dst, seqId, uniq])
  198.  
  199. Thread.new do
  200. sleep($pingTimeout)
  201.  
  202. $startTimesSem.synchronize do
  203. STDOUT.puts("PING ERROR: HOST UNREACHABLE") unless $startTimes[uniq] == nil
  204. $startTimes.delete(uniq)
  205. end
  206. end
  207. sleep(delay) unless seqId == numPings - 1
  208. end
  209. end
  210. end
  211.  
  212. def traceroute(cmd)
  213. dst = cmd[0]
  214.  
  215. uniq = ""
  216. $startTimesSem.synchronize { $startTimes[uniq] = $time }
  217. ackt([$hostname, dst, $hostname, 0, uniq])
  218. end
  219.  
# FTP <dst> <file> <fpath>: currently a no-op that returns nil. The whole
# implementation below is disabled inside a =begin/=end block comment.
#
# The disabled draft deflates the file with Zlib, splits the result into
# fragments stored under a fresh UID in $messages, announces the transfer
# with sendfs and starts a timeout watchdog.
#
# NOTE(review): the draft cannot be re-enabled as is. It reads
# `messages[uniq]` without the `$` sigil and its `$messagesSem.synchronize {`
# block is closed twice (unbalanced `}`).
def ftp(cmd)
=begin
dst = cmd[0]
file = cmd[1]
fpath = cmd[2]

uniq = 9999999
seqId = 9999999
header = "M #{$hostname} #{dst} #{uniq} #{uniq} #{seqId} \0"

messageSize = $maxPayload - header.bytesize
$startTimesSem.synchronize do
loop do
uniq = rand(9999).to_s
break unless $startTimes.key?(uniq)
end
$startTimes[uniq] = nil
end
Thread.new do
numFrags = 0
inFile = File.open(file, "r")
compress = Zlib::Deflate.deflate(inFile.read)
#STDERR.puts("#{compress}")

inFile.close

$messagesSem.synchronize { $messages[uniq] = Array.new
$messages[uniq]=compress.scan(/.{1,#{messageSize}}/)
#STDERR.puts("#{$messages[uniq]}")
STDERR.puts("srcUniq1 #{uniq}")
l=messages[uniq].join
f1=File.open("out1","w")
f1.write(l)
f1.close
numFrags = $messages[uniq].size}
STDERR.puts(numFrags)
}

#do |f|
# $messagesSem.synchronize { $messages[uniq] = Array.new }
# while chunk = f.read(messageSize)
# $messagesSem.synchronize { $messages[uniq][seqId] = chunk }
# seqId += 1
# end

#end

STDERR.puts(numFrags)
$startTimesSem.synchronize { $startTimes[uniq] = $time }
sendfs([$hostname, dst, numFrags, uniq, "#{fpath}/#{file}"])

Thread.new do
sleep($pingTimeout)

$startTimesSem.synchronize do
unless $startTimes[uniq] == nil
STDOUT.puts("SENDMSG ERROR: HOST UNREACHABLE")
$messagesSem.synchronize { $messages.delete(uniq) }
end
$startTimes.delete(uniq)
end
end
end
=end
end
  285.  
  286. # --------------------- Part 3 --------------------- #
  287. def circuitb(cmd)
  288. id = cmd[0]
  289. dst= cmd[1]
  290. circuitNodes = cmd[2]
  291.  
  292. #$circuitSem.synchronize do
  293. # if $circuits.key?id
  294. # STDOUT.puts("CIRCUIT ERROR: #{$hostname}-/->#{$hostname} 1 ")
  295. # return
  296. # else
  297. createCircuit([id,$hostname,dst,circuitNodes])
  298. # end
  299. #end
  300.  
  301.  
  302. end
  303. def circuitd(cmd)
  304. id=cmd[0]
  305. $circuitSem.synchronize do
  306. unless $circuits.key?id
  307. STDOUT.puts("CIRCUIT ERROR: BADID -/-> BADID FAILED AT BADID")
  308. return
  309. end
  310. end
  311. $neighborsSem.synchronize do
  312. x=$circuits[id].keys
  313. x.each{|n|
  314. unless $neighbors.key?n
  315. STDOUT.puts("CIRCUIT ERROR: #{$circuitInfo[id][0]} -/-> #{$circuitInfo[id][0]} FAILED AT #{$circuitInfo[id][2]}")
  316. return
  317. end
  318. }
  319. end
  320. if $circuitInfo[id][0] == $hostname
  321. removeCircuit([1,"jaghool",id,$circuitInfo[id][0],$circuitInfo[id][1],$circuitInfo[id][2].join(",")])
  322. else
  323. return
  324. end
  325.  
  326.  
  327. end
  328.  
  329. # ----------------- Custom Methods ----------------- #
  330. def createCircuit(info)
  331.  
  332.  
  333. id = info.shift
  334. src= info.shift
  335. dst= info.shift
  336. if info[0].size>1
  337. cnodes=info[0].split(",")
  338. else
  339. cnodes=[info[0]]
  340. end
  341. info=[id,src,dst,info[0]]
  342. #STDOUT.puts( "info #{info}")
  343.  
  344. cnodes<< src
  345. cnodes<<dst
  346. tmpHash=Hash.new {|hash, key| hash[key] =Hash.new }
  347. cnodes.each{|n|
  348. $neighborsSem.synchronize do
  349.  
  350. if !$neighbors.key?n
  351. STDOUT.puts("CIRCUIT ERROR: #{$hostname}-/->#{dst} FAILED AT #{n} 1") if src ==$hostname
  352. return
  353. else
  354. hasedge=false
  355. cnodes.each{|neighs|
  356. if $neighbors[n].key?neighs
  357.  
  358. hasedge =true
  359. tmpHash[n][neighs]=1
  360. end
  361. }
  362. if !hasedge
  363.  
  364. STDOUT.puts("CIRCUIT ERROR: #{$hostname}-/->#{dst} FAILED AT #{n} 2") if src ==$hostname
  365. return
  366. end
  367. end
  368. end
  369. }
  370.  
  371. $circuitSem.synchronize {
  372. cnodes.delete(src)
  373. cnodes.delete(dst)
  374. if !$circuits.key?id
  375. $circuits[id]= tmpHash
  376. $circuitInfo[id]=[src,dst,cnodes]
  377. allSocketsPutsCircuit(id , "CREATEC #{info.join(' ')}\0")
  378.  
  379. $circuits[id].each_key { |dst| dijkstraCircuit($hostname, dst,id) unless dst == $hostname }
  380. #STDOUT.puts("in the making #{$circuits} and the path is #{$circuitPath}")
  381. if $hostname == src
  382. STDOUT.puts("CIRCUITB #{id} --> #{dst} over #{cnodes.join(",")}")
  383. end
  384. if $hostname == dst
  385. STDOUT.puts("CIRCUITB #{src}/#{id} --> #{dst} over #{cnodes.join(",")}")
  386. end
  387.  
  388. elsif $circuitInfo[id][2] != cnodes || $circuitInfo[id][0] != src || $circuitInfo[id][1] != dst
  389.  
  390. STDOUT.puts("I somehow got here")
  391. removeCircuit([0,$hostname,id,src,dst,cnodes.join(',')])
  392. #
  393.  
  394. end
  395. STDOUT.puts("i got done #{$circuits}")
  396. #STDERR.puts("here 2 #{$circuits}")
  397. }
  398.  
  399. end
  400. def removeCircuit(info)
  401. isDelete=info.shift.to_i
  402. exnode=info.shift
  403. id = info.shift
  404. src= info.shift
  405. dst= info.shift
  406. if info[0].size>1
  407. cnodes=info[0].split(",")
  408. else
  409. cnodes=[info[0]]
  410. end
  411. STDOUT.puts "#{$hostname} #{id} in remove c"
  412. if $washere[id]
  413. STDOUT.puts "#{$hostname} #{id}"
  414. return
  415. end
  416. info=[isDelete,exnode,id,src,dst,info[0]]
  417.  
  418. if $circuitInfo[id][2] != cnodes || $circuitInfo[id][0] != src || $circuitInfo[id][1] != dst
  419. $washerSem.synchronize{
  420. $washere[id]=true
  421. }
  422. if isDelete == 1
  423. exnode=$hostname
  424. info=[isDelete,exnode,id,src,dst,cnodes.join(",")]
  425. end
  426. $socketsSem.synchronize do
  427. cnodes<< src
  428. cnodes<<dst
  429. cnodes.each{|n|
  430. if $sockets.key?n
  431. $sockets[n].write("REMOVEC #{info.join(' ')}\0")
  432. end
  433. }
  434. end
  435.  
  436. if src==$hostname && isDelete==0
  437. STDOUT.puts("CIRCUIT ERROR: #{$hostname}-/->#{dst} FAILED AT #{exnode} ")
  438. end
  439. STDOUT.puts "from not equal part #{$circuits}"
  440. return
  441.  
  442. end
  443. if $circuitInfo[id][2] == cnodes && $circuitInfo[id][0] == src && $circuitInfo[id][1] == dst
  444. $washerSem.synchronize{
  445. $washere[id]=true
  446. }
  447.  
  448.  
  449. $socketsSem.synchronize do
  450.  
  451. cnodes<< src
  452. cnodes<<dst
  453. STDOUT.puts("in #{$hostname} 5 info #{info.join(' ')}")
  454. cnodes.each{|n|
  455.  
  456. if $sockets.key?n
  457.  
  458. $sockets[n].write("REMOVEC #{info.join(' ')}\0")
  459.  
  460. end
  461.  
  462. }
  463.  
  464. end
  465.  
  466. if src==$hostname
  467.  
  468. Thread.new do
  469. sleep(2)
  470. v=$circuits[id].keys
  471. v.each{|x|
  472.  
  473. setHeretoTrue([id,x])}
  474. end
  475. end
  476.  
  477. $circuits.delete(id)
  478.  
  479. $circuitPathSem.synchronize{
  480. $circuitPath.delete(id)
  481.  
  482. }
  483. $circuitInfoSem.synchronize{
  484. $circuitInfo.delete(id)}
  485. if src==$hostname && isDelete == 1
  486. if exnode != "jaghool"
  487. STDOUT.puts("CIRCUIT ERROR: #{$hostname}-/->#{dst} FAILED AT #{exnode} ")
  488. else
  489. cnodes.delete(src)
  490. cnodes.delete(dst)
  491. STDOUT.puts("CIRCUITD #{id} --> #{dst} over #{cnodes.join(",")}")
  492. end
  493. end
  494. if src==$hostname && isDelete == 0
  495. STDOUT.puts("CIRCUIT ERROR: #{$hostname}-/->#{dst} FAILED AT #{exnode} ")
  496. end
  497. STDOUT.puts "done in removeCircuit from equal part #{$circuits} "
  498. return
  499. end
  500.  
  501. end
  502.  
  503. def setHeretoTrue(info)
  504. id = info[0]
  505. dst = info[1]
  506. dst == $hostname ? $washerSem.synchronize{$washere[id]=false }: nextHopSocketPuts(dst, "SETHERE #{info.join(' ')}\0")
  507. end
  508. def link(node, socket)
  509. $socketsSem.synchronize { $sockets[node] = socket }
  510. edges = "AEDGES #{$hostname},#{node},1"
  511. $neighborsSem.synchronize { $neighbors.each { |src, dsts| dsts.each { |dst, cost| edges << " #{src},#{dst},#{cost}" } } }
  512. $socketsSem.synchronize { socket.write("#{edges}\0") }
  513. end
  514.  
  515. def allSocketsPuts(line)
  516. $socketsSem.synchronize { $sockets.each_value { |sock| sock.write(line) } }
  517. end
  518. def allSocketsPutsCircuit(id,line)
  519. $socketsSem.synchronize do
  520. #$circuitsSem.synchronize do
  521. $circuits[id][$hostname].each{|k,v|
  522. #STDOUT.puts("#{line} k #{k}, v #{v}, #{$sockets[k]}, #{$sockets} ")
  523. $sockets[k].write(line)}
  524. STDOUT.puts("after write")
  525. end
  526. # end
  527. end
  528.  
  529. def aEdges(edges)
  530. edgesAdded = ""
  531. edges.each do |edge|
  532. arr = edge.split(',')
  533. src = arr[0]
  534. dst = arr[1]
  535. cost = arr[2].to_i
  536.  
  537. $neighborsSem.synchronize do
  538. unless $neighbors.key?(src) && $neighbors[src].key?(dst) && $neighbors[src][dst] == cost
  539. $neighbors[src][dst] = $neighbors[dst][src] = cost
  540. edgesAdded << " #{edge}"
  541. end
  542. end
  543. end
  544. allSocketsPuts("AEDGES#{edgesAdded}\0") unless edgesAdded == ""
  545. end
  546.  
  547. def dEdge(edge)
  548. src = edge[0]
  549. dst = edge[1]
  550.  
  551. $neighborsSem.synchronize do
  552. if $neighbors.key?(src) && $neighbors[src].key?(dst)
  553. $neighbors[src].delete(dst)
  554. $neighbors.delete(src) if $neighbors[src].empty?
  555. $neighbors[dst].delete(src)
  556. $neighbors.delete(dst) if $neighbors[dst].empty?
  557.  
  558. allSocketsPuts("DEDGE #{edge.join(' ')}\0")
  559. end
  560. end
  561. end
  562.  
  563. def dNodes(nodes)
  564. nodesDeleted = ""
  565. nodes.each do |node|
  566. if $neighbors.key?(node)
  567. $neighborsSem.synchronize do
  568. $neighbors.each_value { |dsts| dsts.delete(node) }
  569. $neighbors.delete(node)
  570. end
  571. nodesDeleted << " #{node}"
  572. end
  573. end
  574. allSocketsPuts("DNODES#{nodesDeleted}\0") unless nodesDeleted == ""
  575. end
  576.  
  577. def nextHopSocketPuts(dst, line)
  578. nextHop = ""
  579. $pathsSem.synchronize { nextHop = $paths[dst].nextHop if $paths.key?(dst) }
  580. $socketsSem.synchronize { $sockets[nextHop].write(line) if $sockets.key?(nextHop) }
  581. end
  582.  
  583. def sendp(ping)
  584. src = ping[0]
  585. dst = ping[1]
  586. seqId = ping[2].to_i
  587. uniq = ping[3]
  588.  
  589. dst == $hostname ? ackp(ping) : nextHopSocketPuts(dst, "SENDP #{ping.join(' ')}\0")
  590. end
  591.  
  592. def ackp(ping)
  593. src = ping[0]
  594. dst = ping[1]
  595. seqId = ping[2].to_i
  596. uniq = ping[3]
  597.  
  598. if src == $hostname
  599. $startTimesSem.synchronize do
  600. if $startTimes.key?(uniq)
  601. roundTripTime = $time - $startTimes[uniq]
  602.  
  603. if roundTripTime < $pingTimeout
  604. $startTimes[uniq] = nil
  605. STDOUT.puts("#{seqId} #{dst} #{roundTripTime.round(3)}")
  606. end
  607. end
  608. end
  609. else
  610. nextHopSocketPuts(src, "ACKP #{ping.join(' ')}\0")
  611. end
  612. end
  613.  
  614. def sendt(trace)
  615. src = trace[0]
  616. dst = trace[1]
  617. hops = trace[2].to_i
  618. uniq = trace[3]
  619.  
  620. if hops == 0
  621. ackt([src, dst, $hostname, hops, uniq])
  622. else
  623. trace[2] = hops - 1
  624. nextHopSocketPuts(dst, "SENDT #{trace.join(' ')}\0")
  625. end
  626. end
  627.  
  628. def ackt(trace)
  629. src = trace[0]
  630. dst = trace[1]
  631. hostId = trace[2]
  632. hopCount = trace[3].to_i
  633. uniq = trace[4]
  634.  
  635. if src == $hostname
  636. sendTrace = false
  637. $startTimesSem.synchronize do
  638. if $startTimes.key?(uniq)
  639. roundTripTime = hostId == src ? 0.0 : ($time - $startTimes[uniq])
  640.  
  641. if roundTripTime < $pingTimeout
  642. $startTimes[uniq] = nil
  643. STDOUT.puts("#{hopCount} #{hostId} #{(roundTripTime / 2).round(3)}")
  644.  
  645. unless hostId == dst
  646. sendTrace = true
  647. loop do
  648. uniq = rand.to_s
  649. break unless $startTimes.key?(uniq)
  650. end
  651. $startTimes[uniq] = $time
  652. end
  653. end
  654. end
  655. end
  656. if sendTrace
  657. sendt([src, dst, hopCount + 1, uniq])
  658.  
  659. Thread.new do
  660. sleep($pingTimeout)
  661.  
  662. $startTimesSem.synchronize do
  663. STDOUT.puts("TIMEOUT ON #{hopCount + 1}") unless $startTimes[uniq] == nil
  664. $startTimes.delete(uniq)
  665. end
  666. end
  667. end
  668. else
  669. trace[3] = hopCount + 1
  670. nextHopSocketPuts(src, "ACKT #{trace.join(' ')}\0")
  671. end
  672. end
  673.  
  674. def sendms(args)
  675. src = args[0]
  676. dst = args[1]
  677. numFrags = args[2].to_i
  678. srcUniq = args[3]
  679.  
  680. if dst == $hostname
  681. uniq = 0
  682. $messagesSem.synchronize do
  683. loop do
  684. uniq = rand(999).to_s
  685. break unless $messages.key?(uniq)
  686. end
  687. $messages[uniq] = Array.new(numFrags)
  688. $startTimesSem.synchronize { $startTimes[uniq] = $time }
  689. end
  690. ackms([src, dst, srcUniq, uniq])
  691. STDERR.puts("out of acks")
  692. Thread.new do
  693. sleep($pingTimeout)
  694. STDERR.puts("after pingTimeout")
  695. $startTimesSem.synchronize do
  696. STDERR.puts("$startTimes.key?(uniq) = #{$startTimes.key?(uniq)}")
  697. STDERR.puts("($time - $startTimes[uniq]) = #{$time - $startTimes[uniq]}")
  698. unless $startTimes.key?(uniq) && ($time - $startTimes[uniq]) < $pingTimeout
  699. STDERR.puts("deleting")
  700. $startTimes.delete(uniq)
  701. $messagesSem.synchronize { $messages.delete(uniq) }
  702. end
  703. end
  704. end
  705. else
  706. nextHopSocketPuts(dst, "SENDMS #{args.join(' ')}\0")
  707. end
  708. end
  709.  
  710. def ackms(args)
  711. src = args[0]
  712. dst = args[1]
  713. srcUniq = args[2]
  714. dstUniq = args[3]
  715.  
  716. if src == $hostname
  717. $startTimesSem.synchronize { $startTimes.key?(srcUniq) && ($time - $startTimes[srcUniq]) < $pingTimeout ? $startTimes[srcUniq] = nil : return }
  718. numFrags = 0
  719. $messagesSem.synchronize { numFrags = $messages[srcUniq].size }
  720.  
  721. numFrags.times do |seqId|
  722. uniq = 0
  723. $messagesSem.synchronize do
  724. $startTimesSem.synchronize do
  725. loop do
  726. uniq = rand(999).to_s
  727. break unless $startTimes.key?(uniq)
  728. end
  729. $startTimes[uniq] = $time
  730. end
  731. m([src, dst, uniq, dstUniq, seqId, $messages[srcUniq][seqId]])
  732. end
  733. Thread.new do
  734. sleep($pingTimeout)
  735.  
  736. $startTimesSem.synchronize do
  737. $messagesSem.synchronize do
  738. if $startTimes[uniq] != nil && $messages.key?(srcUniq)
  739. STDOUT.puts("SNDMSG ERROR: HOST UNREACHABLE")
  740. $messages.delete(srcUniq)
  741. end
  742. $startTimes.delete(uniq)
  743. end
  744. end
  745. end
  746. end
  747. Thread.new do
  748. sleep($pingTimeout)
  749.  
  750. $messagesSem.synchronize { $messages.delete(srcUniq) }
  751. end
  752. else
  753. nextHopSocketPuts(src, "ACKMS #{args.join(' ')}\0")
  754. end
  755. end
  756.  
  757. def m(args)
  758. src = args[0]
  759. dst = args[1]
  760. srcUniq = args[2]
  761. uniq = args[3]
  762. seqId = args[4].to_i
  763. msg = args[5]
  764.  
  765. if dst == $hostname
  766. stop = true
  767. $startTimesSem.synchronize do
  768. if $startTimes.key?(uniq)
  769. if $time - $startTimes[uniq] < $pingTimeout
  770. $startTimes[uniq] = $time
  771. stop = false
  772. end
  773. end
  774. end
  775. ackm([src, dst, srcUniq])
  776.  
  777. $messagesSem.synchronize do
  778. unless stop
  779. $messages[uniq][seqId] = msg
  780.  
  781. unless $messages[uniq].include?(nil)
  782. STDOUT.puts("SENDMSG: #{src} --> #{$messages[uniq].join('')}")
  783. stop = true
  784. end
  785. end
  786. if stop
  787. $messages.delete(uniq)
  788. $startTimesSem.synchronize { $startTimes.delete(uniq) }
  789. end
  790. end
  791. else
  792. nextHopSocketPuts(dst, "M #{args.join(' ')}\0")
  793. end
  794. end
  795.  
  796. def ackm(args)
  797. src = args[0]
  798. dst = args[1]
  799. uniq = args[2]
  800.  
  801. if src == $hostname
  802. $startTimesSem.synchronize { $startTimes[uniq] = nil if $startTimes.key?(uniq) && $time - $startTimes[uniq] < $pingTimeout }
  803. else
  804. nextHopSocketPuts(src, "ACKM #{args.join(' ')}\0")
  805. end
  806. end
  807.  
  808. def sendfs(args)
  809. src = args[0]
  810. dst = args[1]
  811. numFrags = args[2].to_i
  812. srcUniq = args[3]
  813. file = args[4]
  814.  
  815. if dst == $hostname
  816. uniq = 0
  817. $messagesSem.synchronize do
  818. loop do
  819. uniq = rand(999999).to_s
  820. break unless $messages.key?(uniq)
  821. end
  822. $messages[uniq] = Array.new(numFrags)
  823. $filesSem.synchronize { $files[uniq] = file }
  824. $startTimesSem.synchronize { $startTimes[uniq] = $time }
  825. end
  826. ackfs([src, dst, srcUniq, uniq])
  827. #STDERR.puts("out of acks")
  828. Thread.new do
  829. sleep($pingTimeout)
  830. #STDERR.puts("after pingTimeout")
  831. $startTimesSem.synchronize do
  832. #STDERR.puts("$startTimes.key?(uniq) = #{$startTimes.key?(uniq)}")
  833. #STDERR.puts("($time - $startTimes[uniq]) = #{$time - $startTimes[uniq]}")
  834. unless $startTimes.key?(uniq) && ($time - $startTimes[uniq]) < $pingTimeout
  835. #STDERR.puts("deleting")
  836. $startTimes.delete(uniq)
  837. $messagesSem.synchronize { $messages.delete(uniq) }
  838. $filesSem.synchronize { $files.delete(uniq) }
  839. end
  840. end
  841. end
  842. else
  843. nextHopSocketPuts(dst, "SENDFS #{args.join(' ')}\0")
  844. end
  845. end
  846.  
  847. def ackfs(args)
  848. src = args[0]
  849. dst = args[1]
  850. srcUniq = args[2]
  851. dstUniq = args[3]
  852.  
  853. if src == $hostname
  854. #STDERR.puts("srcUniq #{srcUniq}")
  855. $startTimesSem.synchronize { $startTimes.key?(srcUniq) && ($time - $startTimes[srcUniq]) < $pingTimeout ? $startTimes[srcUniq] = nil : return }
  856. numFrags = 0
  857.  
  858. $messagesSem.synchronize { numFrags = $messages[srcUniq].size }
  859. #STDERR.puts("#{numFrags}")
  860. STDERR.puts("frag #{numFrags}")
  861. numFrags.times do |seqId|
  862.  
  863. uniq = 0
  864. $messagesSem.synchronize do
  865. $startTimesSem.synchronize do
  866. loop do
  867. uniq = rand(999999).to_s
  868. break unless $startTimes.key?(uniq)
  869. end
  870. $startTimes[uniq] = $time
  871. end
  872. f([src, dst, uniq, dstUniq, seqId, $messages[srcUniq][seqId]])
  873. end
  874. Thread.new do
  875. sleep($pingTimeout)
  876.  
  877. $startTimesSem.synchronize do
  878. $messagesSem.synchronize do
  879. if $startTimes[uniq] != nil && $messages.key?(srcUniq) && ($time - $startTimes[uniq])> $pingTimeout
  880. STDOUT.puts("FTP ERROR: HOST UNREACHABLE on #{seqId}")
  881. $messages.delete(srcUniq)
  882. $startTimes.delete(uniq)
  883. return
  884. end
  885. $startTimes.delete(uniq)
  886.  
  887. end
  888. end
  889. end
  890. #STDERR.puts("#{seqId}")
  891. end
  892. Thread.new do
  893. sleep($pingTimeout)
  894.  
  895. $messagesSem.synchronize { $messages.delete(srcUniq) }
  896. end
  897. else
  898. nextHopSocketPuts(src, "ACKFS #{args.join(' ')}\0")
  899. end
  900. end
  901.  
  902. def f(args)
  903. src = args[0]
  904. dst = args[1]
  905. srcUniq = args[2]
  906. uniq = args[3]
  907. seqId = args[4].to_i
  908. msg = args[5]
  909.  
  910. if dst == $hostname
  911.  
  912. stop = true
  913. $startTimesSem.synchronize do
  914. if $startTimes.key?(uniq)
  915. if ($time - $startTimes[uniq] )< $pingTimeout
  916. $startTimes[uniq] = $time
  917. stop = false
  918. end
  919. end
  920. end
  921. #STDERR.puts("after if")
  922. ackf([src, dst, srcUniq])
  923. #STDERR.puts("after ackf")
  924. success = false
  925. #STDERR.puts("#{success} 1")
  926. $messagesSem.synchronize do
  927. # STDERR.puts("#{success} 2")
  928. unless stop
  929. $messages[uniq][seqId] = msg
  930. STDERR.puts("#{seqId} ")
  931. success = true if !$messages[uniq].include?(nil)
  932. #
  933. end
  934. #STDERR.puts("#{success} 4")
  935. if stop
  936.  
  937. $messages.delete(uniq)
  938. $filesSem.synchronize { $files.delete(uniq) }
  939. $startTimesSem.synchronize { $startTimes.delete(uniq) }
  940. end
  941. end
  942. # STDERR.puts("#{seqId}")
  943. if success
  944. file = ""
  945. $filesSem.synchronize { file = $files[uniq] }
  946. STDERR.puts(file)
  947. STDERR.puts(File.dirname(file))
  948. FileUtils.mkdir_p(File.dirname(file))
  949. Thread.new do
  950. outputFile= File.open(file, "w")
  951. # $messagesSem.synchronize { $messages[uniq].each { |chunk| f.print(chunk) } }
  952. $messagesSem.synchronize {
  953.  
  954. dec=$messages[uniq].join
  955. f1=File.open("out2","w")
  956. f1.write dec
  957. f1.close
  958. STDERR.puts("before compress")
  959. decompress=Zlib::Inflate.inflate($messages[uniq].join)
  960. }
  961.  
  962.  
  963. STDERR.puts("after compress")
  964. outputFile.write(decompress)
  965. STDERR.puts("after write")
  966. outputFile.close
  967. STDERR.puts("after close")
  968.  
  969. STDOUT.puts("FTP: #{src} --> #{file}")
  970. end
  971. end
  972. else
  973. nextHopSocketPuts(dst, "F #{args.join(' ')}\0")
  974. end
  975. end
  976.  
  977. def ackf(args)
  978. src = args[0]
  979. dst = args[1]
  980. uniq = args[2]
  981.  
  982. if src == $hostname
  983. $startTimesSem.synchronize { $startTimes[uniq] = nil if $startTimes.key?(uniq) && ($time - $startTimes[uniq]) < $pingTimeout }
  984. else
  985. nextHopSocketPuts(src, "ACKF #{args.join(' ')}\0")
  986. end
  987. end
  988.  
  989. def handleCommands(socket)
  990. while line = socket.gets("\0").chomp("\0")
  991. #STDERR.puts(line)
  992. #line.chomp!
  993. arr = line.split(' ')
  994. cmd = arr[0]
  995. args = arr[1..-1]
  996. case cmd
  997. when "LINK"; link(args[0], socket)
  998. when "AEDGES"; aEdges(args)
  999. when "DEDGE"; dEdge(args)
  1000. when "DNODES"; dNodes(args)
  1001. when "EDGED"; edged(args)
  1002. when "SENDP"; sendp(args)
  1003. when "ACKP"; ackp(args)
  1004. when "SENDT"; sendt(args)
  1005. when "ACKT"; ackt(args)
  1006. when "SENDMS"; sendms(args)
  1007. when "ACKMS"; ackms(args)
  1008. when "M"; m(line.split(/ /, 7)[1..-1])
  1009. when "ACKM"; ackm(args)
  1010. when "SENDFS"; sendfs(args)
  1011. when "ACKFS"; ackfs(args)
  1012. when "F"; f(line.split(/ /, 7)[1..-1])
  1013. when "ACKF"; ackf(args)
  1014. when "CREATEC"; createCircuit(args)
  1015. when "REMOVEC"; removeCircuit(args)
  1016. when "SETHERE";setHeretoTrue(args)
  1017. end
  1018. end
  1019. end
  1020.  
  1021. def dijkstraCircuit(src, dst, id)
  1022. max = 2**(0.size * 8 - 2) - 1
  1023. vertex = $circuits[id].keys
  1024. dists = Hash.new
  1025. previous = Hash.new
  1026. nodes = {}
  1027. visited = []
  1028.  
  1029. return if src == dst
  1030.  
  1031. vertex.each do |k|
  1032. if k == src
  1033. dists[k] = 0
  1034. nodes[k] = 0
  1035. else
  1036. dists[k] = max
  1037. nodes[k] = max
  1038. end
  1039. previous[k] = nil
  1040. end
  1041. while nodes
  1042. small = nodes.min_by { |k, v| v }
  1043. name = small[0]
  1044.  
  1045. if dists[name] == max || name == nil || nodes.empty?
  1046. break
  1047. end
  1048. if visited.include?(name)
  1049. nodes.delete(name)
  1050. next
  1051. end
  1052. nodes.delete(name)
  1053.  
  1054. $circuits[id][name].each do |nei, c|
  1055. tmp = dists[name] + c
  1056.  
  1057. if tmp < dists[nei]
  1058. dists[nei] = tmp
  1059. previous[nei] = name
  1060. nodes[nei] = tmp
  1061. end
  1062. unless previous[nei] == src
  1063. prev = previous[nei]
  1064. while previous[prev] != src && prev != nil
  1065. prev = previous[prev]
  1066. end
  1067. else
  1068. prev = nei
  1069. end
  1070. $circuitPathSem.synchronize{
  1071. prev=$hostname if $hostname==nei
  1072. $circuitPath[id][nei] = Path.new(prev, dists[nei])
  1073. }
  1074. visited << name
  1075. end
  1076.  
  1077. if name == dst
  1078. path = Array.new
  1079. x = name
  1080.  
  1081. while previous[name] != src && previous[name]
  1082. name = previous[name]
  1083. end
  1084. if previous[x] == src
  1085. name = x
  1086. end
  1087. $circuitPathSem.synchronize{
  1088.  
  1089. $circuitPath[id][x] = Path.new(name, dists[x])
  1090. }
  1091. return
  1092. end
  1093. end
  1094. end
  1095.  
  1096. def dijkstra(src, dst)
  1097. max = 2**(0.size * 8 - 2) - 1
  1098. vertex = $neighbors.keys
  1099. dists = Hash.new
  1100. previous = Hash.new
  1101. nodes = {}
  1102. visited = []
  1103.  
  1104. return if src == dst
  1105.  
  1106. vertex.each do |k|
  1107. if k == src
  1108. dists[k] = 0
  1109. nodes[k] = 0
  1110. else
  1111. dists[k] = max
  1112. nodes[k] = max
  1113. end
  1114. previous[k] = nil
  1115. end
  1116. while nodes
  1117. small = nodes.min_by { |k, v| v }
  1118. name = small[0]
  1119.  
  1120. if dists[name] == max || name == nil || nodes.empty?
  1121. break
  1122. end
  1123. if visited.include?(name)
  1124. nodes.delete(name)
  1125. next
  1126. end
  1127. nodes.delete(name)
  1128.  
  1129. $neighbors[name].each do |nei, c|
  1130. tmp = dists[name] + c
  1131.  
  1132. if tmp < dists[nei]
  1133. dists[nei] = tmp
  1134. previous[nei] = name
  1135. nodes[nei] = tmp
  1136. end
  1137. unless previous[nei] == src
  1138. prev = previous[nei]
  1139. while previous[prev] != src && prev != nil
  1140. prev = previous[prev]
  1141. end
  1142. else
  1143. prev = nei
  1144. end
  1145. $paths[nei] = Path.new(prev, dists[nei])
  1146.  
  1147. visited << name
  1148. end
  1149. if name == dst
  1150. path = Array.new
  1151. x = name
  1152.  
  1153. while previous[name] != src && previous[name]
  1154. name = previous[name]
  1155. end
  1156. if previous[x] == src
  1157. name = x
  1158. end
  1159. $paths[x] = Path.new(name, dists[x])
  1160.  
  1161. return
  1162. end
  1163. end
  1164. end
  1165.  
  1166. def server()
  1167. server = TCPServer.new $port
  1168. loop { Thread.new (server.accept) { |sock| handleCommands(sock) } }
  1169. end
  1170.  
  1171. def updateTime(delay)
  1172. sleep(delay)
  1173. $time += delay
  1174. end
  1175.  
  1176. def synchronizeTime()
  1177. $time = Time.now.to_f
  1178. updateTime($updateInterval - $time % $updateInterval)
  1179. end
  1180.  
  1181. def clock()
  1182. synchronizeTime()
  1183. Thread.new { pathsUpdater() }
  1184. loop { updateTime($clockDelay) }
  1185. end
  1186.  
  1187. def pathsUpdater()
  1188. loop do
  1189. Thread.new do
  1190. $pathsSem.synchronize do
  1191. $paths.clear
  1192. $neighborsSem.synchronize { $neighbors.each_key { |dst| dijkstra($hostname, dst) unless dst == $hostname } }
  1193. end
  1194. end
  1195. sleep($updateInterval)
  1196. end
  1197. end
  1198.  
  1199. # do main loop here....
  1200. def main()
  1201. while line = STDIN.gets
  1202. #STDERR.puts("#{$hostname}: #{line}")
  1203. line = line.strip
  1204. arr = line.split(' ')
  1205. cmd = arr[0]
  1206. args = arr[1..-1]
  1207. case cmd
  1208. when "EDGEB"; edgeb(args)
  1209. when "EDGED"; edged(args)
  1210. when "EDGEU"; edgeu(args)
  1211. when "DUMPTABLE"; dumptable(args)
  1212. when "SHUTDOWN"; shutdown(args)
  1213. when "STATUS"; status()
  1214. when "SENDMSG"; sendmsg(args)
  1215. when "PING"; ping(args)
  1216. when "TRACEROUTE"; traceroute(args)
  1217. when "FTP"; ftp(args)
  1218. when "CIRCUITB"; circuitb(args)
  1219. when "CIRCUITD"; circuitd(args)
  1220. else STDERR.puts "ERROR: INVALID COMMAND \"#{cmd}\""
  1221. end
  1222. end
  1223. end
  1224.  
  1225. def setup(hostname, port, nodes, config)
  1226. $hostname = hostname
  1227. $port = port
  1228.  
  1229. # Set up ports, server, buffers.
  1230. File.open(nodes, "r") do |f|
  1231. f.each_line do |line|
  1232. arr = line.split(',')
  1233. n = arr[0]
  1234. p = arr[1].to_i
  1235. $ports[n] = p
  1236. end
  1237. f.close
  1238. end
  1239. File.open(config, "r") do |f|
  1240. f.each_line do |line|
  1241. arr = line.split('=')
  1242. var = arr[0]
  1243. val = arr[1].to_i
  1244. case var
  1245. when "updateInterval"; $updateInterval = val
  1246. when "maxPayload"; $maxPayload = val
  1247. when "pingTimeout"; $pingTimeout = val
  1248. end
  1249. end
  1250. f.close
  1251. end
  1252. Thread.new { server() }
  1253. Thread.new do
  1254. synchronizeTime()
  1255. Thread.new { clock() }
  1256. pathsUpdater()
  1257. end
  1258.  
  1259. main()
  1260. end
  1261.  
  1262. setup(ARGV[0], ARGV[1], ARGV[2], ARGV[3])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement