|
| 1 | +from django.test import TestCase |
| 2 | +from django.db.models.query import CollectedObjects |
| 3 | +from django.db.models.query_utils import CyclicDependency |
| 4 | +from django.db.models.loading import cache |
| 5 | +import django.db.models.sql |
| 6 | + |
| 7 | +from models import A, B, C, D, E, F |
| 8 | + |
| 9 | +test_last_cleared_field = '' |
| 10 | + |
| 11 | +def clear_rel_obj_caches(models): |
| 12 | + for m in models: |
| 13 | + if hasattr(m._meta, '_related_objects_cache'): |
| 14 | + del m._meta._related_objects_cache |
| 15 | + |
| 16 | +class LoggingUpdateQuery(django.db.models.sql.UpdateQuery): |
| 17 | + def clear_related(self, related_field, pk_list, using): |
| 18 | + global test_last_cleared_field |
| 19 | + test_last_cleared_field = related_field.name |
| 20 | + return super(LoggingUpdateQuery, self).clear_related(related_field, pk_list, using) |
| 21 | + |
| 22 | +class DeleteTestCase(TestCase): |
| 23 | + ### Tests for models A,B,C,D ### |
| 24 | + def test_collected_objects_data_structure(self): |
| 25 | + ## Test the CollectedObjects data structure directly |
| 26 | + |
| 27 | + g = CollectedObjects() |
| 28 | + self.assertFalse(g.add("key1", 1, "item1", None)) |
| 29 | + self.assertEqual(g["key1"], {1: 'item1'}) |
| 30 | + self.assertFalse(g.add("key2", 1, "item1", "key1")) |
| 31 | + self.assertFalse(g.add("key2", 2, "item2", "key1")) |
| 32 | + self.assertEqual(g["key2"], {1: 'item1', 2: 'item2'}) |
| 33 | + self.assertFalse(g.add("key3", 1, "item1", "key1")) |
| 34 | + self.assertTrue(g.add("key3", 1, "item1", "key2")) |
| 35 | + self.assertEqual(g.ordered_keys(), ['key3', 'key2', 'key1']) |
| 36 | + self.assertTrue(g.add("key2", 1, "item1", "key3")) |
| 37 | + self.assertRaises(CyclicDependency, |
| 38 | + g.ordered_keys) |
| 39 | + |
| 40 | + def test_collected_objects_by_model_delete(self): |
| 41 | + ## Test the usage of CollectedObjects by Model.delete() |
| 42 | + |
| 43 | + # Due to the way that transactions work in the test harness, |
| 44 | + # doing m.delete() here can work but fail in a real situation, |
| 45 | + # since it may delete all objects, but not in the right order. |
| 46 | + # So we manually check that the order of deletion is correct. |
| 47 | + |
| 48 | + # Also, it is possible that the order is correct 'accidentally', due |
| 49 | + # solely to order of imports etc. To check this, we set the order |
| 50 | + # that 'get_models()' will retrieve to a known 'nice' order, and |
| 51 | + # then try again with a known 'tricky' order. Slightly naughty |
| 52 | + # access to internals here :-) |
| 53 | + |
| 54 | + # If implementation changes, then the tests may need to be simplified: |
| 55 | + # - remove the lines that set the .keyOrder and clear the related |
| 56 | + # object caches |
| 57 | + # - remove the second set of tests (with a2, b2 etc) |
| 58 | + |
| 59 | + # Nice order |
| 60 | + cache.app_models['delete'].keyOrder = ['a', 'b', 'c', 'd'] |
| 61 | + clear_rel_obj_caches([A, B, C, D]) |
| 62 | + |
| 63 | + a1 = A() |
| 64 | + a1.save() |
| 65 | + b1 = B(a=a1) |
| 66 | + b1.save() |
| 67 | + c1 = C(b=b1) |
| 68 | + c1.save() |
| 69 | + d1 = D(c=c1, a=a1) |
| 70 | + d1.save() |
| 71 | + |
| 72 | + o = CollectedObjects() |
| 73 | + a1._collect_sub_objects(o) |
| 74 | + self.assertQuerysetEqual(o.keys(), |
| 75 | + ["<class 'modeltests.delete.models.D'>", |
| 76 | + "<class 'modeltests.delete.models.C'>", |
| 77 | + "<class 'modeltests.delete.models.B'>", |
| 78 | + "<class 'modeltests.delete.models.A'>"]) |
| 79 | + a1.delete() |
| 80 | + |
| 81 | + # Same again with a known bad order |
| 82 | + cache.app_models['delete'].keyOrder = ['d', 'c', 'b', 'a'] |
| 83 | + clear_rel_obj_caches([A, B, C, D]) |
| 84 | + |
| 85 | + a2 = A() |
| 86 | + a2.save() |
| 87 | + b2 = B(a=a2) |
| 88 | + b2.save() |
| 89 | + c2 = C(b=b2) |
| 90 | + c2.save() |
| 91 | + d2 = D(c=c2, a=a2) |
| 92 | + d2.save() |
| 93 | + |
| 94 | + o = CollectedObjects() |
| 95 | + a2._collect_sub_objects(o) |
| 96 | + self.assertQuerysetEqual(o.keys(), |
| 97 | + ["<class 'modeltests.delete.models.D'>", |
| 98 | + "<class 'modeltests.delete.models.C'>", |
| 99 | + "<class 'modeltests.delete.models.B'>", |
| 100 | + "<class 'modeltests.delete.models.A'>"]) |
| 101 | + a2.delete() |
| 102 | + |
| 103 | + ### Tests for models E,F - nullable related fields ### |
| 104 | + def test_nullable_related_fields_collected_objects(self): |
| 105 | + |
| 106 | + ## First, test the CollectedObjects data structure directly |
| 107 | + g = CollectedObjects() |
| 108 | + self.assertFalse(g.add("key1", 1, "item1", None)) |
| 109 | + self.assertFalse(g.add("key2", 1, "item1", "key1", nullable=True)) |
| 110 | + self.assertTrue(g.add("key1", 1, "item1", "key2")) |
| 111 | + self.assertEqual(g.ordered_keys(), ['key1', 'key2']) |
| 112 | + |
| 113 | + def test_nullable_related_fields_collected_objects_model_delete(self): |
| 114 | + ## Second, test the usage of CollectedObjects by Model.delete() |
| 115 | + |
| 116 | + e1 = E() |
| 117 | + e1.save() |
| 118 | + f1 = F(e=e1) |
| 119 | + f1.save() |
| 120 | + e1.f = f1 |
| 121 | + e1.save() |
| 122 | + |
| 123 | + # Since E.f is nullable, we should delete F first (after nulling out |
| 124 | + # the E.f field), then E. |
| 125 | + |
| 126 | + o = CollectedObjects() |
| 127 | + e1._collect_sub_objects(o) |
| 128 | + self.assertQuerysetEqual(o.keys(), |
| 129 | + ["<class 'modeltests.delete.models.F'>", |
| 130 | + "<class 'modeltests.delete.models.E'>"]) |
| 131 | + |
| 132 | + # temporarily replace the UpdateQuery class to verify that E.f |
| 133 | + # is actually nulled out first |
| 134 | + |
| 135 | + original_class = django.db.models.sql.UpdateQuery |
| 136 | + django.db.models.sql.UpdateQuery = LoggingUpdateQuery |
| 137 | + |
| 138 | + # this is ugly, but it works |
| 139 | + global test_last_cleared_field |
| 140 | + test_last_cleared_field = '' |
| 141 | + e1.delete() |
| 142 | + self.assertEqual(test_last_cleared_field, 'f') |
| 143 | + |
| 144 | + |
| 145 | + e2 = E() |
| 146 | + e2.save() |
| 147 | + f2 = F(e=e2) |
| 148 | + f2.save() |
| 149 | + e2.f = f2 |
| 150 | + e2.save() |
| 151 | + |
| 152 | + # Same deal as before, though we are starting from the other object. |
| 153 | + |
| 154 | + o = CollectedObjects() |
| 155 | + f2._collect_sub_objects(o) |
| 156 | + o.keys() |
| 157 | + ["<class 'modeltests.delete.models.F'>", "<class 'modeltests.delete.models.E'>"] |
| 158 | + |
| 159 | + test_last_cleared_field = '' |
| 160 | + f2.delete() |
| 161 | + self.assertEqual(test_last_cleared_field, 'f') |
| 162 | + |
| 163 | + # Put this back to normal |
| 164 | + django.db.models.sql.UpdateQuery = original_class |
| 165 | + |
| 166 | + # Restore the app cache to previous condition so that all |
| 167 | + # models are accounted for. |
| 168 | + cache.app_models['delete'].keyOrder = ['a', 'b', 'c', 'd', 'e', 'f'] |
| 169 | + clear_rel_obj_caches([A, B, C, D, E, F]) |
0 commit comments