活动介绍
file-type

dotenvenc工具:安全地加密.env文件管理敏感信息

ZIP文件

下载需积分: 50 | 23KB | 更新于2025-08-13 | 148 浏览量 | 4 下载量 举报 收藏
download 立即下载
### 知识点详细说明 #### dotenvenc 的概念与作用 dotenvenc 是一个用于加密和解密 `.env` 文件的工具,使得敏感信息如密码和令牌可以在不暴露的情况下,安全地存储在源代码管理中。`.env` 文件是常用来存储应用程序配置信息的文本文件,其中包含了键值对,每个键值对代表一个配置项。 使用 dotenvenc 的好处在于: 1. **安全性提升**:敏感信息不直接暴露在版本控制中,降低了信息泄露的风险。 2. **易用性**:dotenvenc 与 `.env` 文件的使用配合无缝,不影响现有的使用 `.env` 文件的开发习惯。 3. **灵活性**:在需要时可以轻松地从加密的 `.env.enc` 文件中恢复出原始的 `.env` 文件。 #### dotenvenc 的使用场景 dotenvenc 适用于所有需要在代码中存储敏感信息,但又不希望这些信息被公开的场景。具体来说: - **开发环境配置**:开发人员往往需要在本地机器上运行代码,此时需要读取 `.env` 文件中的配置信息。 - **持续集成/持续部署(CI/CD)**:在自动化部署流程中,需要将敏感信息安全地传送到部署环境,此时使用加密文件是一种常见的做法。 - **代码共享与协作**:在团队开发中,可以通过dotenvenc共享敏感信息的配置而不泄露具体内容。 #### dotenvenc 的使用方法 dotenvenc 的基本使用流程包括安装、加密和解密三个步骤: 1. **安装**:通过 `npm`(Node.js 的包管理器)安装 dotenvenc 到项目中,命令为 `npm i dotenvenc`。这一步完成后,dotenvenc 将作为依赖项存放在项目中。 2. **加密**:使用 dotenvenc 的命令行工具来生成加密版本的 `.env` 文件。具体命令如下: ```bash npx dotenvenc encrypt .env .env.enc ``` 这条命令会读取项目根目录下的 `.env` 文件,并生成对应的加密版本 `.env.enc` 文件。 3. **解密**:在运行时,如果需要访问敏感数据,可以通过 dotenvenc 将 `.env.enc` 文件解密回 `.env` 文件。解密命令如下: ```bash npx dotenvenc decrypt .env.enc .env ``` 这条命令会将 `.env.enc` 文件解密,并生成可读的 `.env` 文件。 #### dotenvenc 与其他包的配合使用 dotenvenc 通常与能够处理 `.env` 文件的包配合使用,例如 `dotenv`。`dotenv` 是一个流行的 Node.js 包,它会自动加载 `.env` 文件中定义的环境变量,并将它们添加到 `process.env` 中,使其可以在 Node.js 应用程序中使用。 在使用 dotenvenc 加密 `.env` 文件后,`dotenv` 仍然可以使用解密后的 `.env` 文件来设置环境变量。需要注意的是,只有在运行时需要访问这些敏感信息时,才进行解密操作。 #### dotenvenc 的安全注意事项 - **保护密钥**:dotenvenc 在加密过程中会使用到一个密钥。这个密钥必须安全地管理,不能泄露。如果密钥丢失,则加密文件将无法解密。 - **不要版本化 `.env`**:需要确保 `.env` 文件不会被推送到源代码仓库中。在 Git 中,可以通过 `.gitignore` 文件添加规则 `/.env` 来确保 `.env` 文件不会被版本控制。 - **安全更新**:如果 `.env` 文件中的敏感信息需要更新,应重新进行加密操作,而不是直接修改 `.env.enc` 文件。 #### 结语 dotenvenc 为开发人员提供了一种安全存储敏感信息的方法,极大地简化了在共享代码的同时保护敏感数据的流程。通过简单的命令行操作,即可实现对敏感信息的加密和解密,使其既能满足开发的便利性,又保障了数据的安全性。在使用过程中,开发者应注意保护密钥,避免密钥泄露,并确保敏感信息的 `.env` 文件不被版本化。

相关推荐

filetype

