Guest User

Untitled

a guest
Oct 17th, 2018
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.45 KB | None | 0 0
  1. package gitfetcher
  2.  
  3. import (
  4. "bufio"
  5. "context"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net/url"
  10. "regexp"
  11. "strconv"
  12. "strings"
  13. "time"
  14.  
  15. "github.com/dave/services"
  16. // "github.com/dave/services/logger"
  17. "gopkg.in/src-d/go-billy-siva.v4"
  18. "gopkg.in/src-d/go-billy.v4"
  19. "gopkg.in/src-d/go-billy.v4/memfs"
  20. git "gopkg.in/src-d/go-git.v4"
  21. "gopkg.in/src-d/go-git.v4/plumbing"
  22. "gopkg.in/src-d/go-git.v4/plumbing/cache"
  23. "gopkg.in/src-d/go-git.v4/plumbing/storer"
  24. "gopkg.in/src-d/go-git.v4/storage/filesystem"
  25. "gopkg.in/src-d/go-git.v4/storage/memory"
  26. )
  27.  
  // FNAME is the name of the single file, inside the persisted filesystem, that
  // backs the siva filesystem holding the git repository. It is the file that
  // save uploads and load downloads.
  28. const FNAME = "repo.bin"
  29.  
  30. func New(cache, fileserver services.Fileserver, config Config) *Fetcher {
  31. return &Fetcher{
  32. cache: cache,
  33. fileserver: fileserver,
  34. config: config,
  35. }
  36. }
  37.  
  38. type Fetcher struct {
  39. cache, fileserver services.Fileserver
  40. config Config
  41. }
  42.  
  // Config carries the tunable settings of a Fetcher.
  type Config struct {
  	// GitSaveTimeout bounds each background save of the persisted repository.
  	GitSaveTimeout time.Duration
  	// GitCloneTimeout bounds a single clone or fetch from the remote.
  	GitCloneTimeout time.Duration
  	// GitMaxObjects is the largest object count the remote may report in its
  	// progress output before the clone or fetch is aborted.
  	GitMaxObjects int
  	// GitBucket is passed to the file servers as the bucket argument when
  	// reading and writing the persisted repository.
  	GitBucket string
  }
  49.  
  50. func (f *Fetcher) Fetch(ctx context.Context, url string) (billy.Filesystem, error) {
  51.  
  52. persisted, sfs, store, worktree, err := f.initFilesystems()
  53. if err != nil {
  54. return nil, err
  55. }
  56.  
  57. exists, err := f.load(ctx, f.cache, url, persisted)
  58. if err != nil {
  59. return nil, err
  60. }
  61.  
  62. if !exists {
  63. exists, err = f.load(ctx, f.fileserver, url, persisted)
  64. if err != nil {
  65. return nil, err
  66. }
  67. }
  68.  
  69. var changed bool
  70.  
  71. if exists {
  72. if changed, err = f.doFetch(ctx, url, store, worktree); err != nil {
  73. // If error while fetching, try a full clone before exiting. Make sure we re-initialise
  74. // the filesystems.
  75. persisted, sfs, store, worktree, err = f.initFilesystems()
  76. if err != nil {
  77. return nil, err
  78. }
  79. if changed, err = f.doClone(ctx, url, store, worktree); err != nil {
  80. return nil, err
  81. }
  82. }
  83.  
  84. } else {
  85. if changed, err = f.doClone(ctx, url, store, worktree); err != nil {
  86. return nil, err
  87. }
  88. }
  89.  
  90. if err := sfs.Sync(); err != nil {
  91. return nil, err
  92. }
  93. // we don't want the context to be cancelled half way through saving, so let's create a new one:
  94. gitctx, _ := context.WithTimeout(context.Background(), f.config.GitSaveTimeout)
  95. if changed {
  96. go f.save(gitctx, f.fileserver, url, persisted)
  97. }
  98. go f.save(gitctx, f.cache, url, persisted)
  99.  
  100. return worktree, nil
  101. }
  102.  
  103. func (f *Fetcher) initFilesystems() (persisted billy.Filesystem, sfs sivafs.SivaFS, store *filesystem.Storage, worktree billy.Filesystem, err error) {
  104. persisted = memfs.New()
  105. sfs, err = sivafs.NewFilesystem(persisted, FNAME, memfs.New())
  106. if err != nil {
  107. return nil, nil, nil, nil, err
  108. }
  109.  
  110. store = filesystem.NewStorage(sfs, cache.NewObjectLRUDefault())
  111. // if err != nil {
  112. // return nil, nil, nil, nil, err
  113. // }
  114.  
  115. worktree = memfs.New()
  116.  
  117. return persisted, sfs, store, worktree, nil
  118. }
  119.  
  120. func (f *Fetcher) doFetch(ctx context.Context, url string, store *filesystem.Storage, worktree billy.Filesystem) (changed bool, err error) {
  121.  
  122. // Opening git repo
  123. repo, err := git.Open(store, worktree)
  124. if err != nil {
  125. return false, err
  126. }
  127.  
  128. // Get the origin remote (all repos have origin?)
  129. remote, err := repo.Remote("origin")
  130. if err != nil {
  131. return false, err
  132. }
  133.  
  134. // Get a list of references from the remote
  135. refs, err := remote.List(&git.ListOptions{})
  136. if err != nil {
  137. return false, err
  138. }
  139.  
  140. // Find the HEAD reference. If we can't find it, return an error.
  141. rs := memory.ReferenceStorage{}
  142. for _, ref := range refs {
  143. rs[ref.Name()] = ref
  144. }
  145. originHead, err := storer.ResolveReference(rs, plumbing.HEAD)
  146. if err != nil {
  147. return false, err
  148. }
  149. if originHead == nil {
  150. return false, errors.New("HEAD not found")
  151. }
  152.  
  153. // We only need to do a full Fetch if the head has changed. Compare with repo.Head().
  154. repoHead, err := repo.Head()
  155. if err != nil {
  156. return false, err
  157. }
  158. if originHead.Hash() != repoHead.Hash() {
  159.  
  160. // repo has changed - this will mean it's saved after the operation
  161. changed = true
  162.  
  163. ctx, cancel := context.WithTimeout(ctx, f.config.GitCloneTimeout)
  164. defer cancel()
  165.  
  166. pw, errchan := newProgressWatcher(f.config.GitMaxObjects)
  167. defer pw.stop()
  168. var errFromWatcher error
  169. go func() {
  170. if err := <-errchan; err != nil {
  171. errFromWatcher = err
  172. cancel()
  173. }
  174. }()
  175.  
  176. if err := repo.FetchContext(ctx, &git.FetchOptions{Force: true, Progress: pw}); err != nil && err != git.NoErrAlreadyUpToDate {
  177. if errFromWatcher != nil {
  178. return false, errFromWatcher
  179. }
  180. return false, err
  181. }
  182. }
  183.  
  184. // Get the worktree, and do a hard reset to the HEAD from origin.
  185. w, err := repo.Worktree()
  186. if err != nil {
  187. return false, err
  188. }
  189. if err := w.Reset(&git.ResetOptions{
  190. Commit: originHead.Hash(),
  191. Mode: git.HardReset,
  192. }); err != nil {
  193. return false, err
  194. }
  195.  
  196. return changed, nil
  197. }
  198.  
  199. func (f *Fetcher) doClone(ctx context.Context, url string, store *filesystem.Storage, worktree billy.Filesystem) (changed bool, err error) {
  200.  
  201. ctx, cancel := context.WithTimeout(ctx, f.config.GitCloneTimeout)
  202. defer cancel()
  203.  
  204. pw, errchan := newProgressWatcher(f.config.GitMaxObjects)
  205. defer pw.stop()
  206. var errFromWatcher error
  207. go func() {
  208. if err := <-errchan; err != nil {
  209. errFromWatcher = err
  210. cancel()
  211. }
  212. }()
  213.  
  214. if _, err := git.CloneContext(ctx, store, worktree, &git.CloneOptions{
  215. URL: url,
  216. Progress: pw,
  217. Tags: git.NoTags,
  218. SingleBranch: true,
  219. }); err != nil {
  220. if errFromWatcher != nil {
  221. return false, errFromWatcher
  222. }
  223. return false, err
  224. }
  225. return true, nil
  226. }
  227.  
  // progressRegex lists the progress-line patterns whose single capture group is
  // the total number of objects reported by the remote.
  var progressRegex = []*regexp.Regexp{
  	regexp.MustCompile(`Counting objects: (\d+), done\.?`),
  	regexp.MustCompile(`Finding sources: +\d+% \(\d+/(\d+)\)`),
  }

  // newProgressWatcher returns a writer to be used as a git progress sink and a
  // channel reporting whether the remote announced too many objects.
  //
  // Everything written to the watcher is split into lines on '\r' or '\n' and
  // matched against progressRegex. The first line reporting more than
  // configGitMaxObjects objects produces a single error on the channel. The
  // channel is closed once the watcher has been stopped and all input has been
  // consumed, so a receive yields either that error or, on close, nil.
  func newProgressWatcher(configGitMaxObjects int) (*progressWatcher, chan error) {
  	r, w := io.Pipe()
  	p := &progressWatcher{
  		w: w,
  	}
  	scanner := bufio.NewScanner(r)
  	scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
  		if i := strings.IndexAny(string(data), "\r\n"); i >= 0 {
  			return i + 1, data[:i], nil
  		}
  		if atEOF && len(data) > 0 {
  			// Final line without a terminator: still hand it to the matcher.
  			return len(data), data, nil
  		}
  		// Either more data is needed or the input is exhausted.
  		return 0, nil, nil
  	})
  	// Buffered so the single report never blocks the scanning goroutine, even
  	// if the receiver is slow or has gone away.
  	errchan := make(chan error, 1)
  	go func() {
  		defer close(errchan)
  		// If scanning stops early (e.g. an over-long line), closing the read
  		// side makes later writes fail instead of blocking forever.
  		defer r.Close()
  		reported := false
  		for scanner.Scan() {
  			if reported {
  				// Keep draining the pipe so writers are never blocked, but
  				// report the limit violation only once.
  				continue
  			}
  			if matched, objects := matchProgress(scanner.Text()); matched && objects > configGitMaxObjects {
  				errchan <- fmt.Errorf("too many git objects (max %d): %d", configGitMaxObjects, objects)
  				reported = true
  			}
  		}
  	}()
  	return p, errchan
  }

  // progressWatcher is the write end of the pipe scanned by the goroutine started
  // in newProgressWatcher.
  type progressWatcher struct {
  	w *io.PipeWriter
  }

  // stop closes the write end of the pipe, which lets the scanning goroutine see
  // end of input, finish, and close the error channel.
  func (p *progressWatcher) stop() {
  	p.w.Close()
  }

  // Write forwards b to the pipe. It blocks until the scanning goroutine has
  // consumed the data.
  func (p *progressWatcher) Write(b []byte) (n int, err error) {
  	return p.w.Write(b)
  }

  // matchProgress reports whether s matches one of the progressRegex patterns
  // and, if so, the object count captured from it. Patterns are tried in order;
  // a capture that does not fit in an int is treated as no match for that
  // pattern.
  func matchProgress(s string) (matched bool, objects int) {
  	for _, re := range progressRegex {
  		m := re.FindStringSubmatch(s)
  		if len(m) != 2 {
  			continue
  		}
  		n, err := strconv.Atoi(m[1])
  		if err != nil {
  			continue
  		}
  		return true, n
  	}
  	return false, 0
  }
  291.  
  292. func (f *Fetcher) save(ctx context.Context, fileserver services.Fileserver, repoUrl string, fs billy.Filesystem) error {
  293. // open the persisted git file for reading
  294. persisted, err := fs.Open(FNAME)
  295. if err != nil {
  296. return err
  297. }
  298. defer persisted.Close()
  299. if _, err := fileserver.Write(ctx, f.config.GitBucket, url.PathEscape(repoUrl), persisted, true, "application/octet-stream", "no-cache"); err != nil {
  300. return err
  301. }
  302. return nil
  303. }
  304.  
  305. func (f *Fetcher) load(ctx context.Context, fileserver services.Fileserver, repoUrl string, fs billy.Filesystem) (found bool, err error) {
  306. // open / create the persisted git file for writing
  307. persisted, err := fs.Create(FNAME)
  308. if err != nil {
  309. return false, err
  310. }
  311. defer persisted.Close()
  312. return fileserver.Read(ctx, f.config.GitBucket, url.PathEscape(repoUrl), persisted)
  313. }
Add Comment
Please, Sign In to add comment