changeset 2612:ffb895f16925

add support for streaming clone. existing clone code uses pull to get changes from the remote repo; this is very slow and uses lots of memory and cpu. with the new clone code, the server writes file data straight to the client, and the client writes file data straight to disk. memory and cpu use are very low, and clone is much faster over a lan. a new client can still clone with pull and can still clone from older servers. a new server can still serve older clients.
author Vadim Gelfer <vadim.gelfer@gmail.com>
date Fri, 14 Jul 2006 11:17:22 -0700
parents 1b4eb1f92433
children 479e26afa10f
files mercurial/hg.py mercurial/hgweb/hgweb_mod.py mercurial/httprepo.py mercurial/localrepo.py mercurial/remoterepo.py mercurial/repo.py mercurial/sshrepo.py mercurial/sshserver.py mercurial/streamclone.py mercurial/util.py tests/test-http tests/test-http-proxy tests/test-http-proxy.out tests/test-http.out tests/test-pull tests/test-ssh tests/test-ssh.out
diffstat 17 files changed, 294 insertions(+), 21 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/hg.py	Thu Jul 13 09:50:51 2006 -0700
+++ b/mercurial/hg.py	Fri Jul 14 11:17:22 2006 -0700
@@ -179,7 +179,7 @@
             revs = [src_repo.lookup(r) for r in rev]
 
         if dest_repo.local():
-            dest_repo.pull(src_repo, heads=revs)
+            dest_repo.clone(src_repo, heads=revs, pull=pull)
         elif src_repo.local():
             src_repo.push(dest_repo, revs=revs)
         else:
--- a/mercurial/hgweb/hgweb_mod.py	Thu Jul 13 09:50:51 2006 -0700
+++ b/mercurial/hgweb/hgweb_mod.py	Fri Jul 14 11:17:22 2006 -0700
@@ -11,7 +11,8 @@
 import mimetypes
 from mercurial.demandload import demandload
 demandload(globals(), "re zlib ConfigParser mimetools cStringIO sys tempfile")
-demandload(globals(), "mercurial:mdiff,ui,hg,util,archival,templater")
+demandload(globals(), "mercurial:mdiff,ui,hg,util,archival,streamclone")
+demandload(globals(), "mercurial:templater")
 demandload(globals(), "mercurial.hgweb.common:get_mtime,staticfile")
 from mercurial.node import *
 from mercurial.i18n import gettext as _
@@ -859,7 +860,7 @@
                   or self.t("error", error="%r not found" % fname))
 
     def do_capabilities(self, req):
-        resp = 'unbundle'
+        resp = 'unbundle stream=%d' % (self.repo.revlogversion,)
         req.httphdr("application/mercurial-0.1", length=len(resp))
         req.write(resp)
 
@@ -950,3 +951,7 @@
         finally:
             fp.close()
             os.unlink(tempname)
+
+    def do_stream_out(self, req):
+        '''serve streaming clone data over http.
+        sends "application/mercurial-0.1" content type header (no
+        length is passed), then has streamclone.stream_out write data
+        of self.repo to req.'''
+        req.httphdr("application/mercurial-0.1")
+        streamclone.stream_out(self.repo, req)
--- a/mercurial/httprepo.py	Thu Jul 13 09:50:51 2006 -0700
+++ b/mercurial/httprepo.py	Fri Jul 14 11:17:22 2006 -0700
@@ -326,6 +326,9 @@
             fp.close()
             os.unlink(tempname)
 
+    def stream_out(self):
+        '''issue "stream_out" command to server and return result of
+        do_cmd.  presumably file-like object positioned at start of
+        stream; confirm against do_cmd.'''
+        return self.do_cmd('stream_out')
+
 class httpsrepository(httprepository):
     def __init__(self, ui, path):
         if not has_https:
--- a/mercurial/localrepo.py	Thu Jul 13 09:50:51 2006 -0700
+++ b/mercurial/localrepo.py	Fri Jul 14 11:17:22 2006 -0700
@@ -8,17 +8,19 @@
 from node import *
 from i18n import gettext as _
 from demandload import *