是E:\AI_System\core里面的#E:\AI_System\core\config.py # import os import json import logging import configparser from pathlib import Path from typing import Dict, Any, Optional, Union from dotenv import load_dotenv class CoreConfig: """ 核心配置管理类 - 提供统一的配置管理接口 支持多来源配置加载:环境变量 > 配置文件 > 默认值 """ def __init__(self, config_path: Optional[Union[str, Path]] = None, env_prefix: str = "APP_", default_config: Optional[Dict[str, Any]] = None): """ 初始化配置管理器 :param config_path: 配置文件路径(支持.json, .ini, .env) :param env_prefix: 环境变量前缀 :param default_config: 默认配置字典 """ self.logger = logging.getLogger('CoreConfig') self._setup_logger() self.env_prefix = env_prefix self.config_path = Path(config_path) if config_path else None self.config_data = default_config or {} self.config_modified = False self.log(f"📋 初始化配置管理器 | 环境前缀: {env_prefix}") # 加载配置 self.load_configuration() def _setup_logger(self): """配置日志记录器""" if not self.logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) self.logger.addHandler(handler) self.logger.setLevel(logging.INFO) self.logger.propagate = False def log(self, message: str, level: str = "info"): """记录日志""" log_levels = { "debug": self.logger.debug, "info": self.logger.info, "warning": self.logger.warning, "error": self.logger.error } log_func = log_levels.get(level.lower(), self.logger.info) log_func(message) def load_configuration(self): """加载所有配置源""" # 1. 加载环境变量 self._load_environment_vars() # 2. 加载配置文件(如果存在) if self.config_path and self.config_path.exists(): self._load_config_file() elif self.config_path: self.log(f"⚠️ 配置文件不存在: {self.config_path}", "warning") self.log(f"✅ 配置加载完成 | 条目数: {len(self.config_data)}") def _load_environment_vars(self): """加载环境变量""" self.log("🔍 加载环境变量...") load_dotenv() # 加载.env文件(如果存在) # 获取所有以指定前缀开头的环境变量 for key, value in os.environ.items(): if key.startswith(self.env_prefix): config_key = key[len(self.env_prefix):].lower() self.config_data[config_key] = self._parse_value(value) self.log(f" - 加载环境变量: {config_key} = {self._mask_secret(config_key, value)}") def _load_config_file(self): """根据文件扩展名加载配置文件""" suffix = self.config_path.suffix.lower() self.log(f"📄 加载配置文件: {self.config_path} (类型: {suffix[1:]})") try: if suffix == '.json': self._load_json_config() elif suffix in ('.ini', '.cfg'): self._load_ini_config() elif suffix == '.env': self._load_dotenv_config() else: self.log(f"❌ 不支持的配置文件类型: {suffix}", "error") except Exception as e: self.log(f"❌ 配置文件加载失败: {str(e)}", "error") def _load_json_config(self): """加载JSON配置文件""" with open(self.config_path, 'r', encoding='utf-8') as f: json_data = json.load(f) self._merge_config(json_data) def _load_ini_config(self): """加载INI配置文件""" parser = configparser.ConfigParser() parser.read(self.config_path) config_dict = {} for section in parser.sections(): section_dict = {} for key, value in parser.items(section): section_dict[key] = self._parse_value(value) config_dict[section] = section_dict self._merge_config(config_dict) def _load_dotenv_config(self): """加载.env配置文件""" # 已由load_dotenv()处理,这里只需记录 self.log(" - .env文件已加载") def _merge_config(self, new_config: Dict[str, Any]): """合并新配置到现有配置""" for key, value in new_config.items(): # 处理嵌套配置 if isinstance(value, dict) and key in self.config_data and isinstance(self.config_data[key], dict): self.config_data[key].update(value) else: self.config_data[key] = value self.log(f" - 加载配置项: {key} = {self._mask_secret(key, value)}") def _parse_value(self, value: str) -> Any: """智能解析字符串值为合适的Python类型""" # 尝试解析为布尔值 if value.lower() in ('true', 'yes', 'on'): return True if value.lower() in ('false', 'no', 'off'): return False # 尝试解析为数字 try: return int(value) except ValueError: try: return float(value) except ValueError: pass # 尝试解析为列表 if ',' in value: return [self._parse_value(item.strip()) for item in value.split(',')] # 返回原始字符串 return value def _mask_secret(self, key: str, value: Any) -> str: """对敏感信息进行掩码处理""" if 'secret' in key or 'password' in key or 'key' in key: return '******' if value else '空' if isinstance(value, list): return f"[列表, 长度: {len(value)}]" if isinstance(value, dict): return f"{{字典, 键数: {len(value)}}}" return str(value) def get(self, key: str, default: Any = None) -> Any: """ 获取配置值,支持点分路径 (如: 'database.host') :param key: 配置键名 :param default: 默认值(如果键不存在) :return: 配置值 """ keys = key.split('.') current = self.config_data for k in keys: if isinstance(current, dict) and k in current: current = current[k] else: return default return current def set(self, key: str, value: Any, persist: bool = False): """ 设置配置值 :param key: 配置键名 :param value: 配置值 :param persist: 是否持久化到配置文件 """ keys = key.split('.') current = self.config_data # 遍历创建嵌套字典 for i, k in enumerate(keys[:-1]): if k not in current or not isinstance(current[k], dict): current[k] = {} current = current[k] # 设置最终值 last_key = keys[-1] current[last_key] = value self.config_modified = True self.log(f"⚙️ 设置配置: {key} = {self._mask_secret(last_key, value)}") # 持久化到文件 if persist and self.config_path: self.save_config() def save_config(self, path: Optional[Union[str, Path]] = None): """ 保存配置到文件 :param path: 可选的自定义保存路径 """ save_path = Path(path) if path else self.config_path if not save_path: self.log("❌ 保存失败: 未指定配置文件路径", "error") return False suffix = save_path.suffix.lower() try: if suffix == '.json': with open(save_path, 'w', encoding='utf-8') as f: json.dump(self.config_data, f, indent=2, ensure_ascii=False) elif suffix in ('.ini', '.cfg'): self._save_ini_config(save_path) else: self.log(f"❌ 不支持的文件格式: {suffix}", "error") return False self.log(f"💾 配置已保存到: {save_path}") self.config_modified = False return True except Exception as e: self.log(f"❌ 配置保存失败: {str(e)}", "error") return False def _save_ini_config(self, path: Path): """保存为INI格式配置文件""" parser = configparser.ConfigParser() # 递归处理嵌套字典 def add_section(data, section_name=None): if section_name: parser.add_section(section_name) for key, value in data.items(): if isinstance(value, dict): add_section(value, f"{section_name}.{key}" if section_name else key) else: if not section_name: parser.set('DEFAULT', key, str(value)) else: parser.set(section_name, key, str(value)) add_section(self.config_data) with open(path, 'w', encoding='utf-8') as f: parser.write(f) def reload(self): """重新加载所有配置源""" self.log("🔄 重新加载配置...") self.config_data = {} self.load_configuration() def to_dict(self) -> Dict[str, Any]: """返回当前配置的完整字典""" return self.config_data.copy() def __getitem__(self, key: str) -> Any: """支持字典式访问""" return self.get(key) def __setitem__(self, key: str, value: Any): """支持字典式设置""" self.set(key, value) def __contains__(self, key: str) -> bool: """检查配置项是否存在""" return self.get(key) is not None # 测试代码 if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) print("=" * 50) print("核心配置管理器测试") print("=" * 50) # 创建临时配置文件 config_json = """ { "database": { "host": "localhost", "port": 5432, "username": "admin", "password": "db_secret" }, "logging": { "level": "INFO", "path": "/var/log/app.log" }, "features": ["auth", "caching", "analytics"] } """ with open("test_config.json", "w") as f: f.write(config_json) # 设置环境变量 os.environ["APP_API_KEY"] = "env_secret_key" os.environ["APP_DATABASE_PORT"] = "6432" # 覆盖配置文件中的端口 # 创建配置管理器实例 config = CoreConfig( config_path="test_config.json", env_prefix="APP_", default_config={ "app_name": "MyApp", "version": "1.0.0" } ) # 测试配置获取 print("\n测试配置获取:") print(f"数据库主机: {config.get('database.host')}") print(f"数据库端口: {config.get('database.port')} (应被环境变量覆盖)") print(f"API密钥: {config.get('api_key')} (来自环境变量)") print(f"应用名称: {config.get('app_name')} (来自默认配置)") print(f"日志级别: {config.get('logging.level')}") print(f"功能列表: {config.get('features')}") # 测试配置设置 print("\n测试配置设置:") config.set("new_feature.enabled", True) config.set("database.password", "new_secret", persist=True) print(f"新功能状态: {config.get('new_feature.enabled')}") # 测试配置保存 print("\n测试配置保存:") config.save_config("test_config_saved.json") # 测试配置包含检查 print("\n测试配置包含检查:") print(f"数据库用户名存在: {'database.username' in config}") print(f"不存在的键: {'nonexistent.key' in config}") # 测试配置转字典 print("\n完整配置字典:") print(json.dumps(config.to_dict(), indent=2, ensure_ascii=False)) # 创建全局系统配置实例 system_config = CoreConfig( config_path=Path(__file__).parent.parent / "config" / "system_config.json", env_prefix="AI_SYSTEM_" ) # 测试代码更新 if __name__ == "__main__": # ... [测试代码保持不变] ... print(f"系统配置实例: {system_config}") 吗?还是在E:\AI_System里面重建一个config.py?

