Skip to content

Commit 15e7f77

Browse files
committed
Add basic statistcs command.
1 parent bbd6810 commit 15e7f77

File tree

6 files changed

+208
-7
lines changed

6 files changed

+208
-7
lines changed

partitionmanager/cli.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
toSqlUrl,
2323
TableInformationException,
2424
)
25+
from partitionmanager.stats import get_statistics
2526
from partitionmanager.sql import SubprocessDatabaseCommand, IntegratedDatabaseCommand
2627

2728
parser = argparse.ArgumentParser(
@@ -63,11 +64,12 @@ def from_argparse(self, args):
6364
self.dbcmd = IntegratedDatabaseCommand(args.dburl)
6465
else:
6566
self.dbcmd = SubprocessDatabaseCommand(args.mariadb)
66-
if args.days:
67+
if "days" in args and args.days:
6768
self.partition_duration = timedelta(days=args.days)
6869
if self.partition_duration <= timedelta():
6970
raise ValueError("Negative lifespan is not allowed")
70-
self.noop = args.noop
71+
if "noop" in args:
72+
self.noop = args.noop
7173

7274
def from_yaml_file(self, file):
7375
data = yaml.safe_load(file)
@@ -111,7 +113,7 @@ def partition_cmd(args):
111113
assert_table_is_compatible(conf.dbcmd, table)
112114
except TableInformationException as tie:
113115
logging.error(f"Cannot proceed: {tie}")
114-
return {}
116+
return dict()
115117

116118
all_results = dict()
117119
for table in conf.tables:
@@ -174,6 +176,38 @@ def partition_cmd(args):
174176
partition_parser.set_defaults(func=partition_cmd)
175177

176178

179+
def stats_cmd(args):
180+
conf = Config()
181+
conf.from_argparse(args)
182+
if args.config:
183+
conf.from_yaml_file(args.config)
184+
185+
# Preflight
186+
try:
187+
for table in conf.tables:
188+
assert_table_is_compatible(conf.dbcmd, table)
189+
except TableInformationException as tie:
190+
logging.error(f"Cannot proceed: {tie}")
191+
return dict()
192+
193+
all_results = dict()
194+
for table in conf.tables:
195+
map_data = get_partition_map(conf.dbcmd, table)
196+
statistics = get_statistics(map_data["partitions"], conf.curtime, table)
197+
all_results[table.name] = statistics
198+
199+
return all_results
200+
201+
202+
stats_parser = subparsers.add_parser("stats", help="get stats for partitions")
203+
stats_group = stats_parser.add_mutually_exclusive_group()
204+
stats_group.add_argument(
205+
"--config", "-c", type=argparse.FileType("r"), help="Configuration YAML"
206+
)
207+
stats_group.add_argument("--table", "-t", type=SqlInput, nargs="+", help="table names")
208+
stats_parser.set_defaults(func=stats_cmd)
209+
210+
177211
def main():
178212
"""
179213
Start here.

partitionmanager/cli_test.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import unittest
33
from datetime import datetime, timezone
44
from pathlib import Path
5-
from .cli import parser, partition_cmd
5+
from .cli import parser, partition_cmd, stats_cmd
66

77
fake_exec = Path(__file__).absolute().parent.parent / "test_tools/fake_mariadb.sh"
88
nonexistant_exec = fake_exec.parent / "not_real"
@@ -174,3 +174,15 @@ def test_partition_duration_seven_days(self):
174174
"""
175175
)
176176
self.assertSequenceEqual(list(o), ["partitioned_last_week"])
177+
178+
179+
class TestStatsCmd(unittest.TestCase):
180+
def test_stats(self):
181+
args = parser.parse_args(
182+
["--mariadb", str(fake_exec), "stats", "--table", "partitioned_yesterday"]
183+
)
184+
r = stats_cmd(args)
185+
self.assertEqual(r["partitioned_yesterday"]["partitions"], 2)
186+
self.assertLess(r["partitioned_yesterday"]["time_since_last_partition"].days, 2)
187+
self.assertNotIn("mean_partition_delta", r["partitioned_yesterday"])
188+
self.assertNotIn("max_partition_delta", r["partitioned_yesterday"])

partitionmanager/stats.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import logging
2+
3+
from datetime import timedelta
4+
from itertools import tee
5+
from .types import MaxValuePartition, Partition, UnexpectedPartitionException
6+
7+
8+
def pairwise(iterable):
9+
"""
10+
iterable -> (s0,s1), (s1,s2), (s2, s3), ...
11+
"""
12+
a, b = tee(iterable)
13+
next(b, None)
14+
return zip(a, b)
15+
16+
17+
def get_statistics(partitions, current_timestamp, table):
18+
results = {"partitions": len(partitions)}
19+
20+
if not partitions:
21+
return results
22+
23+
head_part = partitions[0]
24+
tail_part = partitions[-1]
25+
26+
for p in partitions:
27+
if not isinstance(p, Partition):
28+
logging.warning(
29+
f"{table} get_statistics called with a partition list "
30+
+ f"that included a non-Partition entry: {p}"
31+
)
32+
raise UnexpectedPartitionException(p)
33+
34+
if not isinstance(tail_part, MaxValuePartition):
35+
logging.warning(
36+
f"{table} get_statistics called with a partition list tail "
37+
+ f"that wasn't a MaxValuePartition: {p}"
38+
)
39+
raise UnexpectedPartitionException(tail_part)
40+
41+
if tail_part.timestamp():
42+
results["time_since_last_partition"] = current_timestamp - tail_part.timestamp()
43+
44+
if head_part == tail_part:
45+
return results
46+
47+
if head_part.timestamp() and tail_part.timestamp():
48+
results["mean_partition_delta"] = (
49+
tail_part.timestamp() - head_part.timestamp()
50+
) / (len(partitions) - 1)
51+
52+
max_d = timedelta()
53+
for a, b in pairwise(partitions):
54+
if not a.timestamp() or not b.timestamp():
55+
logging.debug(f"{table} had partitions that aren't comparable: {a} and {b}")
56+
continue
57+
d = b.timestamp() - a.timestamp()
58+
if d > max_d:
59+
max_d = d
60+
61+
if max_d > timedelta():
62+
results["max_partition_delta"] = max_d
63+
64+
return results

