157 lines
5.4 KiB
Python
157 lines
5.4 KiB
Python
"""Test the pyzor.engines.mysql module."""
|
|
|
|
import unittest
|
|
import threading
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
import pyzor.engines
|
|
import pyzor.engines.mysql
|
|
import pyzor.engines.common
|
|
|
|
class MockTimer():
|
|
def __init__(self, *args, **kwargs):
|
|
pass
|
|
def start(self):
|
|
pass
|
|
def setDaemon(self, daemon):
|
|
pass
|
|
|
|
def make_MockMySQL(result, queries):
|
|
class MockCursor():
|
|
def __init__(self):
|
|
pass
|
|
|
|
def fetchone(self):
|
|
return result
|
|
def fetchall(self):
|
|
return [result]
|
|
def execute(self, query, args=None):
|
|
queries.append((query, args))
|
|
def close(self):
|
|
pass
|
|
class MockDB():
|
|
def cursor(self):
|
|
return MockCursor()
|
|
def close(self):
|
|
pass
|
|
def commit(self):
|
|
pass
|
|
def autocommit(self, value):
|
|
pass
|
|
class MockMysql():
|
|
@staticmethod
|
|
def connect(*args, **kwargs):
|
|
return MockDB()
|
|
class Error(Exception):
|
|
pass
|
|
return MockMysql
|
|
|
|
|
|
class MySQLTest(unittest.TestCase):
|
|
"""Test the GdbmDBHandle class"""
|
|
|
|
max_age = 60 * 60 * 24 * 30 * 4
|
|
r_count = 24
|
|
wl_count = 42
|
|
entered = datetime.now() - timedelta(days=10)
|
|
updated = datetime.now() - timedelta(days=2)
|
|
wl_entered = datetime.now() - timedelta(days=20)
|
|
wl_updated = datetime.now() - timedelta(days=3)
|
|
|
|
def setUp(self):
|
|
unittest.TestCase.setUp(self)
|
|
self.real_timer = threading.Timer
|
|
threading.Timer = MockTimer
|
|
|
|
self.record = pyzor.engines.common.Record(self.r_count, self.wl_count,
|
|
self.entered, self.updated,
|
|
self.wl_entered, self.wl_updated)
|
|
|
|
self.response = self.record_unpack()
|
|
self.queries = []
|
|
|
|
mock_MySQL = make_MockMySQL(self.response, self.queries)
|
|
self.real_mysql = pyzor.engines.mysql.MySQLdb
|
|
pyzor.engines.mysql.MySQLdb = mock_MySQL
|
|
|
|
|
|
def tearDown(self):
|
|
unittest.TestCase.tearDown(self)
|
|
threading.Timer = self.real_timer
|
|
pyzor.engines.mysql.MySQLdb = self.real_mysql
|
|
|
|
def record_unpack(self, record=None):
|
|
if not record:
|
|
record = self.record
|
|
return (record.r_count, record.wl_count,
|
|
record.r_entered, record.r_updated,
|
|
record.wl_entered, record.wl_updated)
|
|
|
|
def test_reconnect(self):
|
|
"""Test MySQLDBHandle.__init__"""
|
|
expected = "DELETE FROM testtable WHERE r_updated<%s"
|
|
|
|
pyzor.engines.mysql.MySQLDBHandle("testhost,testuser,testpass,testdb,testtable",
|
|
None, max_age=self.max_age)
|
|
|
|
self.assertEqual(self.queries[0][0], expected)
|
|
|
|
def test_no_reorganize(self):
|
|
pyzor.engines.mysql.MySQLDBHandle("testhost,testuser,testpass,testdb,testtable",
|
|
None, max_age=None)
|
|
self.assertFalse(self.queries)
|
|
|
|
def test_set_item(self):
|
|
"""Test MySQLDBHandle.__setitem__"""
|
|
digest = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
expected = ("INSERT INTO testtable (digest, r_count, wl_count, "
|
|
"r_entered, r_updated, wl_entered, wl_updated) "
|
|
"VALUES (%s, %s, %s, %s, %s, %s, %s) ON "
|
|
"DUPLICATE KEY UPDATE r_count=%s, wl_count=%s, "
|
|
"r_entered=%s, r_updated=%s, wl_entered=%s, "
|
|
"wl_updated=%s",
|
|
(digest, self.r_count, self.wl_count, self.entered,
|
|
self.updated, self.wl_entered, self.wl_updated,
|
|
self.r_count, self.wl_count, self.entered,
|
|
self.updated, self.wl_entered, self.wl_updated))
|
|
handle = pyzor.engines.mysql.MySQLDBHandle("testhost,testuser,testpass,testdb,testtable",
|
|
None, max_age=self.max_age)
|
|
|
|
handle[digest] = self.record
|
|
self.assertEqual(self.queries[1], expected)
|
|
|
|
def test_get_item(self):
|
|
"""Test MySQLDBHandle.__getitem__"""
|
|
digest = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
expected = ("SELECT r_count, wl_count, r_entered, r_updated, "
|
|
"wl_entered, wl_updated FROM testtable WHERE digest=%s",
|
|
(digest,))
|
|
handle = pyzor.engines.mysql.MySQLDBHandle("testhost,testuser,testpass,testdb,testtable",
|
|
None, max_age=self.max_age)
|
|
|
|
result = handle[digest]
|
|
self.assertEqual(self.queries[1], expected)
|
|
self.assertEqual(self.record_unpack(result), self.record_unpack())
|
|
|
|
def test_del_item(self):
|
|
"""Test MySQLDBHandle.__detitem__"""
|
|
digest = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
expected = ("DELETE FROM testtable WHERE digest=%s", (digest,))
|
|
|
|
handle = pyzor.engines.mysql.MySQLDBHandle("testhost,testuser,testpass,testdb,testtable",
|
|
None, max_age=self.max_age)
|
|
del handle[digest]
|
|
self.assertEqual(self.queries[1], expected)
|
|
|
|
|
|
def suite():
|
|
"""Gather all the tests from this module in a test suite."""
|
|
test_suite = unittest.TestSuite()
|
|
test_suite.addTest(unittest.makeSuite(MySQLTest))
|
|
return test_suite
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
|