Untitled

a guest
Nov 29th, 2015
  1. {
  2. "cells": [
  3. {
  4. "cell_type": "markdown",
  5. "metadata": {
  6. "collapsed": false
  7. },
  8. "source": [
  9. "# Experimenting with Dask Imperatives and Distributed"
  10. ]
  11. },
  12. {
  13. "cell_type": "code",
  14. "execution_count": 2,
  15. "metadata": {
  16. "collapsed": true
  17. },
  18. "outputs": [],
  19. "source": [
  20. "import numpy as np"
  21. ]
  22. },
  23. {
  24. "cell_type": "markdown",
  25. "metadata": {},
  26. "source": [
  27. "To start a `Distributed` cluster with 4 workers, I run:\n",
  28. "```\n",
  29. "dcenter & \\\n",
  30. " dworker 127.0.0.1:8787 & \\\n",
  31. " dworker 127.0.0.1:8787 & \\\n",
  32. " dworker 127.0.0.1:8787 & \\\n",
  33. " dworker 127.0.0.1:8787 &\n",
  34. "```\n",
  35. "\n",
  36. "and to stop it, the easiest way seems to be:\n",
  37. "```\n",
  38. "ps aux | grep python | grep dcenter | awk '{print $2}' | xargs kill\n",
  39. "ps aux | grep python | grep dworker | awk '{print $2}' | xargs kill\n",
  40. "```\n"
  41. ]
  42. },
  43. {
  44. "cell_type": "markdown",
  45. "metadata": {},
  46. "source": [
  47. "Having done this, we can create a `distributed` executor for this notebook."
  48. ]
  49. },
  50. {
  51. "cell_type": "code",
  52. "execution_count": 3,
  53. "metadata": {
  54. "collapsed": true
  55. },
  56. "outputs": [],
  57. "source": [
  58. "from distributed import Executor\n",
  59. "executor = Executor('127.0.0.1:8787')"
  60. ]
  61. },
  62. {
  63. "cell_type": "markdown",
  64. "metadata": {},
  65. "source": [
  66. "## Create a dummy workload"
  67. ]
  68. },
  69. {
  70. "cell_type": "markdown",
  71. "metadata": {},
  72. "source": [
  73. "The following code simulates a distributed workload that runs a series of \"expensive\" operations to generate an ordered collection of images. The idea is that these operations are expensive enough and/or numerous enough to justify computing them on a cluster rather than on a single, powerful (shared memory) workstation. For fast visualization purposes, I then normalize these images, apply a colormap, and then pull them down to the master node where they can be saved as frames of a movie.\n",
  74. "\n",
  75. "By the time I colormap these images and fetch the images down to the executor, each image contains roughly 2MB of data. In this example, I generate just 100 of them, so there should just be about 200MB of data to transfer from workers. In a real application I might generate quite a lot more data than this, perhaps several gigabytes worth of images that I'd fetch from the cluster. Even in this \"scaled up\" scenario, the size of each individual image to be fetched should be a manageable size (in the 1-10MB range)."
  76. ]
  77. },
  78. {
  79. "cell_type": "code",
  80. "execution_count": 27,
  81. "metadata": {
  82. "collapsed": true
  83. },
  84. "outputs": [],
  85. "source": [
  86. "IMAGE_SIZE = (512,512)\n",
  87. "NUM_Z_SLICES = 100\n",
  88. "Z_RANGE = (-10e-6, 10e-6)\n",
  89. "MOVIE_Z_SAMPLES = np.linspace(Z_RANGE[0], Z_RANGE[1], NUM_Z_SLICES)"
  90. ]
  91. },
  92. {
  93. "cell_type": "markdown",
  94. "metadata": {},
  95. "source": [
  96. "Define the functions that make up our workload. This is an extremely parallelizable workload that nonetheless has several dependencies between computational \"layers.\""
  97. ]
  98. },
  99. {
  100. "cell_type": "code",
  101. "execution_count": 28,
  102. "metadata": {
  103. "collapsed": true
  104. },
  105. "outputs": [],
  106. "source": [
  107. "def compute_test_image(z):\n",
  108. "    \"\"\"\n",
  109. "    Imagine that this is a function that takes a while to run on a cluster.\n",
  110. "\n",
  111. "    Returns an image of type complex128.\n",
  112. "    \"\"\"\n",
  113. "    re_data = np.random.random(IMAGE_SIZE)\n",
  114. "    im_data = np.random.random(IMAGE_SIZE)\n",
  115. "\n",
  116. "    result = re_data + 1j * im_data\n",
  117. "\n",
  118. "    # Do a dummy FFT 10 times, to simulate some real work\n",
  119. "    for i in range(10):\n",
  120. "        result = np.fft.fftshift(np.fft.fft2(np.fft.ifftshift(result)))\n",
  121. "\n",
  122. "    return result\n",
  123. "\n",
  124. "def amplitude_range(im):\n",
  125. "    amplitude = np.abs(im)\n",
  126. "    return (amplitude.min(), amplitude.max())\n",
  127. "\n",
  128. "def phase_range(im):\n",
  129. "    phase = np.angle(im)\n",
  130. "    return (phase.min(), phase.max())\n",
  131. "\n",
  132. "def colormap_and_combine(im, amplitude_range, phase_range):\n",
  133. "    amplitude = np.abs(im)\n",
  134. "    amplitude /= amplitude_range[1]\n",
  135. "\n",
  136. "    phase = np.angle(im)\n",
  137. "    phase = (phase - phase_range[0]) / (phase_range[1] - phase_range[0])\n",
  138. "\n",
  139. "    from matplotlib import cm\n",
  140. "    amplitude_cmap = cm.hot(amplitude)\n",
  141. "    phase_cmap = cm.coolwarm(phase)\n",
  142. "\n",
  143. "    from matplotlib.colors import rgb_to_hsv, hsv_to_rgb\n",
  144. "    hsv = rgb_to_hsv(phase_cmap[:,:,:3])\n",
  145. "    hsv[:,:,2] = np.power(amplitude, 0.5)\n",
  146. "    phase_cmap[:,:,:3] = hsv_to_rgb(hsv)\n",
  147. "\n",
  148. "    result = np.zeros((im.shape[0], 2 * im.shape[1], 4), dtype=np.float32)\n",
  149. "    result[0:, :im.shape[1]] = amplitude_cmap\n",
  150. "    result[0:, im.shape[1]:] = phase_cmap\n",
  151. "\n",
  152. "    result *= 255\n",
  153. "\n",
  154. "    return result.astype(np.uint8)\n"
  155. ]
  156. },
  157. {
  158. "cell_type": "markdown",
  159. "metadata": {},
  160. "source": [
  161. "Create a computational graph using Dask imperatives. (So cool!) This computation is deferred until I actually call `compute()` below. \n",
  162. "\n",
  163. "The description of the computation and the execution engine are nicely abstracted from each other in Dask, so I can run this same computation on a variety of execution engines below and compare their relative performance."
  164. ]
  165. },
  166. {
  167. "cell_type": "code",
  168. "execution_count": 29,
  169. "metadata": {
  170. "collapsed": false
  171. },
  172. "outputs": [],
  173. "source": [
  174. "from dask.imperative import do, value\n",
  175. "\n",
  176. "test_images = [ do(compute_test_image)(z) for z in MOVIE_Z_SAMPLES ]\n",
  177. "amplitude_ranges = [ do(amplitude_range)(im) for im in test_images ]\n",
  178. "phase_ranges = [ do(phase_range)(im) for im in test_images ]\n",
  179. "\n",
  180. "colormapped_images = value([ do(colormap_and_combine)(*args) for args in zip(test_images,amplitude_ranges, phase_ranges) ])"
  181. ]
  182. },
  183. {
  184. "cell_type": "markdown",
  185. "metadata": {},
  186. "source": [
  187. "### Tests Round #1: Fetching individual results"
  188. ]
  189. },
  190. {
  191. "cell_type": "markdown",
  192. "metadata": {},
  193. "source": [
  194. "These tests measure the time it takes to compute a single colormapped image."
  195. ]
  196. },
  197. {
  198. "cell_type": "markdown",
  199. "metadata": {},
  200. "source": [
  201. "#### Test #1.1: Run the computation locally using the `dask.threaded` execution engine."
  202. ]
  203. },
  204. {
  205. "cell_type": "code",
  206. "execution_count": 30,
  207. "metadata": {
  208. "collapsed": false
  209. },
  210. "outputs": [
  211. {
  212. "name": "stdout",
  213. "output_type": "stream",
  214. "text": [
  215. "CPU times: user 29 s, sys: 6.03 s, total: 35 s\n",
  216. "Wall time: 10.2 s\n"
  217. ]
  218. }
  219. ],
  220. "source": [
  221. "%%time\n",
  222. "import dask.threaded\n",
  223. "test1 = colormapped_images[0].compute(get = dask.threaded.get, num_workers = 4)"
  224. ]
  225. },
  226. {
  227. "cell_type": "markdown",
  228. "metadata": {},
  229. "source": [
  230. "**Comments:** This is the baseline case, where a single result is computed. The results are already in shared memory, so there is no communication overhead in this example. This provides a sense of how much time is spent computing each output image."
  231. ]
  232. },
  233. {
  234. "cell_type": "markdown",
  235. "metadata": {},
  236. "source": [
  237. "#### Test #1.2: Run the computation locally using the `dask.multiprocessing` execution engine."
  238. ]
  239. },
  240. {
  241. "cell_type": "code",
  242. "execution_count": 31,
  243. "metadata": {
  244. "collapsed": false
  245. },
  246. "outputs": [
  247. {
  248. "name": "stdout",
  249. "output_type": "stream",
  250. "text": [
  251. "CPU times: user 2.35 s, sys: 3.88 s, total: 6.24 s\n",
  252. "Wall time: 18.1 s\n"
  253. ]
  254. }
  255. ],
  256. "source": [
  257. "%%time\n",
  258. "import dask.multiprocessing\n",
  259. "test2 = colormapped_images[0].compute(get = dask.multiprocessing.get, num_workers = 4)"
  260. ]
  261. },
  262. {
  263. "cell_type": "markdown",
  264. "metadata": {},
  265. "source": [
  266. "**Comments:** This takes quite a lot longer. Some overhead must be due to the creation of a multiprocessing pool, and some overhead is presumably due to inter-process communication, as the results are fetched back to the master process. The overhead is significant in this example."
  267. ]
  268. },
  269. {
  270. "cell_type": "markdown",
  271. "metadata": {},
  272. "source": [
  273. "#### Test #1.3: Run the computation on a local cluster using the `distributed.executor.get` computation engine."
  274. ]
  275. },
  276. {
  277. "cell_type": "markdown",
  278. "metadata": {},
  279. "source": [
  280. "The first time we run this, the results take a little while to compute."
  281. ]
  282. },
  283. {
  284. "cell_type": "code",
  285. "execution_count": 32,
  286. "metadata": {
  287. "collapsed": false
  288. },
  289. "outputs": [
  290. {
  291. "name": "stdout",
  292. "output_type": "stream",
  293. "text": [
  294. "CPU times: user 895 ms, sys: 152 ms, total: 1.05 s\n",
  295. "Wall time: 20.7 s\n"
  296. ]
  297. }
  298. ],
  299. "source": [
  300. "%%time \n",
  301. "test3 = colormapped_images[0].compute(get = executor.get)"
  302. ]
  303. },
  304. {
  305. "cell_type": "markdown",
  306. "metadata": {},
  307. "source": [
  308. "**Comments:** The communications overhead is apparent here, but this runs faster than the `dask.multiprocessing` test above. I imagine this is partially due to the fact that the distributed workers are already running, whereas `dask.multiprocessing` needs to spin up a worker pool. Perhaps interprocess communications are a bit faster here as well?"
  309. ]
  310. },
  311. {
  312. "cell_type": "markdown",
  313. "metadata": {},
  314. "source": [
  315. "### Tests Round #2: Fetching all results"
  316. ]
  317. },
  318. {
  319. "cell_type": "markdown",
  320. "metadata": {},
  321. "source": [
  322. "#### Test #2.1: Run the computation locally using the `dask.threaded` execution engine."
  323. ]
  324. },
  325. {
  326. "cell_type": "code",
  327. "execution_count": 36,
  328. "metadata": {
  329. "collapsed": false
  330. },
  331. "outputs": [
  332. {
  333. "name": "stdout",
  334. "output_type": "stream",
  335. "text": [
  336. "CPU times: user 28.9 s, sys: 6.07 s, total: 35 s\n",
  337. "Wall time: 10.3 s\n"
  338. ]
  339. }
  340. ],
  341. "source": [
  342. "%%time\n",
  343. "import dask.threaded\n",
  344. "\n",
  345. "results = colormapped_images.compute(get = dask.threaded.get, num_workers=4);"
  346. ]
  347. },
  348. {
  349. "cell_type": "markdown",
  350. "metadata": {},
  351. "source": [
  352. "**Comments:** This test computation is small enough that I can run it locally on my machine to show the best case. This runs extremely quickly, since the computation itself is carried out in parallel across 4 cores on my laptop. The results already reside in shared memory, so basically zero time is spent \"fetching\" the results in this case. This is a best case with nearly no communication overhead."
  353. ]
  354. },
  355. {
  356. "cell_type": "markdown",
  357. "metadata": {},
  358. "source": [
  359. "#### Test #2.2: Run the computation locally using the `dask.multiprocessing` execution engine."
  360. ]
  361. },
  362. {
  363. "cell_type": "code",
  364. "execution_count": 37,
  365. "metadata": {
  366. "collapsed": false
  367. },
  368. "outputs": [
  369. {
  370. "name": "stdout",
  371. "output_type": "stream",
  372. "text": [
  373. "CPU times: user 2.72 s, sys: 25.9 s, total: 28.6 s\n",
  374. "Wall time: 56.8 s\n"
  375. ]
  376. }
  377. ],
  378. "source": [
  379. "%%time\n",
  380. "import dask.multiprocessing\n",
  381. "\n",
  382. "results = colormapped_images.compute(get = dask.multiprocessing.get, num_workers=4);"
  383. ]
  384. },
  385. {
  386. "cell_type": "markdown",
  387. "metadata": {},
  388. "source": [
  389. "**Comments:** In this test case, I see my CPU usage climb up to about 300% for about 10 seconds, and then it drops down to 100% for the remaining 1m17s after that. Presumably the master process is collecting results from the workers during this second phase, and is using the full CPU resources it has available to it to do so. I expect this data transfer to take some time, but 1m17s is quite a bit longer than I would expect it to take to transfer 200MB in 2MB chunks locally between processes."
  390. ]
  391. },
  392. {
  393. "cell_type": "markdown",
  394. "metadata": {},
  395. "source": [
  396. "#### Test #2.3: Run the computation on a local cluster using the `distributed.executor.get` computation engine."
  397. ]
  398. },
  399. {
  400. "cell_type": "code",
  401. "execution_count": 38,
  402. "metadata": {
  403. "collapsed": false
  404. },
  405. "outputs": [
  406. {
  407. "name": "stdout",
  408. "output_type": "stream",
  409. "text": [
  410. "CPU times: user 42.1 s, sys: 26.2 s, total: 1min 8s\n",
  411. "Wall time: 1min 27s\n"
  412. ]
  413. }
  414. ],
  415. "source": [
  416. "%%time\n",
  417. "results = colormapped_images.compute(get = executor.get);"
  418. ]
  419. },
  420. {
  421. "cell_type": "markdown",
  422. "metadata": {},
  423. "source": [
  424. "**Comments:** Again, CPU usage spikes initially for about 10s as results are computed (each of the four workers takes about 150% of CPU), and then drops down to 100% as a single Python process (I assume the executor) gathers results.\n",
  425. "\n",
  426. "The communications overhead is greater here than in the `dask.multiprocessing` test above. However, when I run the same test with 50 images, this test actually collects results faster than the `dask.multiprocessing` test. Strange. Either way, the communication overhead is more than I would expect for transferring 200MB worth of data locally on a single machine. Compression would speed this up, of course, but even so the data transfer seems "
  427. ]
  428. },
  429. {
  430. "cell_type": "code",
  431. "execution_count": null,
  432. "metadata": {
  433. "collapsed": true
  434. },
  435. "outputs": [],
  436. "source": []
  437. }
  438. ],
  439. "metadata": {
  440. "kernelspec": {
  441. "display_name": "Python 3",
  442. "language": "python",
  443. "name": "python3"
  444. },
  445. "language_info": {
  446. "codemirror_mode": {
  447. "name": "ipython",
  448. "version": 3
  449. },
  450. "file_extension": ".py",
  451. "mimetype": "text/x-python",
  452. "name": "python",
  453. "nbconvert_exporter": "python",
  454. "pygments_lexer": "ipython3",
  455. "version": "3.5.0"
  456. }
  457. },
  458. "nbformat": 4,
  459. "nbformat_minor": 0
  460. }
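
As a rough sanity check on the data volumes quoted in the notebook's comments, the sketch below (not part of the original notebook) derives the size of one colormapped frame from the shapes used in `colormap_and_combine` and times a local pickle round trip of 100 such frames. Using `pickle` as a stand-in for inter-process serialization is an assumption; the transport actually used by `dask.multiprocessing` or `distributed` may differ. The helper names `frame_nbytes` and `time_pickle_round_trip` are hypothetical.

```python
import pickle
import time

import numpy as np

IMAGE_SIZE = (512, 512)   # same value as in the notebook
NUM_Z_SLICES = 100        # same value as in the notebook


def frame_nbytes(image_size):
    """Bytes in one output frame: an RGBA uint8 array holding the amplitude
    and phase panels side by side, i.e. shape (rows, 2 * cols, 4)."""
    rows, cols = image_size
    return rows * (2 * cols) * 4 * np.dtype(np.uint8).itemsize


def time_pickle_round_trip(num_frames, image_size):
    """Hypothetical helper: time serializing and deserializing dummy frames.

    pickle is only an assumed stand-in for what the schedulers really use.
    """
    rows, cols = image_size
    frame = np.zeros((rows, 2 * cols, 4), dtype=np.uint8)
    restored = frame
    start = time.perf_counter()
    for _ in range(num_frames):
        payload = pickle.dumps(frame, protocol=pickle.HIGHEST_PROTOCOL)
        restored = pickle.loads(payload)
    elapsed = time.perf_counter() - start
    return elapsed, restored


per_frame = frame_nbytes(IMAGE_SIZE)
total = per_frame * NUM_Z_SLICES
elapsed, restored = time_pickle_round_trip(NUM_Z_SLICES, IMAGE_SIZE)

print("bytes per frame:", per_frame)
print("total MiB for all frames:", total / 2**20)
print("pickle round trip for all frames: %.3f s" % elapsed)
```

If the round trip measured this way is far shorter than the gather phase observed in the tests, that would suggest the extra time is spent somewhere other than plain serialization, though this sketch alone cannot say where.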