partitionmanager/stats_test.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import unittest
2+
from datetime import datetime, timedelta, timezone
3+
from .stats import get_statistics
4+
from .types import Table, MaxValuePartition, PositionPartition
5+
6+
7+
def mkPPart(name, *pos):
8+
p = PositionPartition(name)
9+
for x in pos:
10+
p.add_position(x)
11+
return p
12+
13+
14+
ts = datetime(1949, 1, 12, tzinfo=timezone.utc)
15+
16+
17+
class TestStatistics(unittest.TestCase):
18+
def test_statistics_no_partitions(self):
19+
s = get_statistics(list(), ts, Table("no_parts"))
20+
self.assertEqual(s, {"partitions": 0})
21+
22+
def test_statistics_single_unnamed_partition(self):
23+
s = get_statistics([MaxValuePartition("p_start", 1)], ts, Table("single_part"))
24+
self.assertEqual(s, {"partitions": 1})
25+
26+
def test_statistics_single_partition(self):
27+
s = get_statistics(
28+
[MaxValuePartition("p_19480113", 1)], ts, Table("single_part")
29+
)
30+
self.assertEqual(
31+
s, {"partitions": 1, "time_since_last_partition": timedelta(days=365)}
32+
)
33+
34+
def test_statistics_two_partitions(self):
35+
s = get_statistics(
36+
[mkPPart("p_19480101", 42), MaxValuePartition("p_19490101", 1)],
37+
ts,
38+
Table("two_parts"),
39+
)
40+
self.assertEqual(
41+
s,
42+
{
43+
"partitions": 2,
44+
"time_since_last_partition": timedelta(days=11),
45+
"mean_partition_delta": timedelta(days=366),
46+
"max_partition_delta": timedelta(days=366),
47+
},
48+
)
49+
50+
def test_statistics_weekly_partitions_year(self):
51+
parts = list()
52+
base = datetime(2020, 5, 20, tzinfo=timezone.utc)
53+
for w in range(0, 52):
54+
partName = f"p_{base + timedelta(weeks=w):%Y%m%d}"
55+
parts.append(mkPPart(partName, w * 1024))
56+
parts.append(MaxValuePartition(f"p_{base + timedelta(weeks=52):%Y%m%d}", 1))
57+
58+
s = get_statistics(
59+
parts, base + timedelta(weeks=54), Table("weekly_partitions_year_retention")
60+
)
61+
self.assertEqual(
62+
s,
63+
{
64+
"partitions": 53,
65+
"time_since_last_partition": timedelta(days=14),
66+
"mean_partition_delta": timedelta(days=7),
67+
"max_partition_delta": timedelta(days=7),
68+
},
69+
)

partitionmanager/types.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import abc
22
import argparse
33
import re
4-
from datetime import timedelta
4+
from datetime import datetime, timedelta, timezone
55
from urllib.parse import urlparse
66

77

@@ -86,6 +86,17 @@ def num_columns(self):
8686
Return the number of columns this partition represents
8787
"""
8888

89+
def timestamp(self):
90+
"""
91+
Returns a datetime object representing this partition's
92+
date, if the partition is of the form "p_YYYYMMDD", otherwise
93+
returns None
94+
"""
95+
try:
96+
return datetime.strptime(self.name, "p_%Y%m%d").replace(tzinfo=timezone.utc)
97+
except ValueError:
98+
return None
99+
89100
def __repr__(self):
90101
return f"{type(self).__name__}<{str(self)}>"
91102

partitionmanager/types_test.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import argparse
22
import unittest
3-
from datetime import timedelta
4-
from .types import Table, toSqlUrl, SqlInput, retention_from_dict
3+
from datetime import datetime, timedelta, timezone
4+
from .types import PositionPartition, retention_from_dict, SqlInput, Table, toSqlUrl
55

66

77
class TestTypes(unittest.TestCase):
@@ -72,3 +72,14 @@ def test_table(self):
7272

7373
r = retention_from_dict({"days": 30})
7474
self.assertEqual(timedelta(days=30), r)
75+
76+
77+
class TestPartition(unittest.TestCase):
78+
def test_partition_timestamps(self):
79+
self.assertIsNone(PositionPartition("").timestamp())
80+
self.assertIsNone(PositionPartition("not_a_date").timestamp())
81+
self.assertIsNone(PositionPartition("p_202012310130").timestamp())
82+
self.assertEqual(
83+
PositionPartition("p_20201231").timestamp(),
84+
datetime(2020, 12, 31, tzinfo=timezone.utc),
85+
)

0 commit comments

Comments
 (0)