# tst_compression.py: tests for the netCDF4 zlib, shuffle and fletcher32 variable filters.
from numpy.random.mtrand import uniform
# netCDF4 classes and helpers under test
from netCDF4 import Dataset
from netCDF4.utils import _quantize
# numpy test helper and standard-library modules
from numpy.testing import assert_almost_equal
import os, tempfile, unittest

# Length of the 1-d test variable and of the first axis of the 2-d variable.
ndim = 100000
# Length of the second axis of the 2-d test variable.
ndim2 = 100
# Chunk shape requested for the 2-d variable: 10 rows, each spanning the
# whole second axis.
chunk1 = 10
chunk2 = ndim2
# One scratch file per filter configuration written by the test case.
nfiles = 7
# delete=False keeps each file on disk after the temporary handle is dropped,
# so only its name is retained here; the test case removes the files itself.
files = [tempfile.NamedTemporaryFile(suffix='.nc', delete=False).name for _ in range(nfiles)]
# Uniform random samples used as the payload of the 1-d and 2-d variables.
array = uniform(size=(ndim,))
array2 = uniform(size=(ndim,ndim2))
# Value passed as least_significant_digit for the lossy (quantized) cases.
lsd = 3

def write_netcdf(filename,zlib,least_significant_digit,data,dtype='f8',shuffle=False,contiguous=False,
                 chunksizes=None,complevel=6,fletcher32=False):
    """Write `data` to a 1-d variable named 'data' in a new file, then read it back.

    The file is created with a single dimension 'n' of length `ndim`.  The
    arguments `dtype`, `zlib`, `least_significant_digit`, `shuffle`,
    `contiguous`, `complevel`, `fletcher32` and `chunksizes` are forwarded
    unchanged to `Dataset.createVariable`.

    After writing, the file is re-opened read-only and the variable is read
    in full; the values read are discarded.  Returns None.
    """
    nc = Dataset(filename,'w')
    try:
        nc.createDimension('n', ndim)
        # ('n',) is a real 1-tuple; the bare ('n') is just the string 'n'.
        var = nc.createVariable(
            'data', dtype, ('n',),
            zlib=zlib, least_significant_digit=least_significant_digit,
            shuffle=shuffle, contiguous=contiguous, complevel=complevel,
            fletcher32=fletcher32, chunksizes=chunksizes)
        var[:] = data
    finally:
        # Close even when creation or the write fails, so the handle is not leaked.
        nc.close()
    # Re-open and read the whole variable back; only the act of reading
    # matters here, the result is not used.
    nc = Dataset(filename)
    try:
        nc.variables['data'][:]
    finally:
        nc.close()

def write_netcdf2(filename,zlib,least_significant_digit,data,dtype='f8',shuffle=False,contiguous=False,
                 chunksizes=None,complevel=6,fletcher32=False):
    """Write `data` to a 2-d variable named 'data2' in a new file, then read it back.

    The file is created with dimensions 'n' (length `ndim`) and 'n2' (length
    `ndim2`).  The arguments `dtype`, `zlib`, `least_significant_digit`,
    `shuffle`, `contiguous`, `complevel`, `fletcher32` and `chunksizes` are
    forwarded unchanged to `Dataset.createVariable`.

    After writing, the file is re-opened read-only and the variable is read
    in full; the values read are discarded.  Returns None.
    """
    nc = Dataset(filename,'w')
    try:
        nc.createDimension('n', ndim)
        nc.createDimension('n2', ndim2)
        var = nc.createVariable(
            'data2', dtype, ('n','n2'),
            zlib=zlib, least_significant_digit=least_significant_digit,
            shuffle=shuffle, contiguous=contiguous, complevel=complevel,
            fletcher32=fletcher32, chunksizes=chunksizes)
        var[:] = data
    finally:
        # Close even when creation or the write fails, so the handle is not leaked.
        nc.close()
    # Re-open and read the whole variable back; only the act of reading
    # matters here, the result is not used.
    nc = Dataset(filename)
    try:
        nc.variables['data2'][:]
    finally:
        nc.close()

