154 lines
4.8 KiB
Python
154 lines
4.8 KiB
Python
"""Test the pyzor.engines.gdbm_ module."""
|
|
|
|
import gdbm
|
|
import unittest
|
|
import threading
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
import pyzor.engines
|
|
import pyzor.engines.gdbm_
|
|
import pyzor.engines.common
|
|
|
|
class MockTimer:
    """Inert object with the parts of the timer interface used here.

    The constructor accepts and ignores any arguments, and every method is
    a no-op that returns None.
    """

    def __init__(self, *args, **kwargs):
        """Accept and discard whatever arguments are supplied."""

    def start(self):
        """Do nothing; no callback is ever scheduled."""

    def setDaemon(self, daemon):
        """Do nothing; the *daemon* flag is ignored."""
|
|
|
|
class MockGdbm(dict):
    """Mock a gdbm database.

    A plain dict extended with the traversal methods (firstkey / nextkey)
    and the maintenance methods (sync / reorganize), the latter two being
    no-ops.
    """

    def firstkey(self):
        """Return the first key in iteration order, or None when empty."""
        # Pulling from the iterator avoids indexing keys(), which is a
        # non-subscriptable view on Python 3.
        return next(iter(self), None)

    def nextkey(self, key):
        """Return the key that follows *key*, or None at the end.

        The successor is located from *key* itself instead of a stored
        cursor. This keeps the traversal from skipping entries or indexing
        past the end, lets nextkey() be called without a prior firstkey(),
        and tolerates deletion of already-visited keys between calls.
        A *key* that is not present is treated as the end of the traversal.
        """
        keys = list(self)
        try:
            position = keys.index(key) + 1
        except ValueError:
            return None
        if position < len(keys):
            return keys[position]
        return None

    def sync(self):
        """Do nothing; the mock keeps everything in memory."""

    def reorganize(self):
        """Do nothing; the mock keeps everything in memory."""
|
|
|
|
class GdbmTest(unittest.TestCase):
    """Exercise GdbmDBHandle against an in-memory stand-in for gdbm."""

    # 60 * 60 * 24 * 30 * 4 -- 120 days if the value is read as seconds.
    max_age = 60 * 60 * 24 * 30 * 4
    r_count = 24
    wl_count = 42
    # Fixture timestamps; computed once, when the class body is executed.
    entered = datetime.now() - timedelta(days=10)
    updated = datetime.now() - timedelta(days=2)
    wl_entered = datetime.now() - timedelta(days=20)
    wl_updated = datetime.now() - timedelta(days=3)

    def setUp(self):
        """Patch threading.Timer and gdbm.open, then build the fixture record."""
        unittest.TestCase.setUp(self)

        # Keep the real objects around so tearDown can restore them.
        self.real_timer = threading.Timer
        self.real_open = gdbm.open

        self.db = MockGdbm()

        def fake_open(fn, mode):
            # Every open() call hands back the shared in-memory database.
            return self.db

        threading.Timer = MockTimer
        gdbm.open = fake_open

        record_args = (self.r_count, self.wl_count,
                       self.entered, self.updated,
                       self.wl_entered, self.wl_updated)
        self.record = pyzor.engines.common.Record(*record_args)

    def tearDown(self):
        """Undo the patches applied by setUp."""
        unittest.TestCase.tearDown(self)
        threading.Timer = self.real_timer
        gdbm.open = self.real_open

    def record_as_str(self, record=None):
        """Return *record* as UTF-8 encoded, comma-separated text.

        The fixture record is used when *record* is omitted or falsy.
        """
        record = record or self.record
        fields = (record.r_count, record.r_entered, record.r_updated,
                  record.wl_count, record.wl_entered, record.wl_updated)
        text = "1,%s,%s,%s,%s,%s,%s" % fields
        return text.encode("utf8")

    def _new_handle(self, max_age):
        """Build a GdbmDBHandle (backed by the mock db) with *max_age*."""
        return pyzor.engines.gdbm_.GdbmDBHandle(None, None, max_age=max_age)

    def test_set_item(self):
        """Test GdbmDBHandle.__setitem__"""
        key = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"

        handle = self._new_handle(self.max_age)
        handle[key] = self.record

        stored = self.db[key]
        self.assertEqual(stored, self.record_as_str().decode("utf8"))

    def test_get_item(self):
        """Test GdbmDBHandle.__getitem__"""
        key = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"

        handle = self._new_handle(self.max_age)
        self.db[key] = self.record_as_str()

        fetched = handle[key]

        self.assertEqual(self.record_as_str(fetched), self.record_as_str())

    def test_del_item(self):
        """Test GdbmDBHandle.__delitem__"""
        key = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
        handle = self._new_handle(self.max_age)
        self.db[key] = self.record_as_str()

        del handle[key]

        self.assertFalse(self.db.get(key))

    def test_reorganize_older(self):
        """Test GdbmDBHandle.start_reorganizing with older records"""
        key = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"

        # The record must be in place before the handle is constructed.
        self.db[key] = self.record_as_str()
        handle = self._new_handle(3600 * 24)

        self.assertFalse(self.db.get(key))

    def test_reorganize_older_no_max_age(self):
        """Test GdbmDBHandle.start_reorganizing with older records, but no
        max_age set.
        """
        key = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"

        self.db[key] = self.record_as_str()
        handle = self._new_handle(None)

        self.assertEqual(self.db[key], self.record_as_str())

    def test_reorganize_fresh(self):
        """Test GdbmDBHandle.start_reorganizing with newer records"""
        key = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"

        self.db[key] = self.record_as_str()
        handle = self._new_handle(3600 * 24 * 3)

        self.assertEqual(self.db[key], self.record_as_str())
|
|
|
|
|
|
def suite():
    """Gather all the tests from this module in a test suite.

    Returns a unittest.TestSuite containing every test method of GdbmTest.
    """
    test_suite = unittest.TestSuite()
    # unittest.makeSuite was deprecated in Python 3.11 and removed in 3.13;
    # a fresh TestLoader collects the same tests on old and new interpreters.
    loader = unittest.TestLoader()
    test_suite.addTest(loader.loadTestsFromTestCase(GdbmTest))
    return test_suite
|
|
|
|
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|