Advertisement
Guest User

Untitled

a guest
Oct 23rd, 2019
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.47 KB | None | 0 0
  1. """Check if missing chunks transferred from gcp-zarr are related to dask."""
  2. import os
  3. import numpy as np
  4. import xarray as xr
  5. from fsspec import get_mapper
  6. from dask.distributed import Client
  7.  
  8. OUTDIR = "/tmp"
  9. TIME = "2012-01-01 00:00:00"
  10.  
  11. ZARRFILE = "gs://oceanum-era5/wind_10m.zarr"
  12.  
  13. NTRIES = 20
  14. NCPU = 1
  15.  
  16.  
  17. class TestXarrayDistributed:
  18.  
  19. @classmethod
  20. def setup_class(self):
  21. self.client = None
  22. self.iter = 0
  23. self.success = []
  24. self.failure = []
  25.  
  26. @classmethod
  27. def teardown_class(self):
  28. fname = "failure_{}-cpu_{}-tries_{}".format(NCPU, NTRIES, TIME)
  29. if self.failure:
  30. dset = xr.concat(self.failure, dim="iter")
  31. dset.to_netcdf(os.path.join(OUTDIR, fname+".nc"))
  32. else:
  33. with open(os.path.join(OUTDIR, fname+".txt"), "w") as stream:
  34. stream.write("NO FAILURES")
  35.  
  36. fname = "success_{}-cpu_{}-tries_{}".format(NCPU, NTRIES, TIME)
  37. if self.success:
  38. dset = xr.concat(self.success, dim="iter")
  39. dset.to_netcdf(os.path.join(OUTDIR, fname+".nc"))
  40. else:
  41. with open(os.path.join(OUTDIR, fname+".txt"), "w") as stream:
  42. stream.write("NO SUCCESSES")
  43.  
  44. def start_cluster(self):
  45. if NCPU > 1:
  46. print("Starting distributed with {} cpus".format(NCPU))
  47. self.client = Client()
  48. self.client.cluster.scale(NCPU)
  49.  
  50. def close_cluster(self):
  51. if self.client is not None:
  52. self.client.close()
  53.  
  54. def open_dataset(self):
  55. print("Opening zarr dataset: {}".format(ZARRFILE))
  56. fsmap = get_mapper(ZARRFILE)
  57. self.dset = xr.open_zarr(fsmap, consolidated=True)
  58.  
  59. def close_dataset(self):
  60. if self.dset is not None:
  61. print("Closing zarr dataset")
  62. self.dset.close()
  63.  
  64. def load_timestamp(self):
  65. ds = self.dset.sel(time=TIME).load()
  66. try:
  67. dvar = np.sqrt(ds.u10**2 + ds.v10**2)
  68. except:
  69. dvar = ds.hs
  70. return dvar
  71.  
  72. def append_result(self):
  73. dvar = self.load_timestamp()
  74. if dvar.isnull().any():
  75. self.failure.append(dvar)
  76. else:
  77. self.success.append(dvar)
  78. self.iter += 1
  79.  
  80. def test_transfer(self):
  81. self.start_cluster()
  82. self.open_dataset()
  83. for i in range(NTRIES):
  84. print("\n{}\nIterating: {}\n{}".format(88*"=", i, 88*"="))
  85. self.append_result()
  86. self.close_dataset()
  87. self.close_cluster()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement