-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathschema.py
More file actions
139 lines (121 loc) · 4.88 KB
/
schema.py
File metadata and controls
139 lines (121 loc) · 4.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from dataclasses import dataclass, field
import random
from typing import List, Type, Optional
from gemini_python.column_types import Column, ALL_COLUMN_TYPES
from gemini_python.query_driver import QueryDriver
from gemini_python import CqlDto, GeminiConfiguration
from gemini_python.replication_strategy import ReplicationStrategy
@dataclass
class Table:
    """Represents Scylla table.

    Holds the table identity (keyspace and table name), its key and regular
    columns and an optional default TTL, and renders ``CREATE TABLE``
    statements from them.
    """

    # Table name, without the keyspace prefix.
    name: str
    # Name of the keyspace the table is created in.
    keyspace_name: str
    # Columns forming the partition key.
    partition_keys: List[Column]
    # Columns forming the clustering key; may be empty.
    clustering_keys: List[Column] = field(default_factory=list)
    # Regular (non-key) columns; may be empty.
    columns: List[Column] = field(default_factory=list)
    # Value for ``default_time_to_live``; a falsy value emits no TTL option.
    ttl: int = 0

    @property
    def all_columns(self) -> List[Column]:
        """Return partition keys, clustering keys and regular columns, in that order."""
        return self.partition_keys + self.clustering_keys + self.columns

    def as_query(self) -> CqlDto:
        """Build the CQL ``CREATE TABLE IF NOT EXISTS`` statement for this table.

        Every column contributes ``str(column)`` to the column list. The
        partition key is parenthesised only when it is composite, and a
        ``WITH default_time_to_live=<ttl>`` option is appended when ``ttl``
        is truthy.

        Returns:
            A ``CqlDto`` wrapping the statement text.
        """
        partition_key = ", ".join(column.name for column in self.partition_keys)
        if len(self.partition_keys) > 1:
            # A composite partition key needs its own pair of parentheses.
            partition_key = f"({partition_key})"
        # Each clustering column is appended as ", <name>"; empty when there are none.
        clustering_key = "".join(f", {column.name}" for column in self.clustering_keys)
        column_definitions = ", ".join(str(column) for column in self.all_columns)
        # The option is appended while building the statement rather than by
        # string replacement afterwards, so it can only ever land on the
        # statement terminator.
        options = f" WITH default_time_to_live={int(self.ttl)}" if self.ttl else ""
        return CqlDto(
            f"CREATE TABLE IF NOT EXISTS {self.keyspace_name}.{self.name} "
            f"({column_definitions},"
            f" PRIMARY KEY ({partition_key}{clustering_key})){options};"
        )

    def as_sql(self) -> CqlDto:
        """Build an SQL ``CREATE TABLE IF NOT EXISTS`` statement for this table.

        The table gets an auto-increment ``id`` primary key, a ``d_time``
        integer column, and one ``<name> <sql_type>`` column per partition and
        clustering key. Regular columns are not included.

        Returns:
            A ``CqlDto`` wrapping the statement text.
        """
        # One ", <name> <type>" fragment per key column: an empty key list then
        # contributes nothing instead of leaving a dangling comma before ")".
        key_definitions = "".join(
            f", {column.name} {column.sql_type}"
            for column in self.partition_keys + self.clustering_keys
        )
        return CqlDto(
            f"CREATE TABLE IF NOT EXISTS '{self.keyspace_name}.{self.name}' ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            f"d_time INTEGER {key_definitions});"
        )
@dataclass
class Schema:
    """Represents Scylla keyspace with tables"""

    # Keyspace name.
    name: str
    # Tables that belong to the keyspace.
    tables: List[Table]

    def as_queries(self, replication_strategy: ReplicationStrategy) -> List[CqlDto]:
        """Return the keyspace creation statement followed by one statement per table.

        ``replication_strategy`` is interpolated into the keyspace statement
        through its string form.
        """
        create_keyspace = CqlDto(
            f"CREATE KEYSPACE IF NOT EXISTS {self.name} "
            f"with replication = {replication_strategy};"
        )
        return [create_keyspace, *[table.as_query() for table in self.tables]]

    def as_sql(self) -> List[CqlDto]:
        """Return the ``as_sql()`` statement of every table, in table order."""
        return [table.as_sql() for table in self.tables]

    def create(self, query_driver: QueryDriver, replication_strategy: ReplicationStrategy) -> None:
        """Creates keyspace with tables in database."""
        # Build the full statement list first, then execute it in order.
        statements = self.as_queries(replication_strategy)
        for cql in statements:
            query_driver.execute(cql)

    def drop(self, query_driver: QueryDriver) -> None:
        """Drops whole keyspace"""
        drop_statement = CqlDto(f"drop keyspace if exists {self.name}")
        query_driver.execute(drop_statement)
def _generate_random_column(
rand: random.Random,
seed: int,
prefix: str,
index: int,
column_types: List[Type[Column]],
max_size: Optional[int] = None,
) -> Column:
params = {"name": f"{prefix}{index}", "seed": seed}
if max_size:
params["max_size"] = max_size
return rand.choice(column_types)(**params) # type: ignore
def generate_schema(  # pylint: disable=dangerous-default-value
    config: GeminiConfiguration,
    pk_types: List[Type[Column]] = ALL_COLUMN_TYPES,
    ck_types: List[Type[Column]] = ALL_COLUMN_TYPES,
    c_types: List[Type[Column]] = ALL_COLUMN_TYPES,
) -> Schema:
    """Generates schema: Keyspace with tables according to configuration.

    A ``random.Random`` seeded with ``config.seed`` drives every choice.
    ``config.max_tables`` tables are produced in the ``gemini`` keyspace. For
    each table the three column counts are drawn first, then the partition
    keys, clustering keys and regular columns are generated in that order.
    """
    keyspace = "gemini"
    rng = random.Random(config.seed)

    def build_columns(prefix: str, count: int, types: List[Type[Column]]) -> List[Column]:
        # Columns are named <prefix>0 .. <prefix>(count - 1).
        return [
            _generate_random_column(rng, config.seed, prefix, position, types)
            for position in range(count)
        ]

    generated_tables = []
    for table_number in range(config.max_tables):
        # Draw all three counts before generating any column so the random
        # stream is consumed in a fixed order.
        pk_count = rng.randint(config.min_partition_keys, config.max_partition_keys)
        ck_count = rng.randint(config.min_clustering_keys, config.max_clustering_keys)
        column_count = rng.randint(config.min_columns, config.max_columns)

        pk_columns = build_columns("pk", pk_count, pk_types)
        ck_columns = build_columns("ck", ck_count, ck_types)
        regular_columns = build_columns("col", column_count, c_types)

        generated_tables.append(
            Table(
                name=f"table{table_number}",
                keyspace_name=keyspace,
                partition_keys=pk_columns,
                clustering_keys=ck_columns,
                columns=regular_columns,
                ttl=config.ttl,
            )
        )
    return Schema(name=keyspace, tables=generated_tables)