+import repo
 demandload(globals(), "appendfile changegroup")
-demandload(globals(), "changelog dirstate filelog manifest repo context")
+demandload(globals(), "changelog dirstate filelog manifest context")
 demandload(globals(), "re lock transaction tempfile stat mdiff errno ui")
-demandload(globals(), "os revlog util")
+demandload(globals(), "os revlog time util")
 
-class localrepository(object):
+class localrepository(repo.repository):
     capabilities = ()
 
     def __del__(self):
         self.transhandle = None
     def __init__(self, parentui, path=None, create=0):
+        repo.repository.__init__(self)
         if not path:
             p = os.getcwd()
             while not os.path.isdir(os.path.join(p, ".hg")):
@@ -1183,7 +1185,7 @@
         # unbundle assumes local user cannot lock remote repo (new ssh
         # servers, http servers).
 
-        if 'unbundle' in remote.capabilities:
+        if remote.capable('unbundle'):
             return self.push_unbundle(remote, force, revs)
         return self.push_addchangegroup(remote, force, revs)
 
@@ -2201,6 +2203,47 @@
             self.ui.warn(_("%d integrity errors encountered!\n") % errors[0])
             return 1
 
+    def stream_in(self, remote):
+        '''copy all repository data from remote using its raw stream.
+
+        reads object returned by remote.stream_out().  first line is
+        "<file count> <total bytes>".  then for each file: one line
+        with file name, ascii nul, byte count; followed by exactly that
+        many bytes of raw data.  each file is written through
+        self.opener under name sent by remote.
+
+        repo is reloaded when transfer is done.
+        returns len(self.heads()) + 1.'''
+        self.ui.status(_('streaming all changes\n'))
+        fp = remote.stream_out()
+        total_files, total_bytes = map(int, fp.readline().split(' ', 1))
+        self.ui.status(_('%d files to transfer, %s of data\n') %
+                       (total_files, util.bytecount(total_bytes)))
+        start = time.time()
+        for i in xrange(total_files):
+            name, size = fp.readline().split('\0', 1)
+            size = int(size)
+            self.ui.debug('adding %s (%s)\n' % (name, util.bytecount(size)))
+            # NOTE(review): name is chosen by remote and is handed to
+            # opener unchecked here.  confirm that opener refuses paths
+            # that escape repository (absolute paths, ".." components).
+            ofp = self.opener(name, 'w')
+            # close output file even if stream breaks in mid-transfer.
+            try:
+                for chunk in util.filechunkiter(fp, limit=size):
+                    ofp.write(chunk)
+            finally:
+                ofp.close()
+        elapsed = time.time() - start
+        # tiny repo or coarse clock can give zero elapsed time.  avoid
+        # division by zero when computing transfer rate.
+        if elapsed <= 0:
+            elapsed = 0.001
+        self.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
+                       (util.bytecount(total_bytes), elapsed,
+                        util.bytecount(total_bytes / elapsed)))
+        self.reload()
+        return len(self.heads()) + 1
+        
+    def clone(self, remote, heads=[], pull=False):
+        '''make this repository a copy of remote.
+        data is streamed from remote when nothing forces pull and
+        remote advertises "stream" capability; otherwise pull is used.
+
+        keyword arguments:
+        heads: list of revs to clone (non-empty list forces use of pull)
+        pull: if true, use pull even when remote can stream'''
+
+        # for now, every client that can stream can read every repo
+        # format served by servers that can stream, so value of
+        # "stream" capability is not inspected.
+
+        # once revlog format changes, version and format flags in that
+        # value must be checked here, and streaming used only when
+        # they are compatible.
+
+        if pull or heads or not remote.capable('stream'):
+            return self.pull(remote, heads)
+        return self.stream_in(remote)
+
 # used to avoid circular references so destructors work
 def aftertrans(base):
     p = base