filetype

package com.hw.camunda.config.datasource; import com.baomidou.dynamic.datasource.creator.DataSourceProperty; import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DynamicDataSourceProperties; import com.hw.camunda.utils.Sm4Util; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.AbstractEnvironment; import org.springframework.core.env.Environment; import org.springframework.core.env.MutablePropertySources; import org.springframework.core.env.PropertySource; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.HashSet; @Configuration public class DataSourceDecryptConfig { @Autowired private Environment env; @Bean public DynamicDataSourceProperties dynamicDataSourceProperties() { DynamicDataSourceProperties properties = new DynamicDataSourceProperties(); // 动态获取主数据源名称 String primaryName = env.getProperty("spring.datasource.dynamic.primary"); properties.setPrimary(primaryName); // 动态获取所有数据源名称 Set<String> dataSourceNames = getDataSourceNames(); // 解密所有数据源 Map<String, DataSourceProperty> dataSources = new HashMap<>(); for (String name : dataSourceNames) { dataSources.put(name, decryptDataSource("spring.datasource.dynamic.datasource." + name)); } properties.setDatasource(dataSources); return properties; } // 动态获取所有数据源名称 private Set<String> getDataSourceNames() { Set<String> names = new HashSet<>(); String prefix = "spring.datasource.dynamic.datasource."; if (env instanceof AbstractEnvironment) { MutablePropertySources sources = ((AbstractEnvironment) env).getPropertySources(); for (PropertySource<?> source : sources) { if (source instanceof org.springframework.core.env.MapPropertySource) { for (String name : ((org.springframework.core.env.MapPropertySource) source).getPropertyNames()) { if (name.startsWith(prefix)) { // 提取数据源名称:prefix后的第一个部分 String[] parts = name.substring(prefix.length()).split("\\."); if (parts.length > 0) { names.add(parts[0]); } } } } } } return names; } private DataSourceProperty decryptDataSource(String prefix) { DataSourceProperty dsProp = new DataSourceProperty(); dsProp.setUrl(decrypt(env.getProperty(prefix + ".url"))); dsProp.setUsername(decrypt(env.getProperty(prefix + ".username"))); dsProp.setPassword(decrypt(env.getProperty(prefix + ".password"))); // 处理其他可能存在的属性 String driver = env.getProperty(prefix + ".driver-class-name"); if (driver != null) { dsProp.setDriverClassName(driver); } return dsProp; } private String decrypt(String encryptedText) { if (encryptedText == null) return null; // 调用解密工具 - 实际实现需替换为您的解密逻辑 try { return new String(Sm4Util.decrypt(encryptedText.getBytes(StandardCharsets.UTF_8))); } catch (Exception e) { throw new RuntimeException(e); } } } 帮我修复一下,我是动态的多数据源

filetype

#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 模块提供图片加密解密服务 """ import os import cv2 import numpy as np from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import padding import hashlib class ImageEncryptor: def __init__(self, key): """ 初始化加密器 :param key: 加密密钥(字符串或字节) """ if isinstance(key, str): key = key.encode('utf-8') self.key = hashlib.sha256(key).digest() # 生成256位AES密钥 def encrypt_image(self, img_path, output_path=None): """ 加密单个图像文件 :param img_path: 输入图像路径 :param output_path: 加密后输出路径(默认添加.enc后缀) :return: 加密后文件路径 """ # 读取二进制图片 # img_bytes = np.frombuffer(img_path, dtype=np.uint8).tobytes() # 读取图像并转换为OpenCV格式 img = cv2.imdecode(np.frombuffer(img_path, dtype=np.uint8), cv2.IMREAD_COLOR) if img is None: raise ValueError(f"无法读取图像: {img_path}") # 将图像转换为字节流 _, buffer = cv2.imencode('.png', img) img_bytes = buffer.tobytes() # 生成随机初始化向量 iv = os.urandom(16) # 创建加密器 cipher = Cipher( algorithms.AES(self.key), modes.CBC(iv), backend=default_backend() ) encryptor = cipher.encryptor() # 填充数据 padder = padding.PKCS7(128).padder() padded_data = padder.update(img_bytes) + padder.finalize() # 加密数据 encrypted = encryptor.update(padded_data) + encryptor.finalize() # 确定输出路径 if output_path is None: output_path = img_path + '.enc' # 写入加密文件(包含自定义文件头和IV) with open(output_path, 'wb') as f: f.write(b'ENCIMG1.0') # 自定义文件头标识 f.write(iv) f.write(encrypted) return output_path def decrypt_to_opencv(self, encrypted_path): """ 解密加密图像并返回OpenCV图像对象 :param encrypted_path: 加密图像路径 :return: OpenCV图像对象 (numpy数组) """ with open(encrypted_path, 'rb') as f: header = f.read(8) # 读取文件头 if header != b'ENCIMG1.0': raise ValueError("无效的加密文件格式") iv = f.read(16) encrypted = f.read() # 创建解密器 cipher = Cipher( algorithms.AES(self.key), modes.CBC(iv), backend=default_backend() ) decryptor = cipher.decryptor() # 解密数据 decrypted = decryptor.update(encrypted) + decryptor.finalize() # 移除填充 unpadder = padding.PKCS7(128).unpadder() img_bytes = unpadder.update(decrypted) + unpadder.finalize() # 将字节数据转换为OpenCV图像 nparr = np.frombuffer(img_bytes, np.uint8) img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if img is None: raise ValueError("解密后图像解码失败") return img def encrypt_directory(self, src_dir, dst_dir): """ 递归加密整个目录结构 :param src_dir: 源目录路径 :param dst_dir: 目标目录路径 """ for root, dirs, files in os.walk(src_dir): # 创建对应的输出目录结构 rel_path = os.path.relpath(root, src_dir) output_dir = os.path.join(dst_dir, rel_path) os.makedirs(output_dir, exist_ok=True) for file in files: if file.lower().endswith('.png'): src_path = os.path.join(root, file) dst_path = os.path.join(output_dir, file + '.enc') self.encrypt_image(src_path, dst_path) print(f"加密完成: {src_path} -> {dst_path}") # 使用示例 if __name__ == "__main__": # 初始化加密器(实际应用中应从安全位置获取密钥) SECRET_KEY = "大话西游2经典版" # 建议使用环境变量或密钥管理系统 encryptor = ImageEncryptor(SECRET_KEY) input_path = 'E:\\DHP\\server_socket\\server_resources\\image\\界面\\莲叶载书栏.png' # 加密单个图像示例 encrypted_file = encryptor.encrypt_image(input_path) # 解密单个文件 disen_file = encryptor.decrypt_to_opencv(encrypted_file) # 使用OpenCV显示解密后的图像 cv2.imshow('Decrypted Image', disen_file) cv2.waitKey(0) img = cv2.imdecode(np.frombuffer(img_path, dtype=np.uint8), cv2.IMREAD_COLOR)格式不对

filetype

package com.hw.camunda.config.datasource; import com.baomidou.dynamic.datasource.creator.DataSourceProperty; import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DynamicDataSourceProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; import java.util.HashMap; import java.util.Map; @Configuration public class DataSourceDecryptConfig { @Autowired private Environment env; @Bean public DynamicDataSourceProperties dynamicDataSourceProperties() { DynamicDataSourceProperties properties = new DynamicDataSourceProperties(); // 获取并解密主数据源 DataSourceProperty primary = decryptDataSource("spring.datasource.dynamic.primary"); properties.setPrimary(primary.getName()); // 解密所有数据源 Map<String, DataSourceProperty> dataSources = new HashMap<>(); dataSources.put("master", decryptDataSource("spring.datasource.dynamic.datasource.master")); dataSources.put("slave", decryptDataSource("spring.datasource.dynamic.datasource.slave")); properties.setDatasource(dataSources); return properties; } private DataSourceProperty decryptDataSource(String prefix) { DataSourceProperty dsProp = new DataSourceProperty(); dsProp.setUrl(decrypt(env.getProperty(prefix + ".url"))); dsProp.setUsername(decrypt(env.getProperty(prefix + ".username"))); dsProp.setPassword(decrypt(env.getProperty(prefix + ".password"))); return dsProp; } private String decrypt(String encryptedText) { // 调用你的解密工具 (示例: Jasypt) return EncryptUtil.decrypt(encryptedText); } } 有问题,我也不知道我的yml会有几个数据源,所以能不能帮我改为全部动态的

传奇panda
  • 粉丝: 36
上传资源 快速赚钱