Advertisement
Guest User

parallel_curl.py

a guest
Nov 29th, 2014
233
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.22 KB | None | 0 0
  1. #!/usr/bin/python
import os
import os.path
import re
import socket
import sys
import threading
import time
  8.  
  9. verbose=os.getenv('VERBOSE')
  10.  
  11. curlcmdbase="curl -i "
  12.  
  13. fs='test'
  14. user='%s:tester'%fs
  15. password='testing'
  16.  
  17. #default is no encryption
  18. url_prefix=os.getenv('URL_PREFIX')
  19. if not url_prefix: url_prefix='http'
  20.  
  21. # if we're using plain http: default to 8080 like openswift
  22. # otherwise don't specify at all, this defaults to 443
  23. http_portenv=os.getenv('HTTP_PORT')
  24. if (not http_portenv) and url_prefix == 'http' : http_portenv='61005'
  25. http_port = int(http_portenv)
  26. http_portstr=''
  27. if http_port: http_portstr=':%d'%http_port
  28.  
  29. # don't force them to use a certificate unless we are using https
  30. cert = os.getenv('CERT')
  31. if (not cert) and url_prefix == 'https': cert='cert.crt'
  32. cacert_option=''
  33. if cert: cacert_option = ' --cacert %s '%cert
  34. # parse command line arguments
  35.  
  36. if len(sys.argv) < 6:
  37. print 'usage: parallel_curl.sh op threads files size-KB client1,...,clientM server1,...,serverN'
  38. sys.exit(1)
  39.  
  40. op=sys.argv[1]
  41. threads=int(sys.argv[2])
  42. files=int(sys.argv[3])
  43. sz=int(sys.argv[4])
  44. client_list=sys.argv[5].split(',')
  45. server_list=sys.argv[6].split(',')
  46.  
  47. svrcount=len(server_list)
  48. clntcount=len(client_list)
  49. me=socket.gethostname().split('.')[0]
  50.  
  51. if verbose:
  52. print 'doing %s %d threads %d objects/thread %d KB/object'%(op, threads, files, sz)
  53. print 'url_prefix: ' + url_prefix
  54. print 'http_portstr: ' + http_portstr
  55. print 'cert: ' + cacert_option
  56.  
  57. # parallel requests are done with this class
  58. count3 = 0
  59. class curl_thread(threading.Thread):
  60. def __init__(self, client, thread_id, cmd):
  61. threading.Thread.__init__(self)
  62. self.cmd = cmd
  63. self.client = client
  64. self.thread_id = thread_id
  65.  
  66. def run(self):
  67. global count3
  68. for fnum in range(0, files):
  69. next_cmd = 'ssh -nx -o StrictHostKeyChecking=no %s "'%self.client + self.cmd%fnum
  70. next_cmd += ' | strings " >> /tmp/curl-thread.log'#%self.thread_id
  71. if verbose: print 'next command for thread %d: %s'%(self.thread_id, next_cmd)
  72. os.system(next_cmd)
  73. count3 =count3 +1
  74.  
  75. # get authorization tokens from file or from server if necessary
  76.  
  77. tkfile=os.path.join(os.getenv('HOME'), 'tk.tmp')
  78. auth_tokens = []
  79. if os.access(tkfile, os.R_OK):
  80. f = open(tkfile, "r")
  81. auth_tokens = [ t.strip() for t in f.readlines() ]
  82. f.close()
  83. if len(auth_tokens) < min(threads,svrcount): # if we couldn't find any authorization tokens from last run
  84. for k in range(0, min(threads,svrcount)):
  85. svr = server_list[k]
  86. cmd = curlcmdbase + cacert_option
  87. cmd += " -v -H 'X-Storage-User: %s' -H 'X-Storage-Pass: %s' %s://%s:%s/auth/v1.0 2>&1 | "
  88. cmd += "awk '/X-Auth-Token/{ print $3 }' >> %s"
  89. cmd = cmd%(user, password, url_prefix, svr, http_port, tkfile)
  90. if verbose: print cmd
  91. os.system(cmd)
  92. f = open(tkfile, "r")
  93. auth_tokens = [ t.strip() for t in f.readlines() ]
  94. f.close()
  95. if len(auth_tokens) == 0:
  96. raise Exception('failed to obtain any authorization tokens')
  97. if verbose: print 'authorization tokens = %s'%auth_tokens
  98. # for put operation we need a file on each client to put
  99. # generate one and make sure it's on all clients
  100.  
  101. if op == 'put':
  102. current_sz = -1
  103. bfn = '/tmp/big.dd'
  104. if os.access(bfn, os.R_OK):
  105. current_sz = os.path.getsize(bfn)
  106. if sz * 1024 != current_sz:
  107. cmd = 'rm -fv %s ; dd if=/dev/zero of=%s bs=1k count=%d && ls -l %s && gzip -c < %s > %s.gz'%\
  108. (bfn, bfn, sz, bfn, bfn, bfn)
  109. if verbose: print cmd
  110. os.system(cmd)
  111. assert(os.path.getsize(bfn) == sz * 1024)
  112. for j in range(0, min(threads,clntcount)):
  113. clnt = client_list[j]
  114. cmd=' rsync -av %s.gz %s:%s.gz'%(bfn, clnt, bfn)
  115. if verbose: print cmd
  116. os.system(cmd)
  117. cmd = 'ssh -nx %s "gunzip -c < %s.gz > %s && ls -l %s"'%(clnt, bfn, bfn, bfn)
  118. if verbose: print cmd
  119. os.system(cmd)
  120. os.system('rm -f /tmp/curl-thread*.log')
  121. start_time=time.time()
  122.  
  123. # generate curl command for each thread and start it
  124.  
  125. curl_thread_list = []
  126. count = 0
  127. for thr in range(0, threads):
  128. clnt = client_list[thr%clntcount] # which client to use
  129. svr = server_list[thr%svrcount] # which server to use
  130. tk = auth_tokens[thr%svrcount] # which authorization token to use
  131. curlcmd = curlcmdbase + cacert_option + "-H 'X-Auth-Token: %s ' "%tk
  132. if op == "put":
  133. # container must exist before we put objects in it
  134.  
  135. container_put = 'ssh -nx ' + clnt + " \"" + curlcmd + " -X PUT %s://%s%s/v1/AUTH_test/%s-%d\""%\
  136. (url_prefix, server_list[0], http_portstr, clnt, thr)
  137. count =count +1;
  138. print "count \n", count
  139. time.sleep(1)
  140. if verbose: print "\n",container_put
  141. os.system(container_put)
  142. curlcmd += "-X PUT -T /tmp/big.dd "
  143. elif op == "get":
  144. curlcmd += "-X GET "
  145. elif op == "delete":
  146. curlcmd += "-X DELETE "
  147. else:
  148. raise Exception("invalid op " + op)
  149. curlcmd += " %s://%s%s/v1/AUTH_%s/%s-%d/f%s"%(url_prefix, svr, http_portstr, fs, clnt, thr, '%d')
  150. curl_thread_list.append(curl_thread(clnt, thr, curlcmd))
  151. time.sleep(1)
  152. count2 = 0
  153. for thr in range(0, threads):
  154. count2 = count2+1
  155. curl_thread_list[thr].start()
  156. for thr in range(0, threads):
  157. curl_thread_list[thr].join(100)
  158.  
  159. # read log files from HTTP activity
  160. # and detect errors
  161. errors=0
  162. #for thr in range(0, threads):
  163. fn = '/tmp/curl-thread.log'#%thr
  164. if verbose: print '*******reading %s'%fn
  165. logf = open(fn, "r")
  166. lines = logf.readlines()
  167. logf.close()
  168. for ln in lines:
  169. if ln.__contains__('<title>'):
  170. code = ln.split('>')[1].split()[0]
  171. rc = int(code)
  172. if rc < 200 or rc >= 300:
  173. print 'ERROR: '+ code
  174. errors += 1
  175. end_time = time.time()
  176. if errors > 0:
  177. print 'WARNING: %d errors found'%errors
  178. print 'doing %s %d threads %d objects/thread %d KB/object'%(op, threads, files, sz)
  179. print 'clients: %s'%client_list
  180. print 'servers: %s'%server_list
  181. elapsed_time = end_time - start_time
  182. print 'elapsed time = %7.2f sec'%elapsed_time
  183. throughput = files * threads / elapsed_time
  184. print 'throughput = %9.2f objs/sec'%throughput
  185. if op != 'delete':
  186. transfer_rate = sz * throughput / 1024.0
  187. print 'transfer rate = %9.2f MB/s'%transfer_rate
  188. print "count thread===", count2
  189. print "\ncount 3========", count3
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement