Skip to content

Commit cf49199

Browse files
bfbachmannBruno Bachmann
authored andcommitted
[v1.5.10] s4: Fix 478
1 parent 1f4f216 commit cf49199

File tree

9 files changed

+351
-72
lines changed

9 files changed

+351
-72
lines changed

cluster/addons.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ func (c *Cluster) StoreAddonConfigMap(ctx context.Context, addonYaml string, add
560560
select {
561561
case <-timeout:
562562
return updated, nil
563-
case <-time.After(time.Second * UpdateStateTimeout):
563+
case <-time.After(UpdateStateTimeout):
564564
return updated, fmt.Errorf("[addons] Timeout waiting for kubernetes to be ready")
565565
}
566566
}

cluster/cluster.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,9 @@ const (
8686
AuthnWebhookProvider = "webhook"
8787
StateConfigMapName = "cluster-state"
8888
FullStateConfigMapName = "full-cluster-state"
89-
UpdateStateTimeout = 30
90-
GetStateTimeout = 30
89+
FullStateSecretName = "full-cluster-state"
90+
UpdateStateTimeout = time.Second * 30
91+
GetStateTimeout = time.Second * 30
9192
RewriteWorkers = 5
9293
SyncWorkers = 10
9394
NoneAuthorizationMode = "none"

cluster/state.go

Lines changed: 155 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cluster
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"io"
89
"os"
@@ -11,8 +12,6 @@ import (
1112
"strings"
1213
"time"
1314

14-
"k8s.io/client-go/transport"
15-
1615
"github.com/rancher/rke/hosts"
1716
"github.com/rancher/rke/k8s"
1817
"github.com/rancher/rke/log"
@@ -22,13 +21,22 @@ import (
2221
"github.com/sirupsen/logrus"
2322
"gopkg.in/yaml.v2"
2423
v1 "k8s.io/api/core/v1"
24+
apierrors "k8s.io/apimachinery/pkg/api/errors"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/util/wait"
27+
"k8s.io/client-go/kubernetes"
28+
"k8s.io/client-go/transport"
2529
)
2630

2731
const (
2832
stateFileExt = ".rkestate"
2933
certDirExt = "_certs"
3034
)
3135

36+
var (
37+
ErrFullStateIsNil = errors.New("fullState argument cannot be nil")
38+
)
39+
3240
type FullState struct {
3341
DesiredState State `json:"desiredState,omitempty"`
3442
CurrentState State `json:"currentState,omitempty"`
@@ -79,45 +87,152 @@ func (c *Cluster) GetStateFileFromConfigMap(ctx context.Context) (string, error)
7987
}
8088
return stateFile, nil
8189
}
82-
return "", fmt.Errorf("Unable to get ConfigMap with cluster state from any Control Plane host")
90+
return "", fmt.Errorf("[state] Unable to get ConfigMap with cluster state from any Control Plane host")
8391
}
8492