--- a/mercurial/remoterepo.py	Thu Jul 13 09:50:51 2006 -0700
+++ b/mercurial/remoterepo.py	Fri Jul 14 11:17:22 2006 -0700
@@ -5,7 +5,9 @@
 # This software may be used and distributed according to the terms
 # of the GNU General Public License, incorporated herein by reference.
 
-class remoterepository(object):
+import repo
+
+class remoterepository(repo.repository):
     def dev(self):
         return -1
 
--- a/mercurial/repo.py	Thu Jul 13 09:50:51 2006 -0700
+++ b/mercurial/repo.py	Fri Jul 14 11:17:22 2006 -0700
@@ -5,4 +5,19 @@
 # This software may be used and distributed according to the terms
 # of the GNU General Public License, incorporated herein by reference.
 
-class RepoError(Exception): pass
+class RepoError(Exception):
+    pass
+
+class repository(object):
+    def capable(self, name):
+        '''look up capability by name in self.capabilities.
+        token equal to name: return True.
+        token of form "name=value": return value as string.
+        no matching token: return False.'''
+        prefix = name + '='
+        skip = len(prefix)
+        for token in self.capabilities:
+            if token == name:
+                return True
+            if token[:skip] == prefix:
+                return token[skip:]
+        return False
--- a/mercurial/sshrepo.py	Thu Jul 13 09:50:51 2006 -0700
+++ b/mercurial/sshrepo.py	Fri Jul 14 11:17:22 2006 -0700
@@ -198,3 +198,6 @@
         if not r:
             return 1
         return int(r)
+
+    def stream_out(self):
+        '''issue "stream_out" command to server and return result of
+        do_cmd.  presumably file-like object positioned at start of
+        stream; confirm against do_cmd.'''
+        return self.do_cmd('stream_out')
--- a/mercurial/sshserver.py	Thu Jul 13 09:50:51 2006 -0700
+++ b/mercurial/sshserver.py	Fri Jul 14 11:17:22 2006 -0700
@@ -8,7 +8,7 @@
 from demandload import demandload
 from i18n import gettext as _
 from node import *
-demandload(globals(), "os sys tempfile util")
+demandload(globals(), "os streamclone sys tempfile util")
 
 class sshserver(object):
     def __init__(self, ui, repo):
@@ -60,7 +60,7 @@
         capabilities: space separated list of tokens
         '''
 
-        r = "capabilities: unbundle\n"
+        r = "capabilities: unbundle stream=%d\n" % (self.repo.revlogversion,)
         self.respond(r)
 
     def do_lock(self):
@@ -167,3 +167,5 @@
             fp.close()
             os.unlink(tempname)
 
+    def do_stream_out(self):
+        '''serve streaming clone data over ssh.
+        has streamclone.stream_out write data of self.repo to
+        self.fout.'''
+        streamclone.stream_out(self.repo, self.fout)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/streamclone.py	Fri Jul 14 11:17:22 2006 -0700
@@ -0,0 +1,82 @@
+# streamclone.py - streaming clone server support for mercurial
+#
+# Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
+#
+# This software may be used and distributed according to the terms
+# of the GNU General Public License, incorporated herein by reference.
+
+from demandload import demandload
+from i18n import gettext as _
+demandload(globals(), "os stat util")
+
+# if server supports streaming clone, it advertises "stream"
+# capability with value that is version+flags of repo it is serving.
+# client only streams if it can read that repo format.
+
+def walkrepo(root):
+    '''generate (name, size) pairs for metadata files under root.
+    name is path relative to root, size is taken from lstat.
+    only regular files whose names end in .d or .i are reported.
+    files below root/data come first, in sorted order, descending
+    into subdirectories; files directly in root follow, in reverse
+    sorted order.'''
+
+    prefix_len = len(root) + len(os.sep)
+
+    def scan(directory, descend):
+        names = os.listdir(directory)
+        names.sort()
+        for name in names:
+            full = os.path.join(directory, name)
+            info = os.lstat(full)
+            if stat.S_ISDIR(info.st_mode):
+                if descend:
+                    for found in scan(full, True):
+                        yield found
+                continue
+            if not stat.S_ISREG(info.st_mode):
+                continue
+            if name.endswith('.d') or name.endswith('.i'):
+                yield full[prefix_len:], info.st_size
+
+    # file data goes out first
+    for found in scan(os.path.join(root, 'data'), True):
+        yield found
+    # reverse order puts manifest ahead of changelog
+    toplevel = list(scan(root, False))
+    toplevel.sort()
+    toplevel.reverse()
+    for found in toplevel:
+        yield found
+
+# stream file format is simple.
+#
+# server writes out line that says how many files, how many total
+# bytes.  separator is ascii space, byte counts are strings.
+#
+# then for each file:
+#
+#   server writes out line that says file name, how many bytes in
+#   file.  separator is ascii nul, byte count is string.
+#
+#   server writes out raw file data.
+
+def stream_out(repo, fileobj):
+    '''stream out all metadata files in repository.
+    writes to file-like object, must support write() and optional flush().
+
+    output is one line "<file count> <total bytes>", then for each
+    file one line with file name, ascii nul, byte count, followed by
+    at most that many bytes read through repo.opener.'''
+    # get consistent snapshot of repo. lock only during scan, so lock
+    # is not held while we stream, and commits can happen meanwhile.
+    lock = repo.lock()
+    # release lock even if scan fails, so failed request does not
+    # leave repo locked.
+    try:
+        repo.ui.debug('scanning\n')
+        entries = []
+        total_bytes = 0
+        for name, size in walkrepo(repo.path):
+            entries.append((name, size))
+            total_bytes += size
+    finally:
+        lock.release()
+
+    repo.ui.debug('%d files, %d bytes to transfer\n' %
+                  (len(entries), total_bytes))
+    fileobj.write('%d %d\n' % (len(entries), total_bytes))
+    for name, size in entries:
+        repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
+        fileobj.write('%s\0%d\n' % (name, size))
+        # send only bytes counted during scan (limit=size); close each
+        # source file when done so long-lived server does not pile up
+        # open handles.
+        fp = repo.opener(name)
+        try:
+            for chunk in util.filechunkiter(fp, limit=size):
+                fileobj.write(chunk)
+        finally:
+            fp.close()
+    flush = getattr(fileobj, 'flush', None)
+    if flush: flush()
--- a/mercurial/util.py	Thu Jul 13 09:50:51 2006 -0700
+++ b/mercurial/util.py	Fri Jul 14 11:17:22 2006 -0700
@@ -961,3 +961,24 @@
         else:
             _rcpath = os_rcpath()
     return _rcpath
+
+def bytecount(nbytes):
+    '''format number of bytes as short readable string with units
+    (bytes, KB, MB or GB).'''
+
+    # rows are (minimum value in units of scale, scale, format).
+    # rows run from largest to smallest and first row whose minimum
+    # is reached wins, so larger values get fewer decimal places.
+    table = (
+        (100, 1<<30, _('%.0f GB')),
+        (10, 1<<30, _('%.1f GB')),
+        (1, 1<<30, _('%.2f GB')),
+        (100, 1<<20, _('%.0f MB')),
+        (10, 1<<20, _('%.1f MB')),
+        (1, 1<<20, _('%.2f MB')),
+        (100, 1<<10, _('%.0f KB')),
+        (10, 1<<10, _('%.1f KB')),
+        (1, 1<<10, _('%.2f KB')),
+        (1, 1, _('%.0f bytes')),
+        )
+
+    for floor, scale, fmt in table:
+        threshold = scale * floor
+        if nbytes >= threshold:
+            scaled = nbytes / float(scale)
+            return fmt % scaled
+    # below every threshold (less than one byte): use last format
+    smallest_fmt = table[-1][2]
+    return smallest_fmt % nbytes
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test-http	Fri Jul 14 11:17:22 2006 -0700
@@ -0,0 +1,25 @@
+#!/bin/sh
+# test clone over http, both by streaming and by pull.
+
+# create repo with single commit and serve it over http.
+mkdir test
+cd test
+echo foo>foo
+hg init
+hg addremove
+hg commit -m 1
+hg verify
+hg serve -p 20059 -d --pid-file=hg.pid
+# record server pid; presumably test harness uses this to stop server.
+cat hg.pid >> $DAEMON_PIDS
+cd ..
+
+# http_proxy is set empty for clone commands below.
+# sed replaces every number in output with XXX (sizes and timings
+# appear in output of streaming clone).
+echo % clone via stream
+http_proxy= hg clone http://localhost:20059/ copy 2>&1 | \
+  sed -e 's/[0-9][0-9.]*/XXX/g'
+cd copy
+hg verify
+
+cd ..
+
+# --pull makes clone use pull instead of streaming.
+echo % clone via pull
+http_proxy= hg clone --pull http://localhost:20059/ copy-pull
+cd copy-pull
+hg verify
--- a/tests/test-http-proxy	Thu Jul 13 09:50:51 2006 -0700
+++ b/tests/test-http-proxy	Fri Jul 14 11:17:22 2006 -0700
@@ -13,17 +13,27 @@
 cat proxy.pid >> $DAEMON_PIDS
 sleep 2
 
-echo %% url for proxy
-http_proxy=http://localhost:20060/ hg --config http_proxy.always=True clone http://localhost:20059/ b
+echo %% url for proxy, stream
+http_proxy=http://localhost:20060/ hg --config http_proxy.always=True clone http://localhost:20059/ b | \
+  sed -e 's/[0-9][0-9.]*/XXX/g'
+cd b
+hg verify
+cd ..
+
+echo %% url for proxy, pull
+http_proxy=http://localhost:20060/ hg --config http_proxy.always=True clone --pull http://localhost:20059/ b-pull
+cd b-pull
+hg verify
+cd ..
 
 echo %% host:port for proxy
-http_proxy=localhost:20060 hg clone --config http_proxy.always=True http://localhost:20059/ c
+http_proxy=localhost:20060 hg clone --pull --config http_proxy.always=True http://localhost:20059/ c
 
 echo %% proxy url with user name and password
-http_proxy=http://user:passwd@localhost:20060 hg clone --config http_proxy.always=True http://localhost:20059/ d
+http_proxy=http://user:passwd@localhost:20060 hg clone --pull --config http_proxy.always=True http://localhost:20059/ d
 
 echo %% url with user name and password
-http_proxy=http://user:passwd@localhost:20060 hg clone --config http_proxy.always=True http://user:passwd@localhost:20059/ e
+http_proxy=http://user:passwd@localhost:20060 hg clone --pull --config http_proxy.always=True http://user:passwd@localhost:20059/ e
 
 echo %% bad host:port for proxy
 http_proxy=localhost:20061 hg clone --config http_proxy.always=True http://localhost:20059/ f
--- a/tests/test-http-proxy.out	Thu Jul 13 09:50:51 2006 -0700
+++ b/tests/test-http-proxy.out	Fri Jul 14 11:17:22 2006 -0700
@@ -1,11 +1,26 @@
 adding a
-%% url for proxy
+%% url for proxy, stream
+streaming all changes
+XXX files to transfer, XXX bytes of data
+transferred XXX bytes in XXX seconds (XXX KB/sec)
+XXX files updated, XXX files merged, XXX files removed, XXX files unresolved
+checking changesets
+checking manifests
+crosschecking files in changesets and manifests
+checking files
+1 files, 1 changesets, 1 total revisions
+%% url for proxy, pull
 requesting all changes
 adding changesets
 adding manifests
 adding file changes
 added 1 changesets with 1 changes to 1 files
 1 files updated, 0 files merged, 0 files removed, 0 files unresolved
+checking changesets
+checking manifests
+crosschecking files in changesets and manifests
+checking files
+1 files, 1 changesets, 1 total revisions
 %% host:port for proxy
 requesting all changes
 adding changesets
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test-http.out	Fri Jul 14 11:17:22 2006 -0700
@@ -0,0 +1,29 @@
+(the addremove command is deprecated; use add and remove --after instead)
+adding foo
+checking changesets
+checking manifests
+crosschecking files in changesets and manifests
+checking files
+1 files, 1 changesets, 1 total revisions
+% clone via stream
+streaming all changes
+XXX files to transfer, XXX bytes of data
+transferred XXX bytes in XXX seconds (XXX KB/sec)
+XXX files updated, XXX files merged, XXX files removed, XXX files unresolved
+checking changesets
+checking manifests
+crosschecking files in changesets and manifests
+checking files
+1 files, 1 changesets, 1 total revisions
+% clone via pull
+requesting all changes
+adding changesets
+adding manifests
+adding file changes
+added 1 changesets with 1 changes to 1 files
+1 files updated, 0 files merged, 0 files removed, 0 files unresolved
+checking changesets
+checking manifests
+crosschecking files in changesets and manifests
+checking files
+1 files, 1 changesets, 1 total revisions
--- a/tests/test-pull	Thu Jul 13 09:50:51 2006 -0700
+++ b/tests/test-pull	Fri Jul 14 11:17:22 2006 -0700
@@ -11,7 +11,7 @@
 cat hg.pid >> $DAEMON_PIDS
 cd ..
 
-http_proxy= hg clone http://localhost:20059/ copy
+http_proxy= hg clone --pull http://localhost:20059/ copy
 cd copy
 hg verify
 hg co
--- a/tests/test-ssh	Thu Jul 13 09:50:51 2006 -0700
+++ b/tests/test-ssh	Fri Jul 14 11:17:22 2006 -0700
@@ -30,8 +30,15 @@
 
 cd ..
 
-echo "# clone remote"
-hg clone -e ./dummyssh ssh://user@dummy/remote local
+echo "# clone remote via stream"
+hg clone -e ./dummyssh ssh://user@dummy/remote local-stream 2>&1 | \
+  sed -e 's/[0-9][0-9.]*/XXX/g'
+cd local-stream
+hg verify
+cd ..
+
+echo "# clone remote via pull"
+hg clone -e ./dummyssh --pull ssh://user@dummy/remote local
 
 echo "# verify"
 cd local
--- a/tests/test-ssh.out	Thu Jul 13 09:50:51 2006 -0700
+++ b/tests/test-ssh.out	Fri Jul 14 11:17:22 2006 -0700
@@ -1,5 +1,15 @@
 # creating 'remote'
-# clone remote
+# clone remote via stream
+streaming all changes
+XXX files to transfer, XXX bytes of data
+transferred XXX bytes in XXX seconds (XXX KB/sec)
+XXX files updated, XXX files merged, XXX files removed, XXX files unresolved
+checking changesets
+checking manifests
+crosschecking files in changesets and manifests
+checking files
+1 files, 1 changesets, 1 total revisions
+# clone remote via pull
 requesting all changes
 adding changesets
 adding manifests
@@ -70,6 +80,7 @@
 Got arguments 1:user@dummy 2:hg -R remote serve --stdio 3: 4: 5:
 Got arguments 1:user@dummy 2:hg -R remote serve --stdio 3: 4: 5:
 Got arguments 1:user@dummy 2:hg -R remote serve --stdio 3: 4: 5:
+Got arguments 1:user@dummy 2:hg -R remote serve --stdio 3: 4: 5:
 Got arguments 1:user@dummy 2:hg -R local serve --stdio 3: 4: 5:
 Got arguments 1:user@dummy 2:hg -R remote serve --stdio 3: 4: 5:
 Got arguments 1:user@dummy 2:hg -R remote serve --stdio 3: 4: 5: