...
 
Commits (4)
......@@ -24,167 +24,177 @@
# -m debian.org -a admin@db.debian.org \
# -e /etc/userdir-ldap/templtes/error-reply -- test.sh
import sys, traceback, time, os;
import pwd, getopt;
import email, email.parser
from userdir_gpg import *;
import email
import email.parser
import getopt
import os
import pwd
import sys
import time
import traceback
EX_TEMPFAIL = 75;
EX_PERMFAIL = 65; # EX_DATAERR
from userdir_gpg import *
EX_TEMPFAIL = 75
EX_PERMFAIL = 65 # EX_DATAERR
# Configuration
ReplayCacheFile = None;
LDAPDn = None;
LDAPServer = None;
GroupMember = None;
Phrases = None;
AllowMIME = 1;
Verbose = 0;
ReplayCacheFile = None
LDAPDn = None
LDAPServer = None
GroupMember = None
Phrases = None
AllowMIME = 1
Verbose = 0
class MessageError(Exception):
pass
pass
def verbmsg(msg):
if Verbose:
sys.stderr.write(msg + "\n")
if Verbose:
sys.stderr.write(msg + "\n")
# Match the key fingerprint against an LDAP directory
def CheckLDAP(FingerPrint):
import ldap;
import userdir_ldap;
# Connect to the ldap server
global ErrTyp, ErrMsg;
ErrType = EX_TEMPFAIL;
ErrMsg = "An error occurred while performing the LDAP lookup:";
global l;
l = userdir_ldap.connectLDAP(LDAPServer);
l.simple_bind_s("","");
# Search for the matching key fingerprint
verbmsg("Processing fingerprint %s" % FingerPrint)
Attrs = l.search_s(LDAPDn,ldap.SCOPE_ONELEVEL,"keyfingerprint=" + FingerPrint);
if len(Attrs) == 0:
raise MessageError("Key not found")
if len(Attrs) != 1:
raise MessageError("Oddly your key fingerprint is assigned to more than one account..")
gidnumber_found = 0;
for key in Attrs[0][1].keys():
if (key == "gidNumber"):
gidnumber_found = 1
if (gidnumber_found != 1):
raise MessageError("No gidnumber in attributes for fingerprint %s" % FingerPrint)
# Look for the group with the gid of the user
GAttr = l.search_s(LDAPDn,ldap.SCOPE_ONELEVEL,"(&(objectClass=debianGroup)(gidnumber=%s))" % Attrs[0][1]["gidNumber"][0], ["gid"])
if len(GAttr) == 0:
raise MessageError("Database inconsistency found: main group for account not found in database")
# See if the group membership is OK
# Only if a group was given on the commandline
if GroupMember is not None:
Hit = 0;
# Check primary group first
if GAttr[0][1]["gid"][0] == GroupMember:
Hit = 1
else:
# Check supplementary groups
for x in Attrs[0][1].get("supplementaryGid",[]):
if x == GroupMember:
Hit = 1;
if Hit != 1:
raise MessageError("You don't have %s group permissions."%(GroupMember))
import ldap
import userdir_ldap
# Connect to the ldap server
global ErrTyp, ErrMsg
ErrType = EX_TEMPFAIL
ErrMsg = "An error occurred while performing the LDAP lookup:"
global l
l = userdir_ldap.connectLDAP(LDAPServer)
l.simple_bind_s("", "")
# Search for the matching key fingerprint
verbmsg("Processing fingerprint %s" % FingerPrint)
Attrs = l.search_s(LDAPDn, ldap.SCOPE_ONELEVEL, "keyfingerprint=" + FingerPrint)
if len(Attrs) == 0:
raise MessageError("Key not found")
if len(Attrs) != 1:
raise MessageError("Oddly your key fingerprint is assigned to more than one account..")
gidnumber_found = 0
for key in Attrs[0][1].keys():
if key == "gidNumber":
gidnumber_found = 1
if gidnumber_found != 1:
raise MessageError("No gidnumber in attributes for fingerprint %s" % FingerPrint)
# Look for the group with the gid of the user
GAttr = l.search_s(LDAPDn, ldap.SCOPE_ONELEVEL, "(&(objectClass=debianGroup)(gidnumber=%s))" % Attrs[0][1]["gidNumber"][0], ["gid"])
if len(GAttr) == 0:
raise MessageError("Database inconsistency found: main group for account not found in database")
# See if the group membership is OK
# Only if a group was given on the commandline
if GroupMember is not None:
Hit = 0
# Check primary group first
if GAttr[0][1]["gid"][0] == GroupMember:
Hit = 1
else:
# Check supplementary groups
for x in Attrs[0][1].get("supplementaryGid", []):
if x == GroupMember:
Hit = 1
if Hit != 1:
raise MessageError("You don't have %s group permissions." % GroupMember)
# Start of main program
# Process options
(options, arguments) = getopt.getopt(sys.argv[1:], "r:k:d:l:g:mp:v");
options, arguments = getopt.getopt(sys.argv[1:], "r:k:d:l:g:mp:v")
for (switch, val) in options:
if (switch == '-r'):
ReplayCacheFile = val;
elif (switch == '-k'):
SetKeyrings(val.split(":"));
elif (switch == '-d'):
LDAPDn = val;
elif (switch == '-l'):
LDAPServer = val;
elif (switch == '-g'):
GroupMember = val;
elif (switch == '-m'):
AllowMIME = 0;
elif (switch == '-v'):
Verbose = 1;
elif (switch == '-p'):
Phrases = val;
Now = time.strftime("%a, %d %b %Y %H:%M:%S",time.gmtime(time.time()));
ErrMsg = "Indeterminate Error";
ErrType = EX_TEMPFAIL;
MsgID = None;
if (switch == '-r'):
ReplayCacheFile = val
elif (switch == '-k'):
SetKeyrings(val.split(":"))
elif (switch == '-d'):
LDAPDn = val
elif (switch == '-l'):
LDAPServer = val
elif (switch == '-g'):
GroupMember = val
elif (switch == '-m'):
AllowMIME = 0
elif (switch == '-v'):
Verbose = 1
elif (switch == '-p'):
Phrases = val
Now = time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime(time.time()))
ErrMsg = "Indeterminate Error"
ErrType = EX_TEMPFAIL
MsgID = None
try:
# Startup the replay cache
ErrType = EX_TEMPFAIL;
if ReplayCacheFile is not None:
ErrMsg = "Failed to initialize the replay cache:";
RC = ReplayCache(ReplayCacheFile);
# Get the email
ErrType = EX_PERMFAIL;
ErrMsg = "Failed to understand the email or find a signature:";
mail = email.parser.Parser().parse(sys.stdin);
MsgID = mail["Message-ID"]
print "Inspecting message %s"%MsgID;
verbmsg("Processing message %s" % MsgID)
Msg = GetClearSig(mail,1);
if AllowMIME == 0 and Msg[1] != 0:
raise MessageError("PGP/MIME disallowed")
ErrMsg = "Message is not PGP signed:"
if Msg[0].find("-----BEGIN PGP SIGNED MESSAGE-----") == -1:
raise MessageError("No PGP signature")
# Check the signature
ErrMsg = "Unable to check the signature or the signature was invalid:";
pgp = GPGCheckSig2(Msg[0])
if not pgp.ok:
raise UDFormatError(pgp.why)
if pgp.text is None:
raise UDFormatError("Null signature text")
# Check the signature against the replay cache
if ReplayCacheFile is not None:
RC.process(pgp.sig_info)
# Do LDAP stuff
if LDAPDn is not None:
CheckLDAP(pgp.key_fpr)
ErrMsg = "Verifying message:";
if Phrases is not None:
F = open(Phrases,"r");
while 1:
Line = F.readline();
if Line == "": break;
if pgp.text.find(Line.strip()) == -1:
raise MessageError("Phrase '%s' was not found" % (Line.strip()))
except:
ErrMsg = "[%s] \"%s\" \"%s %s\"\n"%(Now,MsgID,ErrMsg,sys.exc_value);
sys.stderr.write(ErrMsg);
Trace = "==> %s: %s\n" %(sys.exc_type,sys.exc_value);
List = traceback.extract_tb(sys.exc_traceback);
if len(List) >= 1:
Trace = Trace + "Python Stack Trace:\n";
for x in List:
Trace = Trace + " %s %s:%u: %s\n" %(x[2],x[0],x[1],x[3]);
#print Trace;
sys.exit(EX_PERMFAIL);
# For Main
print "Message %s passed"%MsgID;
sys.exit(0);
# Startup the replay cache
ErrType = EX_TEMPFAIL
if ReplayCacheFile is not None:
ErrMsg = "Failed to initialize the replay cache:"
RC = ReplayCache(ReplayCacheFile)
# Get the email
ErrType = EX_PERMFAIL
ErrMsg = "Failed to understand the email or find a signature:"
mail = email.parser.Parser().parse(sys.stdin)
MsgID = mail["Message-ID"]
print "Inspecting message %s" % MsgID
verbmsg("Processing message %s" % MsgID)
Msg = GetClearSig(mail, 1)
if AllowMIME == 0 and Msg[1] != 0:
raise MessageError("PGP/MIME disallowed")
ErrMsg = "Message is not PGP signed:"
if Msg[0].find("-----BEGIN PGP SIGNED MESSAGE-----") == -1:
raise MessageError("No PGP signature")
# Check the signature
ErrMsg = "Unable to check the signature or the signature was invalid:"
pgp = GPGCheckSig2(Msg[0])
if not pgp.ok:
raise UDFormatError(pgp.why)
if pgp.text is None:
raise UDFormatError("Null signature text")
# Check the signature against the replay cache
if ReplayCacheFile is not None:
RC.process(pgp.sig_info)
# Do LDAP stuff
if LDAPDn is not None:
CheckLDAP(pgp.key_fpr)
ErrMsg = "Verifying message:"
if Phrases is not None:
F = open(Phrases, "r")
while 1:
Line = F.readline()
if Line == "":
break
if pgp.text.find(Line.strip()) == -1:
raise MessageError("Phrase '%s' was not found" % (Line.strip()))
except Exception:
ErrMsg = "[%s] \"%s\" \"%s %s\"\n" % (Now, MsgID, ErrMsg, sys.exc_value)
sys.stderr.write(ErrMsg)
Trace = "==> %s: %s\n" % (sys.exc_type, sys.exc_value)
List = traceback.extract_tb(sys.exc_traceback)
if len(List) >= 1:
Trace = Trace + "Python Stack Trace:\n"
for x in List:
Trace = Trace + " %s %s:%u: %s\n" % (x[2], x[0], x[1], x[3])
sys.exit(EX_PERMFAIL)
# For Main
print "Message %s passed" % MsgID
sys.exit(0)
......@@ -1330,7 +1330,7 @@ def generate_host(host, global_dir, all_accounts, all_hosts, ssh_userkeys):
v = entry.split('=',1)
if v[0] != 'GITOLITE' or len(v) != 2: continue
options = v[1].split(',')
group = options.pop(0);
group = options.pop(0)
gitolite_accounts = filter(lambda x: IsInGroup(x, [group], current_host), all_accounts)
if 'nohosts' not in options:
gitolite_hosts = filter(lambda x: GitoliteExportHosts.match(x[1]["hostname"][0]), all_hosts)
......