85-
func SaveFullStateToKubernetes(ctx context.Context, kubeCluster *Cluster, fullState *FullState) error {
86-
k8sClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
87-
if err != nil {
88-
return fmt.Errorf("Failed to create Kubernetes Client: %v", err)
89-
}
93+
// SaveFullStateToK8s saves the full cluster state to a k8s secret. If any errors that occur on attempts to update
94+
// the secret will be retired up until some limit.
95+
func SaveFullStateToK8s(ctx context.Context, k8sClient kubernetes.Interface, fullState *FullState) error {
9096
log.Infof(ctx, "[state] Saving full cluster state to Kubernetes")
91-
stateFile, err := json.Marshal(*fullState)
97+
98+
if fullState == nil {
99+
return ErrFullStateIsNil
100+
}
101+
102+
secrets := k8sClient.CoreV1().Secrets(metav1.NamespaceSystem)
103+
configMaps := k8sClient.CoreV1().ConfigMaps(metav1.NamespaceSystem)
104+
stateBytes, err := json.Marshal(fullState)
92105
if err != nil {
93-
return err
106+
return fmt.Errorf("[state] error marshalling full state to JSON: %w", err)
94107
}
95-
timeout := make(chan bool, 1)
96-
go func() {
97-
for {
98-
_, err := k8s.UpdateConfigMap(k8sClient, stateFile, FullStateConfigMapName)
108+
109+
// Back off for 1s between attempts.
110+
backoff := wait.Backoff{
111+
Duration: time.Second,
112+
Steps: int(UpdateStateTimeout.Seconds()),
113+
}
114+
115+
// Try to create or update the secret and delete the old configmap in k8s, if it still exists.
116+
saveState := func(ctx context.Context) (bool, error) {
117+
// Check if the secret already exists.
118+
existingSecret, err := secrets.Get(ctx, FullStateSecretName, metav1.GetOptions{})
119+
if err == nil {
120+
// The secret already exists, update it.
121+
existingSecretCopy := existingSecret.DeepCopy()
122+
existingSecretCopy.Data[FullStateSecretName] = stateBytes
123+
if _, err := secrets.Update(ctx, existingSecretCopy, metav1.UpdateOptions{}); err != nil {
124+
return false, fmt.Errorf("[state] error updating secret: %w", err)
125+
}
126+
} else if apierrors.IsNotFound(err) {
127+
// The secret does not exist, create it.
128+
_, err := secrets.Create(ctx, &v1.Secret{
129+
ObjectMeta: metav1.ObjectMeta{
130+
Name: FullStateSecretName,
131+
Namespace: metav1.NamespaceSystem,
132+
},
133+
Data: map[string][]byte{
134+
FullStateSecretName: stateBytes,
135+
},
136+
}, metav1.CreateOptions{})
99137
if err != nil {
100-
time.Sleep(time.Second * 5)
101-
continue
138+
return false, fmt.Errorf("[state] error creating secret: %w", err)
102139
}
103-
log.Infof(ctx, "[state] Successfully Saved full cluster state to Kubernetes ConfigMap: %s", FullStateConfigMapName)
104-
timeout <- true
105-
break
140+
} else {
141+
return false, fmt.Errorf("[state] error getting secret: %w", err)
106142
}
107-
}()
108-
select {
109-
case <-timeout:
110-
return nil
111-
case <-time.After(time.Second * UpdateStateTimeout):
112-
return fmt.Errorf("[state] Timeout waiting for kubernetes to be ready")
143+
144+
// Delete the old configmap.
145+
err = configMaps.Delete(ctx, FullStateConfigMapName, metav1.DeleteOptions{})
146+
if err != nil && !apierrors.IsNotFound(err) {
147+
return false, fmt.Errorf("[state] error deleting configmap: %w", err)
148+
}
149+
150+
return true, nil
113151
}
152+
153+
// Retry until success or backoff.Steps has been reached ctx is cancelled.
154+
if err = wait.ExponentialBackoffWithContext(ctx, backoff, saveState); err != nil {
155+
return fmt.Errorf("[state] error updating secret: %w", err)
156+
}
157+
158+
return nil
159+
}
160+
161+
// GetFullStateFromK8s fetches the full cluster state from the k8s cluster.
162+
// In earlier versions of RKE, the full cluster state was stored in a configmap, but it has since been moved
163+
// to a secret. This function tries fetching it from the secret first and will fall back on the configmap if the secret
164+
// doesn't exist.
165+
func GetFullStateFromK8s(ctx context.Context, k8sClient kubernetes.Interface) (*FullState, error) {
166+
// Back off for 1s between attempts.
167+
backoff := wait.Backoff{
168+
Duration: time.Second,
169+
Steps: int(GetStateTimeout.Seconds()),
170+
}
171+
172+
// Try to fetch secret or configmap in k8s.
173+
var fullState FullState
174+
getState := func(ctx context.Context) (bool, error) {
175+
fullStateBytes, err := getFullStateBytesFromSecret(ctx, k8sClient, FullStateSecretName)
176+
if err != nil {
177+
if apierrors.IsNotFound(err) {
178+
logrus.Debug("full-state secret not found, falling back to configmap")
179+
180+
fullStateBytes, err = getFullStateBytesFromConfigMap(ctx, k8sClient, FullStateConfigMapName)
181+
if err != nil {
182+
return false, fmt.Errorf("[state] error getting full state from configmap: %w", err)
183+
}
184+
} else {
185+
return false, fmt.Errorf("[state] error getting full state from secret: %w", err)
186+
}
187+
}
188+
189+
if err := json.Unmarshal(fullStateBytes, &fullState); err != nil {
190+
return false, fmt.Errorf("[state] error unmarshalling full state from JSON: %w", err)
191+
}
192+
193+
return true, nil
194+
}
195+
196+
// Retry until success or backoff.Steps has been reached or ctx is cancelled.
197+
err := wait.ExponentialBackoffWithContext(ctx, backoff, getState)
198+
return &fullState, err
199+
}
200+
201+
// getFullStateBytesFromConfigMap fetches the full state from the configmap with the given name in the kube-system namespace.
202+
func getFullStateBytesFromConfigMap(ctx context.Context, k8sClient kubernetes.Interface, name string) ([]byte, error) {
203+
confMap, err := k8sClient.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(ctx, name, metav1.GetOptions{})
204+
if err != nil {
205+
return nil, fmt.Errorf("[state] error getting configmap %s: %w", name, err)
206+
}
207+
208+
data, ok := confMap.Data[name]
209+
if !ok {
210+
return nil, fmt.Errorf("[state] expected configmap %s to have field %s, but none was found", name, name)
211+
}
212+
213+
return []byte(data), nil
214+
}
215+
216+
// getFullStateBytesFromSecret fetches the full state from the secret with the given name in the kube-system namespace.
217+
func getFullStateBytesFromSecret(ctx context.Context, k8sClient kubernetes.Interface, name string) ([]byte, error) {
218+
secret, err := k8sClient.CoreV1().Secrets(metav1.NamespaceSystem).Get(ctx, name, metav1.GetOptions{})
219+
if err != nil {
220+
return nil, fmt.Errorf("[state] error getting secret %s: %w", name, err)
221+
}
222+
223+
data, ok := secret.Data[name]
224+
if !ok {
225+
return nil, fmt.Errorf("[state] expected secret %s to have field %s, but none was found", name, name)
226+
}
227+
228+
return data, nil
114229
}
115230

116231
func GetStateFromKubernetes(ctx context.Context, kubeCluster *Cluster) (*Cluster, error) {
117232
log.Infof(ctx, "[state] Fetching cluster state from Kubernetes")
118233
k8sClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
119234
if err != nil {
120-
return nil, fmt.Errorf("Failed to create Kubernetes Client: %v", err)
235+
return nil, fmt.Errorf("[state] Failed to create Kubernetes Client: %v", err)
121236
}
122237
var cfgMap *v1.ConfigMap
123238
var currentCluster Cluster
@@ -139,26 +254,26 @@ func GetStateFromKubernetes(ctx context.Context, kubeCluster *Cluster) (*Cluster
139254
clusterData := cfgMap.Data[StateConfigMapName]
140255
err := yaml.Unmarshal([]byte(clusterData), &currentCluster)
141256
if err != nil {
142-
return nil, fmt.Errorf("Failed to unmarshal cluster data")
257+
return nil, fmt.Errorf("[state] Failed to unmarshal cluster data")
143258
}
144259
return &currentCluster, nil
145-
case <-time.After(time.Second * GetStateTimeout):
260+
case <-time.After(GetStateTimeout):
146261
log.Infof(ctx, "Timed out waiting for kubernetes cluster to get state")
147-
return nil, fmt.Errorf("Timeout waiting for kubernetes cluster to get state")
262+
return nil, fmt.Errorf("[state] Timeout waiting for kubernetes cluster to get state")
148263
}
149264
}
150265

151266
func GetK8sVersion(localConfigPath string, k8sWrapTransport transport.WrapperFunc) (string, error) {
152267
logrus.Debugf("[version] Using %s to connect to Kubernetes cluster..", localConfigPath)
153268
k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
154269
if err != nil {
155-
return "", fmt.Errorf("Failed to create Kubernetes Client: %v", err)
270+
return "", fmt.Errorf("[state] Failed to create Kubernetes Client: %v", err)
156271
}
157272
discoveryClient := k8sClient.DiscoveryClient
158273
logrus.Debugf("[version] Getting Kubernetes server version..")
159274
serverVersion, err := discoveryClient.ServerVersion()
160275
if err != nil {
161-
return "", fmt.Errorf("Failed to get Kubernetes server version: %v", err)
276+
return "", fmt.Errorf("[state] Failed to get Kubernetes server version: %v", err)
162277
}
163278
return fmt.Sprintf("%#v", *serverVersion), nil
164279
}
@@ -174,11 +289,11 @@ func RebuildState(ctx context.Context, kubeCluster *Cluster, oldState *FullState
174289
if flags.CustomCerts {
175290
certBundle, err := pki.ReadCertsAndKeysFromDir(flags.CertificateDir)
176291
if err != nil {
177-
return nil, fmt.Errorf("Failed to read certificates from dir [%s]: %v", flags.CertificateDir, err)
292+
return nil, fmt.Errorf("[state] Failed to read certificates from dir [%s]: %v", flags.CertificateDir, err)
178293
}
179294
// make sure all custom certs are included
180295
if err := pki.ValidateBundleContent(rkeConfig, certBundle, flags.ClusterFilePath, flags.ConfigDir); err != nil {
181-
return nil, fmt.Errorf("Failed to validates certificates from dir [%s]: %v", flags.CertificateDir, err)
296+
return nil, fmt.Errorf("[state] Failed to validates certificates from dir [%s]: %v", flags.CertificateDir, err)
182297
}
183298
newState.DesiredState.CertificatesBundle = certBundle
184299
newState.CurrentState = oldState.CurrentState
@@ -207,11 +322,11 @@ func RebuildState(ctx context.Context, kubeCluster *Cluster, oldState *FullState
207322
func (s *FullState) WriteStateFile(ctx context.Context, statePath string) error {
208323
stateFile, err := json.MarshalIndent(s, "", " ")
209324
if err != nil {
210-
return fmt.Errorf("Failed to Marshal state object: %v", err)
325+
return fmt.Errorf("[state] Failed to Marshal state object: %v", err)
211326
}
212327
logrus.Tracef("Writing state file: %s", stateFile)
213328
if err := os.WriteFile(statePath, stateFile, 0600); err != nil {
214-
return fmt.Errorf("Failed to write state file: %v", err)
329+
return fmt.Errorf("[state] Failed to write state file: %v", err)
215330
}
216331
log.Infof(ctx, "Successfully Deployed state file at [%s]", statePath)
217332
return nil
@@ -264,19 +379,19 @@ func ReadStateFile(ctx context.Context, statePath string) (*FullState, error) {
264379
rkeFullState := &FullState{}
265380
fp, err := filepath.Abs(statePath)
266381
if err != nil {
267-
return rkeFullState, fmt.Errorf("failed to lookup current directory name: %v", err)
382+
return rkeFullState, fmt.Errorf("[state] failed to lookup current directory name: %v", err)
268383
}
269384
file, err := os.Open(fp)
270385
if err != nil {
271-
return rkeFullState, fmt.Errorf("Can not find RKE state file: %v", err)
386+
return rkeFullState, fmt.Errorf("[state] Can not find RKE state file: %v", err)
272387
}
273388
defer file.Close()
274389
buf, err := io.ReadAll(file)
275390
if err != nil {
276-
return rkeFullState, fmt.Errorf("failed to read state file: %v", err)
391+
return rkeFullState, fmt.Errorf("[state] failed to read state file: %v", err)
277392
}
278393
if err := json.Unmarshal(buf, rkeFullState); err != nil {
279-
return rkeFullState, fmt.Errorf("failed to unmarshal the state file: %v", err)
394+
return rkeFullState, fmt.Errorf("[state] failed to unmarshal the state file: %v", err)
280395
}
281396
rkeFullState.DesiredState.CertificatesBundle = pki.TransformPEMToObject(rkeFullState.DesiredState.CertificatesBundle)
282397
rkeFullState.CurrentState.CertificatesBundle = pki.TransformPEMToObject(rkeFullState.CurrentState.CertificatesBundle)
@@ -322,7 +437,7 @@ func buildFreshState(ctx context.Context, kubeCluster *Cluster, newState *FullSt
322437
// Get the certificate Bundle
323438
certBundle, err := pki.GenerateRKECerts(ctx, *rkeConfig, "", "")
324439
if err != nil {
325-
return fmt.Errorf("Failed to generate certificate bundle: %v", err)
440+
return fmt.Errorf("[state] Failed to generate certificate bundle: %v", err)
326441
}
327442
newState.DesiredState.CertificatesBundle = certBundle
328443
if isEncryptionEnabled(rkeConfig) {

0 commit comments

Comments
 (0)