class CompressionTestCase(unittest.TestCase):
    """Round-trip test of the zlib, shuffle and fletcher32 variable filters.

    setUp writes the module-level random data to seven files, one per filter
    configuration.  runTest then checks the values read back, the filter
    settings reported by each variable, and the file sizes relative to the
    uncompressed file.
    """

    def setUp(self):
        """Create one netCDF file per filter configuration."""
        self.files = files
        # no compression
        write_netcdf(self.files[0],False,None,array)
        # compressed, lossless, no shuffle.
        write_netcdf(self.files[1],True,None,array)
        # compressed, lossless, with shuffle.
        write_netcdf(self.files[2],True,None,array,shuffle=True)
        # compressed, lossy, no shuffle.
        write_netcdf(self.files[3],True,lsd,array)
        # compressed, lossy, with shuffle.
        write_netcdf(self.files[4],True,lsd,array,shuffle=True)
        # compressed, lossy, with shuffle and fletcher32 checksum.
        write_netcdf(self.files[5],True,lsd,array,shuffle=True,fletcher32=True)
        # 2-d compressed, lossy, with shuffle and fletcher32 checksum and
        # chunksizes.
        write_netcdf2(self.files[6],True,lsd,array2,shuffle=True,fletcher32=True,chunksizes=(chunk1,chunk2))

    def tearDown(self):
        """Remove the temporary files."""
        for path in self.files:
            os.remove(path)

    def _read_back(self, path, varname='data'):
        """Return (values, filters, chunking) for variable `varname` in `path`.

        The dataset is closed before returning, also when reading raises, so
        no file handle is left open while the assertions in runTest run.
        """
        nc = Dataset(path)
        try:
            var = nc.variables[varname]
            return var[:], var.filters(), var.chunking()
        finally:
            nc.close()

    def runTest(self):
        """testing zlib and shuffle compression filters"""
        # unittest assertion methods are used instead of bare ``assert`` so
        # the checks still run under ``python -O``.
        uncompressed_size = os.stat(self.files[0]).st_size
        # check compressed data.
        values, filters, _ = self._read_back(self.files[1])
        size = os.stat(self.files[1]).st_size
        assert_almost_equal(array,values)
        self.assertEqual(filters, {'zlib':True,'shuffle':False,'complevel':6,'fletcher32':False})
        self.assertTrue(size < 0.95*uncompressed_size)
        # check compression with shuffle
        values, filters, _ = self._read_back(self.files[2])
        size = os.stat(self.files[2]).st_size
        assert_almost_equal(array,values)
        self.assertEqual(filters, {'zlib':True,'shuffle':True,'complevel':6,'fletcher32':False})
        self.assertTrue(size < 0.85*uncompressed_size)
        # check lossy compression without shuffle
        values, _, _ = self._read_back(self.files[3])
        size = os.stat(self.files[3]).st_size
        # Expected values: the input quantized with the same lsd that was
        # passed as least_significant_digit when writing.
        checkarray = _quantize(array,lsd)
        assert_almost_equal(checkarray,values)
        self.assertTrue(size < 0.27*uncompressed_size)
        # check lossy compression with shuffle
        values, _, _ = self._read_back(self.files[4])
        size = os.stat(self.files[4]).st_size
        assert_almost_equal(checkarray,values)
        self.assertTrue(size < 0.20*uncompressed_size)
        size_save = size
        # check lossy compression with shuffle and fletcher32 checksum.
        values, filters, _ = self._read_back(self.files[5])
        size = os.stat(self.files[5]).st_size
        assert_almost_equal(checkarray,values)
        self.assertEqual(filters, {'zlib':True,'shuffle':True,'complevel':6,'fletcher32':True})
        self.assertTrue(size < 0.20*uncompressed_size)
        # should be slightly larger than without fletcher32
        self.assertTrue(size > size_save)
        # check chunksizes
        values, filters, chunking = self._read_back(self.files[6], 'data2')
        checkarray2 = _quantize(array2,lsd)
        assert_almost_equal(checkarray2,values)
        self.assertEqual(filters, {'zlib':True,'shuffle':True,'complevel':6,'fletcher32':True})
        self.assertEqual(chunking, [chunk1,chunk2])

# Run the test case through unittest's command-line runner when this file is
# executed directly as a script.
if __name__ == '__main__':
    unittest.main()