Commit a0c1c599 authored by Enrico Zini

Initial version

parent 644c5289
This diff is collapsed.
#!/usr/bin/python3
import argparse
import logging
import mailbox
import sys
import email.utils
import email.header
from texttable import Texttable
from collections import Counter, defaultdict
import re
import statistics
import math
log = logging.getLogger("election-stats")
class Fail(Exception):
    """Error whose message is printed to stderr before exiting with status 1."""
class Tfidf:
    """
    Minimal TF-IDF keyword scorer.

    Documents are iterables of text lines. Index the whole corpus with
    add_document(), then rank the words of individual documents with
    score_document_words().

    Naming note: the ``term_frequency`` attribute actually stores the
    *document frequency* of each word (how many indexed documents contain
    it). The name is kept for compatibility with existing users.
    """

    re_tokenize = re.compile(r"\b\w+\b")

    def __init__(self):
        # word -> number of indexed documents that contain it
        self.term_frequency = Counter()
        # Number of documents indexed so far
        self.document_count = 0

    def tokenize(self, text, stopwords=()):
        """
        Generate the lowercased keyword candidates found in ``text``.

        Words that start with a digit (which includes all-digit words),
        words shorter than 4 characters, and words whose lowercase form is in
        ``stopwords`` are skipped.
        """
        for word in self.re_tokenize.findall(text):
            # A match of \w+ is never empty, so word[0] is always valid
            if word[0].isdigit():
                continue
            # Length is checked on the original word, before lowercasing
            if len(word) < 4:
                continue
            word = word.lower()
            if word in stopwords:
                continue
            yield word

    def add_document(self, lines):
        """
        Index one document, given as an iterable of text lines.

        Each distinct word counts once per document, no matter how often it
        occurs in it.
        """
        self.document_count += 1
        words = set()
        for line in lines:
            words.update(self.tokenize(line))
        # Updating a Counter from a set adds 1 for each distinct word
        self.term_frequency.update(words)

    def score_document_words(self, lines, stopwords=()):
        """
        Return a Counter mapping each word of a document to its score.

        The score is log(1 + occurrences in the document) multiplied by
        log(indexed documents / indexed documents containing the word).

        Words that were never indexed with add_document() are left out of
        the result: there are no corpus statistics to score them with, and
        computing the ratio for them would divide by zero.
        """
        occurrences = Counter()
        for line in lines:
            occurrences.update(self.tokenize(line, stopwords=stopwords))

        scores = Counter()
        for word, count in occurrences.items():
            containing = self.term_frequency.get(word, 0)
            if not containing:
                continue
            scores[word] = math.log(1 + count) * math.log(self.document_count / containing)
        return scores
class Stats:
    """
    Collect per-sender statistics from a mail archive and print reports.

    Messages are grouped by the address found in their From header. For each
    address the class tracks how many messages were seen, how many unquoted
    body lines each message had, and the unquoted body lines themselves.
    """

    re_quote = re.compile(r"^\s*\w*>")
    re_tokenize = re.compile(r"\b")

    # Alternate address -> address under which its messages are counted
    address_aliases = {"kurt@roeckx.be": "secretary@debian.org"}
    # Addresses for which print_tfidf() prints top keywords
    keyword_addresses = ("hartmans@debian.org", "tbm@cyrius.com", "jcc@debian.org", "joerg@debian.org")
    # Words never reported as keywords by print_tfidf()
    keyword_stopwords = ("highvoltage", "jonathancarter", "bureado", "https", "march")
    # Number of keywords print_tfidf() prints per address
    keyword_count = 7

    def __init__(self):
        # Last seen real name, by address
        self.realnames = {}
        # Number of messages, by address
        self.counts = Counter()
        # Lines per message, by address
        self.lpm = defaultdict(list)
        # Message unquoted body text lines, by address
        self.contents = defaultdict(list)

    def read_mbox(self, filename):
        """Read every message of the Maildir at ``filename``."""
        mbox = mailbox.Maildir(filename, create=False)
        for mail in mbox:
            self.read_mail(mail)

    def read_mail(self, mail):
        """
        Add one message to the statistics.

        Messages without a From header are skipped with a warning. Messages
        for which get_text() returns None are counted as having no body
        lines.
        """
        _from = mail.get("From")
        if _from is None:
            log.warning("skipping message without a From header")
            return

        if isinstance(_from, email.header.Header):
            _from = str(_from)
        realname, address = email.utils.parseaddr(_from)

        # Normalise people writing with multiple emails
        address = self.address_aliases.get(address, address)

        self.realnames[address] = realname
        self.counts[address] += 1

        text = self.get_text(mail)
        lines = [] if text is None else self._unquoted_lines(text)

        self.lpm[address].append(len(lines))
        self.contents[address].append(lines)

    def _unquoted_lines(self, text):
        """Return the stripped body lines of ``text`` that the sender wrote."""
        lines = []
        in_pgp_signature = False
        for line in text.splitlines():
            # Quoted text is matched before stripping, on the raw line
            if self.re_quote.match(line):
                continue
            line = line.strip()

            if line == "-----BEGIN PGP SIGNATURE-----":
                in_pgp_signature = True
            elif line == "-----END PGP SIGNATURE-----":
                in_pgp_signature = False
            if in_pgp_signature:
                continue

            # Signature delimiter: nothing after it is body text. This has to
            # be tested before the generic "--" prefix check below, which
            # would otherwise swallow it and keep the signature in the body.
            if line == "--":
                break
            # PGP armor headers and other dash-prefixed separators
            if line.startswith("--"):
                continue
            if line == "Hash: SHA512":
                continue

            lines.append(line)
        return lines

    def get_text(self, mail):
        """
        Return the text of the only text/plain part of ``mail``.

        Returns None, after logging a warning, when the message does not
        have exactly one text/plain part.
        """
        text_parts = []
        for part in mail.walk():
            if part.get_content_type() != "text/plain":
                continue
            data = part.get_payload(decode=True)
            # NOTE: the charset declared by the part is not consulted: UTF-8
            # is tried first, with a lossy decode as fallback
            try:
                text = data.decode("utf-8")
            except UnicodeDecodeError:
                log.warning("%s: non-utf8 in mail", mail.get("From"))
                text = data.decode(errors="replace")
            text_parts.append(text)

        if len(text_parts) != 1:
            log.warning("mail has %d text parts", len(text_parts))
            return None
        return text_parts[0]

    def _print_table(self, title, header, align, rows):
        """Print ``title`` followed by ``rows`` drawn as a text table."""
        table = Texttable()
        table.set_deco(Texttable.HEADER)
        table.set_header_align(align)
        table.set_cols_align(align)
        table.header(header)
        for row in rows:
            table.add_row(row)
        print()
        print(title)
        print()
        print(table.draw())

    def print_post_count(self):
        """Print the message count of addresses with more than two messages."""
        rows = [
            (self.realnames[address], address, count)
            for address, count in self.counts.items()
            if count > 2
        ]
        # Highest count first, then by name
        rows.sort(key=lambda x: (-x[2], x[0]))
        self._print_table(
            " * Posted more than two messages",
            ("Name", "Email", "Mails"),
            ("l", "l", "r"),
            rows,
        )

    def print_tldr(self):
        """
        Print the sum and rounded average of unquoted lines per message, for
        addresses with more than two messages.
        """
        rows = [
            (self.realnames[address], address, sum(lengths), round(statistics.mean(lengths)))
            for address, lengths in self.lpm.items()
            if len(lengths) > 2
        ]
        # Highest sum first, then by name
        rows.sort(key=lambda x: (-x[2], x[0]))
        self._print_table(
            " * Sum and average of non-quoted message lines",
            ("Name", "Email", "Sum", "Avg"),
            ("l", "l", "r", "r"),
            rows,
        )

    def print_tfidf(self):
        """
        Print the top keywords of each address listed in keyword_addresses
        that has more than two messages.

        Every message read is indexed as one document. The scores of a
        sender's messages are added up, and the words of the sender's own
        name and address are excluded.
        """
        tfidf = Tfidf()
        for bodies in self.contents.values():
            for lines in bodies:
                tfidf.add_document(lines)

        print()
        print(" * Message top keywords")
        print()
        for address, bodies in sorted(self.contents.items()):
            if address not in self.keyword_addresses:
                continue
            if len(bodies) <= 2:
                continue
            realname = self.realnames[address]

            stopwords = set(self.keyword_stopwords)
            stopwords.update(tfidf.tokenize(realname))
            stopwords.update(tfidf.tokenize(address))

            sums = Counter()
            for lines in bodies:
                sums.update(tfidf.score_document_words(lines, stopwords=stopwords))

            top_words = [word for word, score in sums.most_common(self.keyword_count)]
            print("{} <{}>".format(realname, address))
            print(" ", ", ".join(top_words))

    def print(self):
        """Print all reports."""
        self.print_post_count()
        self.print_tldr()
        self.print_tfidf()
def main():
    """
    Command line entry point.

    Parses the arguments, sets up logging on stderr, then reads the mailbox
    given on the command line and prints its statistics.
    """
    parser = argparse.ArgumentParser(description="Statistics on -vote mailbox")
    parser.add_argument("--verbose", "-v", action="store_true", help="verbose output")
    parser.add_argument("--debug", action="store_true", help="debug output")
    parser.add_argument("mailbox", action="store", help="mailbox to read")
    args = parser.parse_args()

    # --debug takes precedence over --verbose
    if args.debug:
        level = logging.DEBUG
    elif args.verbose:
        level = logging.INFO
    else:
        level = logging.WARN
    logging.basicConfig(level=level, stream=sys.stderr, format="%(levelname)s %(message)s")

    stats = Stats()
    stats.read_mbox(args.mailbox)
    stats.print()
if __name__ == "__main__":
    try:
        main()
    except Fail as e:
        # Expected failure: show only the message
        print(e, file=sys.stderr)
        sys.exit(1)
    except Exception:
        # Unexpected failure: log the traceback, and still report the failure
        # through the exit status instead of exiting with 0
        log.exception("uncaught exception")
        sys.exit(1)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment