...
 
Commits (4)
......@@ -4,3 +4,4 @@ external*
photos/
debian/
__pycache__/
test.*
#!/usr/bin/python3
"""
Reconnaissance faciale, recadrage et harmonisation des photos du trombinoscope
Jean-Bart
Dépendances logicielles:
python3-opencv, python3-numpy
"""
import cv2, sys, math, os, os.path
import numpy as np
from PIL import Image
from io import BytesIO
import base64
thisdir=os.path.dirname(__file__)
jpgPrefix=b'data:image/jpeg;base64,'
class FaceImage(object):
"""
a class to implement an image with face detection
"""
__CASCADE = cv2.CascadeClassifier(os.path.join(
thisdir,"haarcascade_frontalface_default.xml"))
def __init__(self, indata, size=(150,192)):
"""
the constructor
@param indata either a file name for an image or a bytes with
an URL-encoded image
@param size the size (width, height) of the cropped face to get
"""
self.photo=None # should become a cv2 image
self.size=size
self.cropRect={} # should become ("x":x, "y":y, "w":w, "h":h)
self.cropped=None # should become a cv2 image
self.ok=False # will become True when a face is detected
if type(indata) == str and os.path.exists(indata):
self.photo=cv2.imread(indata)
elif type(indata) == bytes and indata[:len(jpgPrefix)] == jpgPrefix:
photo=BytesIO(base64.b64decode(indata[len(jpgPrefix):]))
photo=np.array(Image.open(photo))
self.photo=cv2.cvtColor(photo, cv2.COLOR_BGR2RGB)
else:
raise Exception("Could not get a photo.")
self.crop()
return
def crop(self):
"""
Tries to find a face in self.photo, then crops it if possible
into self.cropped and puts the status into self.ok
"""
height, width = self.photo.shape[:2]
gray=cv2.cvtColor(self.photo, cv2.COLOR_BGR2GRAY)
faces = self.__CASCADE.detectMultiScale(
gray,
scaleFactor=1.1,
minNeighbors=5,
minSize=(30, 30),
flags = cv2.CASCADE_SCALE_IMAGE
)
if len(faces)==0 or len(faces)>1:
self.ok=False
self.cropped=self.photo
else:
self.ok=True
x, y, w, h = faces[0]
R=self.size[1]/self.size[0]
if h/w < R: # the face rectangle is not high enough
y=int((R-1)*h/2)
h=int(R*w) # so h/w is quite R
# int() casts are necessary since cv2 uses int32 which
# is not JSON serializable to communicate with Javascript
self.cropRect = {"x": int(x), "y": int(y), "w": int(w), "h": int(h)}
# calculate 'a' such as 25a * 32a equals the area 2 * w * h
a=math.sqrt(2*w*h/800)
# upper left coordinates
x1=round(x+w/2-12.5*a); y1=round(y+7/12*h-16*a)
x1=int(max(x1,0)); y1=int(max(y1,0))
# lower right coordinates
x2=round(x+w/2+12.5*a);y2=round(y+7/12*h+16*a)
x2=int(min(x2,width)); y2=int(min(y2,height))
crop_img=self.photo[y1:y2, x1:x2]
# try to normalize Hue, Saturation, Value
hsv=cv2.cvtColor(crop_img,cv2.COLOR_RGB2HSV)
h,s,v = cv2.split(hsv)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(4,4))
h1=h # same hue
s1=s # same saturation
v1 = clahe.apply(v) # normalize the value
hsv1=cv2.merge((h1,s1,v1))
newimg=cv2.cvtColor(hsv1, cv2.COLOR_HSV2RGB)
self.cropped = cv2.resize(newimg, self.size)
return
@property
def toDataUrl(self):
"""
returns a DataUrl unicode string with self.cropped as an image
"""
data=cv2.imencode(".jpg",self.cropped)[1].tostring()
return (jpgPrefix+base64.b64encode(data)).decode("ascii")
def saveAs(self, path):
"""
saves self.cropped into a file
@param path a path to the file system
"""
cv2.imwrite(path, self.cropped)
return
if __name__=="__main__":
fi=FaceImage(sys.argv[1])
fi.saveAs(sys.argv[2])
[global]
server.socket_port = 8901
log.screen = False
[/]
tools.staticdir.root = "/home/georgesk/developpement/photodb/photodb-1.5"
[/static]
tools.staticdir.on = True
tools.staticdir.dir = "static"
#!/usr/bin/python3
import sys, csv, re, datetime
import sqlite3
encodingList=[
"utf-8",
"latin1",
"dos",
"cp437",
]
secondNamePattern=re.compile(r"^nom$", re.I)
firstNamePattern=re.compile(r"^pr[eé]nom$", re.I)
connection=None # connection to a sqlite3 database
def find_encoding(f, encodings=encodingList):
"""
finds the encoding of a text file
"""
for e in encodings:
try:
with open(f,encoding=e) as infile:
s=infile.read()
return e
except:
pass
return None
def getReader(csvfile):
"""
makes a DictReader from the file f, with the given encoding.
This Dictrader must have one field with name matched by secondNamePattern
and another one with name matched by firstNamePattern
@param csvfile an open string stream
@return a Dictreader, and the names of fields for the surname and
the given name
"""
reader=None
for delimiter in (";", ",", "\t",):
csvfile.seek(0)
reader = csv.DictReader(csvfile, delimiter=delimiter)
fields2=[f for f in reader.fieldnames if secondNamePattern.match(f)]
fields1=[f for f in reader.fieldnames if firstNamePattern.match(f)]
if (len(fields2), len(fields1)) == (1, 1):
return reader, fields2[0], fields1[0]
return reader, "", ""
def addToDb(row, field2, field1, verbose=False):
"""
adds surname and given name records to the database.
"""
date=datetime.datetime.utcnow().isoformat(sep=' ', timespec='seconds')
c = connection.cursor()
surname=row[field2]
givenname=row[field1]
c.execute("select * from person where surname=:surname and givenname=:givenname",
{"surname": surname, "givenname": givenname})
if c.fetchone()==None:
## the key surname + givenname does not yet exist !
print("+",end="")
sys.stdout.flush()
c.execute("INSERT INTO person (surname, givenname, date) VALUES (?,?,?)",
(surname, givenname, date))
connection.commit()
return 1
else:
if verbose:
print("-",end="")
sys.stdout.flush()
return 0
if __name__=="__main__":
infile=sys.argv[1]
try:
outfile=sys.argv[2]
except:
outfile="db/names.db"
connection = sqlite3.connect(outfile)
c = connection.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS person
(surname text, givenname text, photo text, date text)''')
connection.commit()
encoding=find_encoding(infile)
written=0
with open(infile, encoding=encoding) as csvfile:
reader, field2, field1 = getReader(csvfile)
for r in reader:
written=written + addToDb(r, field2, field1, verbose=True)
print ("\n{} ==>{}\nwritten {} records".format(infile,outfile,written))
#!/bin/sh
user=$(id -un)
if [ "$user" = "www-data" ]; then
cd /var/lib/photodb
exec python3 webretouche.py
else
sudo su www-data --shell /bin/sh --command "$0"
fi
......@@ -40,7 +40,6 @@ jQuery(document).ready(function () {
$.getJSON("/chercheNom", term, callback);
},
// effacer le prénom pendant qu'on bricole le nom !
// et aussi : remttre en saisie de vidéo
search: function( event, ui ) {$("#prenom").val("")},
change: function( event, ui ) {$("#prenom").val("");},
});
......
{% load static %}
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="content-type" content="text/html;charset=utf-8" />
<title>Importation de données d'élèves</title>
<script src="{% static 'jquery.js' %}" type="text/javascript"></script>
<link rel="stylesheet" href="{% static 'jquery-ui-themes/smoothness/jquery-ui.css' %}"/>
<script src="{% static 'jquery-ui/jquery-ui.js' %}" type="text/javascript"></script>
<script type="text/javascript" src="{% static 'portrait.js' %}"></script>
<link rel="stylesheet" href="{% static 'portrait.css' %}" type="text/css"/>
</head>
<body>
<input type="hidden" id="csrf" value="{{csrf_token}}"/>
<h1>Importation d'élèves</h1>
<dl>
<dt>Fichier</dt>
<dd>{{fname}}</dd>
<dt>Nombre d'ajouts effectués</dt>
<dd>{{written}}</dd>
</dl>
</body>
</html>
<!--
Local Variables:
mode: nxml
End:
-->
......@@ -5,4 +5,7 @@ urlpatterns = [
url(r'^$', views.index, name='index'),
url(r'^encadre$', views.encadre, name='encadre'),
url(r'^envoi$', views.envoi, name='envoi'),
url(r'^chercheNom$', views.chercheNom, name='chercheNom'),
url(r'^cherchePrenom$', views.cherchePrenom, name='cherchePrenom'),
url(r'^importeCSV$', views.importeCSV, name='importeCSV'),
]
from django.shortcuts import render
from django.http import JsonResponse
from django.http import JsonResponse, HttpResponse
from django.utils import timezone
from .models import Person
from .autoretouche import jpgPrefix, FaceImage
from photodb.settings import BASE_DIR
import re, os, base64, uuid
import re, os, base64, uuid, csv, json
def json_response(F):
......@@ -16,7 +16,7 @@ def json_response(F):
"""
def result(request, *args, **kwargs):
data=F(request, *args, **kwargs)
return JsonResponse(data)
return JsonResponse(data, safe=False) # can return lists, not only dicts
return result
def index(request):
......@@ -145,3 +145,133 @@ def envoi(request):
"message": message,
}
@json_response
def chercheNom(request):
"""
search the database in order to make an autocompletion
on the First Name field
"""
term=request.GET.get('term', None)
if term is None:
return []
persons=Person.objects.filter(lastName__icontains=term)
return [p.lastName for p in persons]
@json_response
def cherchePrenom(request):
"""
search the database in order to make an autocompletion
on the Second Name field
"""
nom=request.GET.get('nom', None)
prenom=request.GET.get('prenom', None)
if nom is None or prenom is None:
return []
persons=Person.objects.filter(lastName=nom, firstName__icontains=prenom)
return [p.firstName for p in persons]
def find_encoding(f, encodings):
"""
finds the encoding of a text file
@param f file path
@param encodings a list of encodings to try
@return the first encoding which allowed one to read the file, or None
"""
for e in encodings:
try:
with open(f,encoding=e) as infile:
s=infile.read()
return e
except:
pass
return None
def getReader(csvfile, fields):
"""
makes a DictReader from the file f, with the given encoding.
This Dictrader must have one field with name matched by secondNamePattern
and another one with name matched by firstNamePattern
@param csvfile an open string stream
@param fields a dictionary:
Person's field => (compiled re pattern, found field name)
@return a Dictreader, and the updated dictionary
"""
reader=None
for delimiter in (";", ",", "\t",):
csvfile.seek(0)
reader = csv.DictReader(csvfile, delimiter=delimiter)
nbfound=0
for f in fields:
found=[fn for fn in reader.fieldnames if fields[f][0].match(fn)]
if found:
fields[f][1]=found[0]
nbfound+=1
else:
fields[f][1]=""
if nbfound>=2: # fields were made visible
return reader, fields
return reader, fields
def addToDb(row, fields):
"""
adds a record to the database.
@param row a data row coming from a csv dict reader
@param fields a dictionary:
Person's field => (compiled re pattern, found field name)
@return 0 or 1 depending on the success
"""
if not fields["firstName"][1] or not fields["lastName"][1]:
return 0
persons=Person.objects.filter(
firstName=row[fields["firstName"][1]],
lastName=row[fields["lastName"][1]],
)
if not persons:
p=Person(
firstName=row[fields["firstName"][1]],
lastName=row[fields["lastName"][1]],
)
else:
p=persons[0]
for k in fields: # update other fields if any
if k in ("firstName","lastName"):
continue
if fields[k][1]:
setattr(p,k,row[fields[k][1]])
p.save()
return 1
def importeCSV(request):
"""
import a CSV file
the first line may contain field names like:
"#?nom", "pr[eé]nom", "niveau", "classe"
"""
encodingList=[
"utf-8",
"latin1",
"dos",
"cp437",
]
# dictionary Person's field => [compiled re pattern, found field name]
fields={
"firstName": [re.compile(r"^pr[eé]nom$", re.I), ""],
"lastName": [re.compile(r"^#?nom$", re.I), ""],
"level": [re.compile(r"^niveau$", re.I), ""],
"className": [re.compile(r"^classe$", re.I), ""],
}
infile=os.path.join(BASE_DIR, "test.csv")
encoding=find_encoding(infile, encodingList)
written=0
with open(infile, encoding=encoding) as csvfile:
reader, fields = getReader(csvfile, fields)
for r in reader:
written += addToDb(r, fields)
return render(request, "pose/importCSV.html", {
"fname": infile,
"written": written,
})
#!/bin/sh
# create or update a database for photodb
# check the existence of a previous database
db=/var/lib/photodb/db/names.db
version=1.4
if [ -f $db ] && [ $(echo "PRAGMA integrity_check;"| sqlite3 $db) = "ok" ]; then
foundVersion=$(echo "select version from version" | sqlite3 $db)
echo "The version number of $db is $version"
else
echo "$db must be created"
mkdir -p $(dirname $db)
sqlite3 $db <<EOF
CREATE TABLE person (surname text, givenname text, photo text, date text);
CREATE TABLE version (version text);
INSERT INTO version VALUES ('$version');
EOF
fi
#!/usr/bin/python3
"""
A small web service based on cherrypy and opencv, which allow one
to normalize photos of faces
"""
import os, sys, cherrypy, sqlite3, re, uuid, base64
from io import BytesIO
from datetime import datetime
thisdir=os.path.dirname(__file__)
sys.path.insert(0, thisdir)
staticdir=os.path.join(thisdir,"static")
db=os.path.join(thisdir,'db','names.db')
jpgPrefix=b'data:image/jpeg;base64,'
from autoretouche import jpgPrefix, FaceImage
def protect(s):
"""
prepare a name to be compatible with every file system
@ a string with no spaces, no diacritics, etc.
"""
return re.sub(r'[^A-Za-z0-9_\-]','_',s)
def nommage(nom, prenom):
"""
return a unique file name based on two strings
@param nom surname
@param prenom given name
@return a unique filename
"""
result=protect(nom)+'_'+protect(prenom)
result=result[:20]+'_'+str(uuid.uuid1())+'.jpg'
return result
def staticFile(path):
"""
@return the content of a file in the static/ directory
"""
return open(os.path.join(staticdir, path)).read()
def timestamp():
"""
@return a UTC time stamp
"""
return datetime.utcnow().isoformat(sep=" ", timespec="seconds")
class Retouche(object):
@cherrypy.expose
def index(self):
return staticFile("portrait.html")
@cherrypy.expose
def test(self):
return staticFile("test.html")
@cherrypy.expose
@cherrypy.tools.json_out()
def chercheNom(self, term=None):
"""
search the database in order to make an autocompletion
on the First Name field
"""
if term is None:
return []
result=[]
c = sqlite3.connect(db).cursor()
for row in c.execute("SELECT surname FROM person where surname like '%{}%'".format(term)):
result.append(row[0])
return result
@cherrypy.expose
@cherrypy.tools.json_out()
def cherchePrenom(self, **kw):
"""
search the database in order to make an autocompletion
on the Second Name field
"""
if 'nom' not in kw or 'prenom' not in kw:
return []
result=[]
c = sqlite3.connect(db).cursor()
for row in c.execute("SELECT givenname FROM person WHERE surname = '{nom}' and givenname LIKE '{prenom}%'".format(**kw)):
result.append(row[0])
return result
@cherrypy.expose
@cherrypy.tools.json_out()
def envoi(self, **kw):
"""
callback page to deal with first and second name, plus an image
@param kw dictionary of named parameters, with mandatory keys:
nom, prenom, photo
@return dictionary with those keys: status and message; when
status is not "ko", two other keys are given: fname and base64
"""
keys=('nom','prenom','photo')
# give an error message when one parameter is missing or empty
missingKeys=[k for k in keys if k not in kw]
missingVals=[k for k in kw if not kw[k]]
if missingKeys + missingVals:
return {
"status": "ko",
"message": "appel erroné, paramètres incorrects: {l}".format(l=",".join(missingKeys + missingVals)),
}
fi = FaceImage(kw['photo'].encode("utf-8"))
#### default return components, when no face is detected ####
status="malretouche"
fname=""
base64=fi.toDataUrl
message="""<p>Le système détecte mal le visage à recadrer.</p>
<p>Veuillez refaire la photo.</p>
"""
if fi.ok:
# a face was detected, good!
fname=nommage(kw['nom'],kw['prenom'])
fi.saveAs(os.path.join(thisdir,'photos',fname))
conn=sqlite3.connect(db)
c = conn.cursor()
rows=list(c.execute("""
SELECT photo FROM person
WHERE surname = '{nom}' and givenname = '{prenom}'
""".format(**kw)))
if not rows:
# the user does not exist so far: create a new entry
status="nouveau"
message="""
<p>Nouvel enregistrement créé pour {nom} {prenom}</p>
""".format(**kw)
c.execute("""
INSERT INTO person (surname, givenname, photo, date)
VALUES ('{nom}','{prenom}','{fname}','{date}')
""".format(fname=fname,date=timestamp(),**kw))
conn.commit()
else:
photo=rows[0][0]
# the user already exists, make an update
status="ok"
message="""
<p>Enregistrement de la photo effectué pour {nom} {prenom}</p>
""".format(**kw)
if photo: # erase an earlier photo file
moremessage="<p>L'ancienne photo n'existait pas : erreur ?</p>"
try:
os.unlink(os.path.join(thisdir,'photos',photo))
moremessage="<p>L'ancienne photo a été effacée</p>"
except:
pass
message+=moremessage
# the update is made in either case, even if there was no photo
c.execute("""
UPDATE person SET photo='{fname}', date='{date}'
WHERE surname = '{nom}' and givenname = '{prenom}'
""".format(fname=fname,date=timestamp(),**kw))
conn.commit()
# in any case, return status, fname, base64 message
return {
"statut": "ok",
"fname": fname,
"base64": fi.toDataUrl,
"message": message,
}
@cherrypy.expose
@cherrypy.tools.json_out()
def encadre(self, **kw):
"""
callback page to get the rectangle of a recognized face, if any
@param kw the keywords with keys "photo", "nom", "prenom"
which yields a dataURL encoded JPG image
"""
fi = FaceImage(kw['photo'].encode("utf-8"))
c = sqlite3.connect(db).cursor()
rows=list(c.execute("SELECT photo FROM person where surname = '{nom}' and givenname = '{prenom}'".format(**kw)))
message=""
oldimage=""
if not rows:
message="Inconnu(e) dans la base"
cssclass="red"
else:
message="Trouvé(e) dans la base"
cssclass="green"
photo=rows[0][0]
if photo:
photo=open(os.path.join(thisdir,"photos",photo),'rb').read()
photo=jpgPrefix+base64.b64encode(photo)
oldimage=photo.decode("ascii")
message="Trouvé(e) avec la photo"
cssclass="orange"
return {
"status": fi.ok,
"rect": fi.cropRect,
"message": message,
"oldimage": oldimage,
"cssclass": cssclass,
}
@cherrypy.expose
@cherrypy.tools.json_out()
def retouche(self, data=None):
"""
@param data should be an url-encoded JPG image
(magic bytes: 'data:image/jpeg;base64,')
@return a JSON response, with status => "OK" when a face was recognized
then imgdata => an url-encoded JPG image. If the status is something
else, an anonymous image is loaded in imgdata.
"""
if data is None:
with open("nobody.jpg", "rb") as image_file:
encoded_string = base64.b64encode(image_file.read())
data=jpgPrefix+encoded_string
else:
data=data.encode("utf-8") # to bytes
fi = FaceImage(data)
status = "OK" if fi.ok else "Face auto-detection failed"
return {
"status": status,
"imgdata": fi.toDataUrl,
}
if __name__=="__main__":
print("service dans quelques secondes à http://localhost:8901")
cherrypy.quickstart(Retouche(),'/','cherryApp.conf')