diff hgext/gpg.py @ 1592:347c44611348

gpg signing extension for hg the principle is almost the same as how tags work: .hgsigs stores signatures, localsigs stores local signatures the format of the signatures is: nodeid sigversion base64_detached_sig sigversion 0 signs simply the nodeid (maybe we would like to sign other things in the future). you can associate fingerprints with roles in hgrc like: [gpg] fingerprint_of_a_key_without_spaces = release fingerprint_of_a_key_without_spaces = contributor, testing the key used for signing can be specified on the command line or via hgrc (key =) thanks to Eric Hopper for testing and bugfixing
author Benoit Boissinot <benoit.boissinot@ens-lyon.org>
date Fri, 16 Dec 2005 11:12:08 -0600
parents
children ff339dd21976
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/hgext/gpg.py	Fri Dec 16 11:12:08 2005 -0600
@@ -0,0 +1,207 @@
+import os, tempfile, binascii, errno
+from mercurial import util
+from mercurial import node as hgnode
+
+class gpg:
+    def __init__(self, path, key=None):
+        self.path = path
+        self.key = (key and " --local-user \"%s\"" % key) or ""
+
+    def sign(self, data):
+        gpgcmd = "%s --sign --detach-sign%s" % (self.path, self.key)
+        return util.filter(data, gpgcmd)
+
+    def verify(self, data, sig):
+        """ returns of the good and bad signatures"""
+        try:
+            fd, sigfile = tempfile.mkstemp(prefix="hggpgsig")
+            fp = os.fdopen(fd, 'wb')
+            fp.write(sig)
+            fp.close()
+            fd, datafile = tempfile.mkstemp(prefix="hggpgdata")
+            fp = os.fdopen(fd, 'wb')
+            fp.write(data)
+            fp.close()
+            gpgcmd = "%s --logger-fd 1 --status-fd 1 --verify \"%s\" \"%s\"" % (self.path, sigfile, datafile)
+            #gpgcmd = "%s --status-fd 1 --verify \"%s\" \"%s\"" % (self.path, sigfile, datafile)
+            ret = util.filter("", gpgcmd)
+        except:
+            for f in (sigfile, datafile):
+                try:
+                    if f: os.unlink(f)
+                except: pass
+            raise
+        keys = []
+        key, fingerprint = None, None
+        err = ""
+        for l in ret.splitlines():
+            # see DETAILS in the gnupg documentation
+            # filter the logger output
+            if not l.startswith("[GNUPG:]"):
+                continue
+            l = l[9:]
+            if l.startswith("ERRSIG"):
+                err = "error while verifying signature"
+                break
+            elif l.startswith("VALIDSIG"):
+                # fingerprint of the primary key
+                fingerprint = l.split()[10]
+            elif (l.startswith("GOODSIG") or
+                  l.startswith("EXPSIG") or
+                  l.startswith("EXPKEYSIG") or
+                  l.startswith("BADSIG")):
+                if key is not None:
+                    keys.append(key + [fingerprint])
+                key = l.split(" ", 2)
+                fingerprint = None
+        if err:
+            return err, []
+        if key is not None:
+            keys.append(key + [fingerprint])
+        return err, keys
+
+def newgpg(ui, **opts):
+    gpgpath = ui.config("gpg", "cmd", "gpg")
+    gpgkey = opts.get('key')
+    if not gpgkey:
+        gpgkey = ui.config("gpg", "key", None)
+    return gpg(gpgpath, gpgkey)
+
+def check(ui, repo, rev):
+    """verify all the signatures there may be for a particular revision"""
+    mygpg = newgpg(ui)
+    rev = repo.lookup(rev)
+    hexrev = hgnode.hex(rev)
+    keys = []
+
+    def addsig(fn, ln, l):
+        if not l: return
+        n, v, sig = l.split(" ", 2)
+        if n == hexrev:
+            data = node2txt(repo, rev, v)
+            sig = binascii.a2b_base64(sig)
+            err, k = mygpg.verify(data, sig)
+            if not err:
+                keys.append((k, fn, ln))
+            else:
+                ui.warn("%s:%d %s\n" % (fn, ln , err))
+
+    fl = repo.file(".hgsigs")
+    h = fl.heads()
+    h.reverse()
+    # read the heads
+    for r in h:
+        ln = 1
+        for l in fl.read(r).splitlines():
+            addsig(".hgsigs|%s" % hgnode.short(r), ln, l)
+            ln +=1
+    try:
+        # read local signatures
+        ln = 1
+        f = repo.opener("localsigs")
+        for l in f:
+            addsig("localsigs", ln, l)
+            ln +=1
+    except IOError:
+        pass
+
+    if not keys:
+        ui.write("%s not signed\n" % hgnode.short(rev))
+        return
+    valid = []
+    # warn for expired key and/or sigs
+    for k, fn, ln in keys:
+        prefix = "%s:%d" % (fn, ln)
+        for key in k:
+            if key[0] == "BADSIG":
+                ui.write("%s Bad signature from \"%s\"\n" % (prefix, key[2]))
+                continue
+            if key[0] == "EXPSIG":
+                ui.write("%s Note: Signature has expired"
+                         " (signed by: \"%s\")\n" % (prefix, key[2]))
+            elif key[0] == "EXPKEYSIG":
+                ui.write("%s Note: This key has expired"
+                         " (signed by: \"%s\")\n" % (prefix, key[2]))
+            valid.append((key[1], key[2], key[3]))
+    # print summary
+    ui.write("%s is signed by:\n" % hgnode.short(rev))
+    for keyid, user, fingerprint in valid:
+        role = getrole(ui, fingerprint)
+        ui.write("  %s (%s)\n" % (user, role))
+
+def getrole(ui, fingerprint):
+    return ui.config("gpg", fingerprint, "no role defined")
+
+def sign(ui, repo, *revs, **opts):
+    """add a signature for the current tip or a given revision"""
+    mygpg = newgpg(ui, **opts)
+    sigver = "0"
+    sigmessage = ""
+    if revs:
+        nodes = [repo.lookup(n) for n in revs]
+    else:
+        nodes = [repo.changelog.tip()]
+
+    for n in nodes:
+        hexnode = hgnode.hex(n)
+        ui.write("Signing %d:%s\n" % (repo.changelog.rev(n),
+                                      hgnode.short(n)))
+        # build data
+        data = node2txt(repo, n, sigver)
+        sig = mygpg.sign(data)
+        if not sig:
+            raise util.Abort("Error while signing")
+        sig = binascii.b2a_base64(sig)
+        sig = sig.replace("\n", "")
+        sigmessage += "%s %s %s\n" % (hexnode, sigver, sig)
+
+    # write it
+    if opts['local']:
+        repo.opener("localsigs", "ab").write(sigmessage)
+        return
+
+    (c, a, d, u) = repo.changes()
+    for x in (c, a, d, u):
+        if ".hgsigs" in x and not opts["force"]:
+            raise util.Abort("working copy of .hgsigs is changed "
+                             "(please commit .hgsigs manually"
+                             "or use --force)")
+
+    repo.wfile(".hgsigs", "ab").write(sigmessage)
+
+    if repo.dirstate.state(".hgsigs") == '?':
+        repo.add([".hgsigs"])
+
+    if opts["no_commit"]:
+        return
+
+    message = opts['message']
+    if not message:
+        message = "\n".join(["Added signature for changeset %s" % hgnode.hex(n)
+                             for n in nodes])
+    try:
+        repo.commit([".hgsigs"], message, opts['user'], opts['date'])
+    except ValueError, inst:
+        raise util.Abort(str(inst))
+
+def node2txt(repo, node, ver):
+    """map a manifest into some text"""
+    if ver == "0":
+        return "%s\n" % hgnode.hex(node)
+    else:
+        util.Abort("unknown signature version")
+
+cmdtable = {
+    "sign":
+        (sign,
+         [('l', 'local', None, "make the signature local"),
+          ('f', 'force', None, "sign even if the sigfile is modified"),
+          ('', 'no-commit', None, "do not commit the sigfile after signing"),
+          ('m', 'message', "", "commit message"),
+          ('d', 'date', "", "date code"),
+          ('u', 'user', "", "user"),
+          ('k', 'key', "", "the key id to sign with")],
+         "hg sign [OPTION]... REVISIONS"),
+    "sigcheck": (check, [], 'hg sigcheck REVISION')
+}
+