Advertisement
Guest User

Atata

a guest
Oct 18th, 2013
934
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 9.52 KB | None | 0 0
  1. // atatagrab project main.go
  2. package main
  3.  
  4. import (
  5.     "io"
  6.     "os"
  7.     "os/signal"
  8.     "sync"
  9.  
  10.     "container/heap"
  11.     "flag"
  12.     "fmt"
  13.     "strconv"
  14.     "strings"
  15.  
  16.     "github.com/opesun/goquery"
  17.     "net/http"
  18. )
  19.  
  20. const (
  21.     //Зададим кодовую фразу на случай конца потока:
  22.     ENDMESSAGE = "TooLateToDieYoung"
  23. )
  24.  
  25. var (
  26.     WORKERS     = 5     //количество рабочих
  27.     WORKERSCAP  = 5     //размер очереди каждого рабочего
  28.     ATATASTREAM = 291   //id потока ататы
  29.     ATATAPOS    = 0     //стартовая позиция в потоке ататы
  30.     IMGDIR      = "img" //директория для сохранения картинок
  31. )
  32.  
  33. //Назначим флаги командной строки:
  34. func init() {
  35.     flag.IntVar(&WORKERS, "w", WORKERS, "количество рабочих")
  36.     flag.IntVar(&ATATASTREAM, "s", ATATASTREAM, "id потока ататы")
  37.     flag.IntVar(&ATATAPOS, "p", ATATAPOS, "стартовая позиция")
  38.     flag.StringVar(&IMGDIR, "d", IMGDIR, "директория для картинок")
  39. }
  40.  
  41. //Генератор загружает страницы и достает из них ссылки на картинки
  42. func generator(out chan string, stream, start int) {
  43.     for pos := start; ; pos += 20 {
  44.         //Разбираем страницу:
  45.         x, err := goquery.ParseUrl("http://home.atata.com/streams/" + strconv.Itoa(stream) + "?order=date&from=" + strconv.Itoa(pos))
  46.         if err == nil {
  47.             //Отправляем все найденные ссылки в поток:
  48.             for _, url := range x.Find("figure a.image").Attrs("href") {
  49.                 out <- "http://atata.com/" + url
  50.             }
  51.             //А если встретили признак последней страницы - отправляем кодовую фразу..
  52.             if len(x.Find("li.last.hide")) > 0 {
  53.                 out <- ENDMESSAGE
  54.                 //..и прекращаем работу генератора
  55.                 return
  56.             }
  57.         }
  58.     }
  59. }
  60.  
  61. //Рабочий
  62. type Worker struct {
  63.     urls    chan string     // канал для заданий
  64.     pending int             // кол-во оставшихся задач
  65.     index   int             // позиция в куче
  66.     wg      *sync.WaitGroup //указатель на группу ожидания
  67. }
  68.  
  69. //В качестве аргумента получаем указатель на канал завершения
  70. func (w *Worker) work(done chan *Worker) {
  71.     for {
  72.         url := <-w.urls //читаем следующее задание
  73.         w.wg.Add(1)     //инкриминируем счетчик группы ожидания
  74.         download(url)   //загружаем файл
  75.         w.wg.Done()     //сигнализируем группе ожидания что закончили
  76.         done <- w       //показываем что завершили работу
  77.     }
  78. }
  79.  
  80. //Это будет наша "куча":
  81. type Pool []*Worker
  82.  
  83. //Проверка кто меньше - в нашем случае меньше тот у кого меньше заданий:
  84. func (p Pool) Less(i, j int) bool { return p[i].pending < p[j].pending }
  85.  
  86. //Вернем количество рабочих в пуле:
  87. func (p Pool) Len() int { return len(p) }
  88.  
  89. //Реализуем обмен местами:
  90. func (p Pool) Swap(i, j int) {
  91.     if i >= 0 && i < len(p) && j >= 0 && j < len(p) {
  92.         p[i], p[j] = p[j], p[i]
  93.         p[i].index, p[j].index = i, j
  94.     }
  95. }
  96.  
  97. //Заталкивание элемента:
  98. func (p *Pool) Push(x interface{}) {
  99.     n := len(*p)
  100.     worker := x.(*Worker)
  101.     worker.index = n
  102.     *p = append(*p, worker)
  103. }
  104.  
  105. //И выталкивание:
  106. func (p *Pool) Pop() interface{} {
  107.     old := *p
  108.     n := len(old)
  109.     item := old[n-1]
  110.     item.index = -1
  111.     *p = old[0 : n-1]
  112.     return item
  113. }
  114.  
  115. //Балансировщик
  116. type Balancer struct {
  117.     pool     Pool            //Наша "куча" рабочих
  118.     done     chan *Worker    //Канал уведомления о завершении для рабочих
  119.     requests chan string     //Канал для получения новых заданий
  120.     flowctrl chan bool       //Канал для PMFC
  121.     queue    int             //Количество незавершенных заданий переданных рабочим
  122.     wg       *sync.WaitGroup //Группа ожидания для рабочих
  123. }
  124.  
  125. //Инициализируем балансировщик. Аргументом получаем канал по которому приходят задания
  126. func (b *Balancer) init(in chan string) {
  127.     b.requests = make(chan string)
  128.     b.flowctrl = make(chan bool)
  129.     b.done = make(chan *Worker)
  130.     b.wg = new(sync.WaitGroup)
  131.  
  132.     //Запускаем наш Flow Control:
  133.     go func() {
  134.         for {
  135.             b.requests <- <-in //получаем новое задание и пересылаем его на внутренний канал
  136.             <-b.flowctrl       //а потом ждем получения подтверждения
  137.         }
  138.     }()
  139.  
  140.     //Инициализируем кучу и создаем рабочих:
  141.     heap.Init(&b.pool)
  142.     for i := 0; i < WORKERS; i++ {
  143.         w := &Worker{
  144.             urls:    make(chan string, WORKERSCAP),
  145.             index:   0,
  146.             pending: 0,
  147.             wg:      b.wg,
  148.         }
  149.         go w.work(b.done)     //запускаем рабочего
  150.         heap.Push(&b.pool, w) //и заталкиваем его в кучу
  151.     }
  152. }
  153.  
  154. //Рабочая функция балансировщика получает аргументом канал уведомлений от главного цикла
  155. func (b *Balancer) balance(quit chan bool) {
  156.     lastjobs := false //Флаг завершения, поднимаем когда кончились задания
  157.     for {
  158.         select { //В цикле ожидаем коммуникации по каналам:
  159.  
  160.         case <-quit: //пришло указание на остановку работы
  161.             b.wg.Wait()  //ждем завершения текущих загрузок рабочими..
  162.             quit <- true //..и отправляем сигнал что закончили
  163.  
  164.         case url := <-b.requests: //Получено новое задание (от flow controller)
  165.             if url != ENDMESSAGE { //Проверяем - а не кодовая ли это фраза?
  166.                 b.dispatch(url) // если нет, то отправляем рабочим
  167.             } else {
  168.                 lastjobs = true //иначе поднимаем флаг завершения
  169.             }
  170.  
  171.         case w := <-b.done: //пришло уведомление, что рабочий закончил загрузку
  172.             b.completed(w) //обновляем его данные
  173.             if lastjobs {
  174.                 if w.pending == 0 { //если у рабочего кончились задания..
  175.                     heap.Remove(&b.pool, w.index) //то удаляем его из кучи
  176.                 }
  177.                 if len(b.pool) == 0 { //а если куча стала пуста
  178.                     //значит все рабочие закончили свои очереди
  179.                     quit <- true //и можно отправлять сигнал подтверждения готовности к останову
  180.                 }
  181.             }
  182.         }
  183.     }
  184. }
  185.  
  186. // Функция отправки задания
  187. func (b *Balancer) dispatch(url string) {
  188.     w := heap.Pop(&b.pool).(*Worker) //Берем из кучи самого незагруженного рабочего..
  189.     w.urls <- url                    //..и отправляем ему задание.
  190.     w.pending++                      //Добавляем ему "весу"..
  191.     heap.Push(&b.pool, w)            //..и отправляем назад в кучу
  192.     if b.queue++; b.queue < WORKERS*WORKERSCAP {
  193.         b.flowctrl <- true
  194.     }
  195. }
  196.  
  197. //Обработка завершения задания
  198. func (b *Balancer) completed(w *Worker) {
  199.     w.pending--
  200.     heap.Remove(&b.pool, w.index)
  201.     heap.Push(&b.pool, w)
  202.     if b.queue--; b.queue == WORKERS*WORKERSCAP-1 {
  203.         b.flowctrl <- true
  204.     }
  205. }
  206.  
  207. //Загрузка изображения
  208. func download(url string) {
  209.     fileName := IMGDIR + "/" + url[strings.LastIndex(url, "/")+1:]
  210.     output, err := os.Create(fileName)
  211.     defer output.Close()
  212.  
  213.     response, err := http.Get(url)
  214.     if err != nil {
  215.         fmt.Println("Error while downloading", url, "-", err)
  216.         return
  217.     }
  218.     defer response.Body.Close()
  219.     io.Copy(output, response.Body)
  220. }
  221.  
  222. func main() {
  223.     //разберем флаги
  224.     flag.Parse()
  225.     //создадим директорию для загрузки, если её еще нет
  226.     if err := os.MkdirAll(IMGDIR, 666); err != nil {
  227.         panic(err)
  228.     }
  229.  
  230.     //Подготовим каналы и балансировщик
  231.     links := make(chan string)
  232.     quit := make(chan bool)
  233.     b := new(Balancer)
  234.     b.init(links)
  235.  
  236.     //Приготовимся перехватывать сигнал останова в канал keys
  237.     keys := make(chan os.Signal, 1)
  238.     signal.Notify(keys, os.Interrupt)
  239.  
  240.     //Запускаем балансировщик и генератор
  241.     go b.balance(quit)
  242.     go generator(links, ATATASTREAM, ATATAPOS)
  243.  
  244.     fmt.Println("Начинаем загрузку изображений...")
  245.     //Основной цикл программы:
  246.     for {
  247.         select {
  248.         case <-keys: //пришла информация от нотификатора сигналов:
  249.             fmt.Println("CTRL-C: Ожидаю завершения активных загрузок")
  250.             quit <- true //посылаем сигнал останова балансировщику
  251.  
  252.         case <-quit: //пришло подтверждение о завершении от балансировщика
  253.             fmt.Println("Загрузки завершены!")
  254.             return
  255.         }
  256.     }
  257. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement