Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import math
- import sys
- import errno
- import os
- import ctypes
- import signal
- import torch
- import time
- import traceback
- import unittest
- import subprocess
- from torch import multiprocessing
- from torch.utils.data import Dataset, TensorDataset, DataLoader, ConcatDataset
- from torch.utils.data.dataset import random_split
- from torch.utils.data.dataloader import default_collate, ExceptionWrapper
# Upper bound, in seconds, on how long test_worker_exit keeps polling for the
# recorded worker pids to disappear before it gives up and raises.
JOIN_TIMEOUT: int = 10
def _manager_process(dataset, worker_pids, num_workers=None):
    """Start a multi-worker DataLoader, publish its worker pids, then SIGKILL itself.

    Intended as the target of a child process (see ``test_worker_exit``).
    SIGKILL cannot be caught or handled, so this process dies without running
    any cleanup for the DataLoader workers it started. The parent then checks
    whether those workers exit on their own.

    Args:
        dataset: dataset wrapped by the ``DataLoader``.
        worker_pids: shared integer array (``multiprocessing.Array('i', ...)``
            in ``test_worker_exit``); slot ``k`` receives the pid of worker ``k``.
        num_workers: number of loader workers. Defaults to
            ``len(worker_pids)`` so that the worker count and the number of
            pid slots always agree.
    """
    if num_workers is None:
        num_workers = len(worker_pids)
    print("_manager_process is started")
    loader = iter(DataLoader(dataset, batch_size=2, num_workers=num_workers,
                             pin_memory=True))
    print("loader init done")
    # NOTE(review): ``workers`` is an attribute of the loader's iterator object,
    # not public DataLoader API -- confirm it exists in the torch version in use.
    workers = loader.workers
    print("# workers: ", len(workers))
    # zip() stops at the shorter side, so an explicit num_workers larger than
    # the shared array can never raise IndexError.
    for slot, worker in zip(range(len(worker_pids)), workers):
        print(worker.pid)
        worker_pids[slot] = int(worker.pid)
        print(worker_pids[slot])
    # Consume four batches (indices 0-3) before the abrupt exit -- presumably so
    # the workers are actively producing data when their parent dies.
    for batch_idx, _ in enumerate(loader):
        if batch_idx == 3:
            break
    os.kill(os.getpid(), signal.SIGKILL)
- def _is_process_alive(pid, pname):
- command = 'ps -p {} -o comm='.format(pid)
- p = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
- (output, err) = p.communicate()
- p_status = p.wait()
- output = output.decode('utf-8')
- return pname in output
def test_worker_exit(dataset, manager_timeout=30):
    """Check that DataLoader workers exit after their parent process is SIGKILLed.

    Spawns ``_manager_process`` in a child process. That child starts a
    DataLoader, records the worker pids in a shared array and then SIGKILLs
    itself. This function waits for the child to die, then polls for up to
    ``JOIN_TIMEOUT`` seconds until none of the recorded pids belongs to a live
    ``python`` process.

    Args:
        dataset: dataset forwarded to ``_manager_process``.
        manager_timeout: seconds to wait for the manager to kill itself.
            This replaces the former fixed 30 s sleep.

    Raises:
        Exception: if the manager does not exit in time, exits other than by
            SIGKILL, leaves a worker pid slot unfilled, or if any worker is
            still alive after ``JOIN_TIMEOUT`` seconds of polling.
    """
    worker_pids = multiprocessing.Array('i', [0] * 4)
    manager = multiprocessing.Process(target=_manager_process,
                                      args=(dataset, worker_pids))
    manager.start()
    # Wait for the manager to kill itself instead of sleeping a fixed time;
    # join() also reaps it so it does not linger as a zombie.
    manager.join(manager_timeout)
    if manager.is_alive():
        manager.terminate()
        manager.join()
        raise Exception('manager process did not exit')
    # An exitcode of -N means the process was killed by signal N. Anything
    # else means the manager crashed or returned before its self-SIGKILL, so
    # the scenario under test never happened.
    if manager.exitcode != -signal.SIGKILL:
        raise Exception(
            'manager process exited with code {}'.format(manager.exitcode))
    pids = list(worker_pids)
    # A slot still at 0 was never filled in. Polling "pid 0" with ps would yield
    # no matching output and be misread as "worker exited".
    if 0 in pids:
        raise Exception('manager process did not record every worker pid')

    pname = 'python'
    exit_status = [False] * len(pids)
    start_time = time.time()
    while True:
        for i, pid in enumerate(pids):
            if not exit_status[i] and not _is_process_alive(pid, pname):
                exit_status[i] = True
        if all(exit_status):
            break
        time.sleep(1)
        if time.time() - start_time > JOIN_TIMEOUT:
            raise Exception('subprocess not terminated')
if __name__ == '__main__':
    # Select the start method up front, before any child process is created.
    multiprocessing.set_start_method('spawn')
    inputs = torch.randn(100, 2, 3, 5)
    # A permutation of 50 labels repeated twice -> 100 labels, one per input.
    targets = torch.randperm(50).repeat(2)
    test_worker_exit(TensorDataset(inputs, targets))
Add Comment
Please, Sign In to add comment