Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """Check if missing chunks transferred from gcp-zarr are related to dask."""
- import os
- import numpy as np
- import xarray as xr
- from fsspec import get_mapper
- from dask.distributed import Client
# Dataset under test and the single timestamp read from it on every try.
ZARRFILE = "gs://oceanum-era5/wind_10m.zarr"
TIME = "2012-01-01 00:00:00"

# Run controls: how many reads to attempt, and how many dask workers to use
# (values <= 1 skip starting a dask.distributed client).
NTRIES, NCPU = 20, 1

# Directory that receives the success_*/failure_* result files.
OUTDIR = "/tmp"
class TestXarrayDistributed:
    """Load one timestamp of ZARRFILE NTRIES times and sort the results.

    Each try loads the ``TIME`` slice and derives a field from it (10 m wind
    speed, falling back to ``hs``).  Results containing any NaN are collected
    as failures, the rest as successes; both sets are written to ``OUTDIR``
    when the class is torn down.  When ``NCPU > 1`` the loads run with a
    dask.distributed client so runs with and without it can be compared.
    """

    @classmethod
    def setup_class(cls):
        """Reset the state shared by the tests of this class."""
        cls.client = None
        # Initialised here so close_dataset() is safe even when
        # open_dataset() never ran or raised part-way.
        cls.dset = None
        cls.iter = 0
        cls.success = []
        cls.failure = []

    @classmethod
    def teardown_class(cls):
        """Write collected failures and successes to OUTDIR."""
        cls._write_results("failure", cls.failure, "NO FAILURES")
        cls._write_results("success", cls.success, "NO SUCCESSES")

    @staticmethod
    def _write_results(label, results, empty_message):
        """Save ``results`` to netCDF, or write ``empty_message`` to a .txt file.

        Args:
            label: Prefix of the output file name ("failure" / "success").
            results: List of DataArrays to concatenate along ``iter``.
            empty_message: Text written when ``results`` is empty.
        """
        fname = "{}_{}-cpu_{}-tries_{}".format(label, NCPU, NTRIES, TIME)
        path = os.path.join(OUTDIR, fname)
        if results:
            xr.concat(results, dim="iter").to_netcdf(path + ".nc")
        else:
            with open(path + ".txt", "w") as stream:
                stream.write(empty_message)

    def start_cluster(self):
        """Start a dask.distributed Client scaled to NCPU when NCPU > 1.

        With NCPU <= 1 no client is created and ``self.client`` stays None.
        """
        if NCPU > 1:
            print("Starting distributed with {} cpus".format(NCPU))
            # Assigned before scaling so close_cluster() can still release
            # the client if scale() raises.
            self.client = Client()
            self.client.cluster.scale(NCPU)

    def close_cluster(self):
        """Close the distributed client if one was started (idempotent)."""
        if self.client is not None:
            self.client.close()
            self.client = None

    def open_dataset(self):
        """Open ZARRFILE through fsspec as a lazily loaded xarray Dataset."""
        print("Opening zarr dataset: {}".format(ZARRFILE))
        fsmap = get_mapper(ZARRFILE)
        self.dset = xr.open_zarr(fsmap, consolidated=True)

    def close_dataset(self):
        """Close the zarr dataset if it is open (idempotent)."""
        if self.dset is not None:
            print("Closing zarr dataset")
            self.dset.close()
            self.dset = None

    def load_timestamp(self):
        """Load the TIME slice and return the field that is checked for NaNs.

        Returns:
            ``sqrt(u10**2 + v10**2)`` when the dataset has ``u10``/``v10``,
            otherwise the ``hs`` variable.
        """
        ds = self.dset.sel(time=TIME).load()
        try:
            speed = np.sqrt(ds.u10 ** 2 + ds.v10 ** 2)
        except AttributeError:
            # Missing variables surface as AttributeError on attribute
            # access; fall back to hs instead of swallowing every error.
            return ds.hs
        return speed

    def append_result(self):
        """Load one sample and file it as failure (any NaN) or success.

        The sample is tagged with the current try number as a scalar ``iter``
        coordinate, so the concatenated output records which tries failed.
        """
        dvar = self.load_timestamp().assign_coords(iter=self.iter)
        if dvar.isnull().any():
            self.failure.append(dvar)
        else:
            self.success.append(dvar)
        self.iter += 1

    def test_transfer(self):
        """Load TIME NTRIES times; always release the dataset and cluster."""
        try:
            self.start_cluster()
            self.open_dataset()
            for i in range(NTRIES):
                print("\n{}\nIterating: {}\n{}".format(88 * "=", i, 88 * "="))
                self.append_result()
        finally:
            # Nested so the cluster is closed even if closing the dataset fails.
            try:
                self.close_dataset()
            finally:
                self.close_cluster()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement