Oracle如何判断blob列的值是否为空
时间: 2023-08-14 12:03:13 浏览: 173
Oracle可以使用函数`DBMS_LOB.ISNULL`来判断BLOB列是否为空。该函数返回一个布尔值,如果指定的BLOB列为空,则返回TRUE,否则返回FALSE。以下是一个使用DBMS_LOB.ISNULL函数的示例:
```
SELECT CASE WHEN DBMS_LOB.ISNULL(blob_column) = TRUE THEN 'Blob column is NULL' ELSE 'Blob column is not NULL' END AS is_null
FROM your_table;
```
在上面的示例中,替换“blob_column”和“your_table”为实际的BLOB列名称和表名称。
相关问题
读取oracle中clob列
<think>我们正在处理从Oracle数据库中读取CLOB列数据的问题。CLOB(Character Large Object)用于存储大量文本数据(例如长篇文章、XML文档等)。在Python中,我们可以使用`cx_Oracle`(现在更名为`oracledb`)库来连接Oracle数据库并读取CLOB数据。
### 步骤1:安装必要的库
首先,确保安装了`oracledb`库:
```bash
pip install oracledb
```
### 步骤2:连接Oracle数据库
使用正确的连接信息建立数据库连接。
### 步骤3:执行查询并读取CLOB数据
当查询结果中包含CLOB列时,我们可以通过游标获取该列的值。在`oracledb`中,CLOB列返回的是`oracledb.LOB`对象(具体是`CLOB`类型)。为了获取其内容,我们需要读取该对象。
### 示例代码
以下是一个完整的示例,展示如何读取包含CLOB列的数据:
```python
import oracledb
# 配置数据库连接
username = "your_username"
password = "your_password"
dsn = "your_host:your_port/your_service_name" # 例如:localhost:1521/orclpdb1
# 建立连接
connection = oracledb.connect(user=username, password=password, dsn=dsn)
try:
# 创建游标
with connection.cursor() as cursor:
# 执行SQL查询(假设表名为my_table,CLOB列名为clob_col)
sql = "SELECT id, clob_col FROM my_table WHERE id = :id_val"
cursor.execute(sql, id_val=1) # 假设我们查询id=1的记录
# 获取结果
row = cursor.fetchone()
if row:
# 读取CLOB数据
clob_data = row[1] # 假设clob_col是第二列
# 注意:如果CLOB为空,则clob_data为None
if clob_data is not None:
# 读取CLOB内容(如果数据量很大,可能需要分块读取,但oracledb会自动处理)
# 使用read()方法读取整个CLOB内容
content = clob_data.read()
print(f"ID: {row[0]}, CLOB content: {content}")
else:
print(f"ID: {row[0]}, CLOB is NULL")
else:
print("No data found")
finally:
# 关闭连接
connection.close()
```
### 注意事项
1. **资源管理**:使用`try-finally`确保连接被关闭,避免资源泄露。
2. **大CLOB处理**:如果CLOB非常大(例如几百MB或GB),一次性读取可能导致内存问题。在这种情况下,可以分块读取:
```python
chunk_size = 1024*1024 # 每次读取1MB
content = []
while True:
chunk = clob_data.read(chunk_size)
if not chunk:
break
content.append(chunk)
full_content = ''.join(content) # 如果是文本,注意编码
```
但请注意,`oracledb`的`read()`方法在没有参数时默认读取全部内容,而且对于大多数情况(CLOB通常不会太大)是可行的。
3. **编码**:`oracledb`默认使用操作系统的编码,通常为UTF-8。如果CLOB数据是其他编码,可能需要指定编码方式读取。但通常Oracle数据库中的CLOB是使用数据库字符集存储的,而`oracledb`会自动处理转换。
4. **空值处理**:如果CLOB列允许为空,记得检查是否为`None`。
### 性能优化
- 如果只需要部分数据,可以在SQL查询中使用`DBMS_LOB.SUBSTR`函数截取部分内容,避免传输整个大对象:
```sql
SELECT id, DBMS_LOB.SUBSTR(clob_col, 4000, 1) AS clob_snippet FROM my_table
```
这里取前4000个字符(Oracle中`SUBSTR`函数对CLOB的限制是4000字节,对于多字节字符集可能不够,但可以多次调用)。
### 相关问题
1. 如何将Oracle数据库中的CLOB数据导出为文本文件?
2. 在Python中如何处理Oracle的BLOB(二进制大对象)数据?
3. 如何高效地将包含CLOB的表从Oracle迁移到MySQL?
[^1]: Oracle官方文档中关于LOB处理的示例代码可以参考`$ORACLE_HOME/rdbms/demo`目录下的文件。
[^2]: 在将Oracle迁移到MySQL时,CLOB类型需要转换为MySQL的`LONGTEXT`类型,但要注意两者在存储和性能上的差异。</think>### 如何从 Oracle 数据库读取 CLOB 列数据
在 Oracle 数据库中,CLOB(Character Large Object)用于存储大量文本数据(最大 4GB)。以下是使用 Python 读取 CLOB 列的完整解决方案:
#### 1. **安装依赖库**
```bash
pip install oracledb # 官方推荐的 Oracle Python 驱动
```
#### 2. **连接数据库并读取 CLOB**
```python
import oracledb
# 创建连接池(提高性能)
pool = oracledb.create_pool(
user="your_username",
password="your_password",
dsn="host:port/service_name", # 例如: localhost:1521/XEPDB1
min=1,
max=5,
increment=1
)
def read_clob_data(table_name, clob_column, id_value):
"""读取指定记录的CLOB内容"""
with pool.acquire() as conn:
with conn.cursor() as cursor:
# 关键:使用 outputtypehandler 处理CLOB类型
def output_type_handler(cursor, name, default_type, size, precision, scale):
if default_type == oracledb.DB_TYPE_CLOB:
return cursor.var(str, arraysize=cursor.arraysize)
cursor.outputtypehandler = output_type_handler
# 执行查询
sql = f"SELECT {clob_column} FROM {table_name} WHERE id = :id"
cursor.execute(sql, [id_value])
# 获取结果
result = cursor.fetchone()
if result:
return result[0] # 返回CLOB文本内容
return None
# 示例用法
clob_content = read_clob_data("documents", "content", 123)
print(f"CLOB内容长度: {len(clob_content)} 字符")
```
#### 3. **关键处理技术说明**
1. **`outputtypehandler` 机制**
将 CLOB 自动转换为 Python 字符串类型,避免手动处理 LOB 对象[^1]:
```python
def output_type_handler(cursor, name, default_type, ...):
if default_type == oracledb.DB_TYPE_CLOB:
return cursor.var(str) # 自动转换为字符串
```
2. **分块读取大 CLOB**
处理超大 CLOB(>100MB)时避免内存溢出:
```python
cursor.execute("SELECT DBMS_LOB.SUBSTR(content, 4000, 1) FROM ...")
# 分块读取逻辑
position = 1
chunk_size = 4000 # Oracle 单次最大读取字节数
full_content = []
while True:
chunk = cursor.fetchone()[0]
if not chunk:
break
full_content.append(chunk)
position += chunk_size
return "".join(full_content)
```
3. **空值处理**
Oracle 中空字符串 (`''`) 被存储为 `NULL`:
```python
if result[0] is None: # 检查NULL值
return ""
```
#### 4. **迁移到 MySQL 的注意事项
当迁移到 MySQL 时:
1. **数据类型转换**
Oracle `CLOB` → MySQL `LONGTEXT`(最大 4GB)
2. **空值差异**
MySQL 区分 `NULL` 和空字符串 `''`,需转换:
```sql
-- 迁移时处理
INSERT INTO mysql_table
SELECT COALESCE(clob_column, '') FROM oracle_table;
```
3. **编码问题**
确保两端数据库使用相同字符集(推荐 UTF8MB4):
```python
# 连接Oracle时指定编码
oracledb.connect(encoding="UTF-8", nencoding="UTF-8")
```
#### 5. **性能优化建议
- **连接池**:使用 `create_pool()` 减少连接开销
- **批量读取**:一次获取多行减少网络往返
- **列投影**:只选择必要列 `SELECT clob_col FROM ...`
- **异步读取**:使用 `cursor.execute_async()` 处理大CLOB
> 示例:批量读取 CLOB
```python
def batch_read_clobs(table_name, clob_column, id_list):
with pool.acquire() as conn:
with conn.cursor() as cursor:
cursor.outputtypehandler = lambda *args: (
cursor.var(str) if args[2] == oracledb.DB_TYPE_CLOB else None
)
sql = f"SELECT id, {clob_column} FROM {table_name} WHERE id IN ({','.join(':id' + str(i) for i in range(len(id_list)))})"
cursor.execute(sql, id_list)
return {row[0]: row[1] for row in cursor} # 返回 {id: clob_content} 字典
```
#### 6. **常见错误处理
| 错误 | 解决方案 |
|------|----------|
| `DatabaseError: ORA-24816` | 使用 `DBMS_LOB.SUBSTR` 分块读取 |
| `UnicodeDecodeError` | 连接时指定 `encoding="UTF-8"` |
| `TypeError: expecting string` | 添加 `outputtypehandler` 转换 |
[^1]: Oracle 官方推荐使用 `outputtypehandler` 处理 LOB 类型,可参考 `$ORACLE_HOME/rdbms/demo` 中的示例代码。
[^2]: 迁移到 MySQL 时需注意 LOB 字段的空值处理差异,避免数据不一致。
./bin/flink run -d -t yarn-per-job \ -Dyarn.application.name="flinkcdc_prod_old_smpcdb_3" \ -Dyarn.application.queue="hive" \ -Dstate.backend=rocksdb \ -Dstate.externalize-checkpoints=true \ -Dexecution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \ -Dexecution.checkpointing.interval=180s \ -Dexecution.checkpointing.timeout=7200s \ -Dexecution.checkpointing.min-pause=60s \ -Dexecution.checkpointing.max-concurrent-checkpoints=1 \ -Dstate.checkpoints.dir=hdfs://hadoop102:8020/flink_ck/offline/prod_old_smpcdb \ --allowNonRestoredState \ -Dparallelism.default=4 \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=5120m \ -Dtaskmanager.numberOfTaskSlots=2 \ -c org.apache.doris.flink.tools.cdc.CdcTools \ /opt/module/flink-1.17.2-sql-cdc/lib/flink-doris-connector-1.17-1.6.2.jar \ oracle-sync-database \ --database prod_old_smpcdb \ --oracle-conf hostname=10.1.9.2 \ --oracle-conf port=1521 \ --oracle-conf database-name=PMS2 \ --oracle-conf username=pms \ --oracle-conf password="SRX3yak#8" \ --oracle-conf schema-name=PMS \ --oracle-conf debezium.database.history.store.only.captured.tables.ddl=true \ --oracle-conf debezium.log.miner.enabled=false \ --oracle-conf debezium.snapshot.mode=initial_only \ --oracle-conf debezium.snapshot.fetch.size=20000 \ --oracle-conf debezium.column.mapping="CLOB:STRING,BLOB:STRING" \ --oracle-conf debezium.snapshot.checkpoint.interval.ms=0 \ --oracle-conf debezium.transforms=encodeBlob \ --oracle-conf debezium.transforms.encodeBlob.format=base64 \ --oracle-conf debezium.database.url="jdbc:oracle:thin:@//10.1.9.2:1521/PMS" \ --including-tables "EXP_PHOTO" \ --sink-conf fenodes=10.210.31.92:18030 \ --sink-conf username=root \ --sink-conf password=zjj000000 \ --sink-conf jdbc-url=jdbc:mysql://10.210.31.92:9030 \ --sink.buffer-size=100M \ --sink.buffer-flush.interval=60000 \ --sink-conf sink.enable-2pc=false \ --sink-conf sink.label-prefix=label_cdc_prod_old_smpcdb_340 \ --sink-conf sink.stream-load.timeout=7200 \ --sink-conf sink.connect.timeout.ms=60000 \ --sink-conf sink.request.timeout.ms=3600000 \ --table-conf replication_num=1 采集oracle 数据表到doris,大字段为空其他的字段有数据
<think>嗯,用户遇到了Flink CDC从Oracle同步数据到Doris时,大字段(CLOB/BLOB)为空的问题。这个问题很典型,涉及数据管道多个环节的配置兼容性。
用户可能正在构建实时数据仓库,Oracle作为传统业务系统,Doris做分析层,CLOB/BLOB字段可能存储的是富文本或文件数据。从ta提问的专业程度看,应该是中级以上大数据工程师,但可能对Flink CDC的细节不够熟悉。
需要重点排查几个方向:首先是数据类型映射,Oracle的CLOB/BLOB在Flink SQL里对应STRING和BYTES,但Doris端可能有差异;其次是传输过程中的截断问题,特别是默认的max_allowed_packet设置;最后是反序列化逻辑,Flink CDC的Oracle解析器对大字段处理可能有特殊逻辑。
突然想到一个关键点:Doris对VARCHAR有1MB限制,而CLOB可能超大。如果用户没配置Doris的text类型,大字段会被截断为空。这个在社区常见问题里出现过。
调试方案要分步骤:先确认源端数据是否正常,用独立SQL查询验证;然后检查Flink日志里是否有"truncated"或"overflow"警告;最后在Doris端用hex()函数检查字段是否收到0x00这样的空字节。
用户没提Flink版本,要特别注意1.16之前的老版本有CLOB反序列化bug。另外如果字段包含非UTF8编码的二进制,BLOB转STRING肯定会出问题,可能需要base64编码中转。
安全方面要提醒用户:测试环境先脱敏,特别是BLOB可能包含敏感文件。性能上,大字段同步会显著影响吞吐量,如果不需要原始内容,建议用substr截断。
补充一个隐藏坑点:Oracle的CLOB在超过4000字节时,Flink CDC可能拆分成多个LCB字段,而默认配置可能只取第一个分片。这个要在Debezium.OracleConnector的文档里查。</think>好的,我们来排查 Flink CDC 从 Oracle 采集数据到 Doris 时,大字段(CLOB/BLOB)为空而其他字段正常的问题。这个问题通常涉及数据类型映射、序列化/反序列化、传输配置或目标端处理。
**问题排查步骤:**
1. **确认源端 (Oracle) 数据是否真实存在且可访问:**
* 直接在 Oracle 数据库上执行 SQL 查询,确认目标表中的 CLOB/BLOB 字段在那些记录中确实包含非空数据。`SELECT <clob/blob_column_name> FROM <your_table> WHERE ...;`
* 检查你的 Flink CDC Oracle Connector 使用的数据库用户是否具有读取这些 CLOB/BLOB 字段的权限。
2. **检查 Flink CDC Oracle Connector 配置:**
* **`debezium.source.column.include.list` / `table.include.list`:** 确保你的配置明确包含了含有 CLOB/BLOB 字段的表和列。如果使用黑名单排除,确保没有意外排除这些列。
* **`debezium.source.database.connection.adapter`:** 确保使用的是 `logminer` 或 `xstream` (取决于你的 Oracle 版本和许可)。确认配置正确。
* **`debezium.source.*.handling.mode` (CLOB/BLOB 特定):**
* **`clob.handling.mode`:** Debezium 处理 CLOB 的策略。常用选项:
* `bytes`: 将 CLOB 作为原始字节数组传输(推荐,避免编码问题)。
* `string`: 尝试将 CLOB 解码为字符串。**这是最常见的问题点!** 如果 CLOB 包含二进制数据(如图片、PDF 等,虽然不常见但可能)或数据库字符集与 Flink 任务字符集不匹配,解码会失败导致空值或乱码。**强烈建议优先使用 `bytes` 模式。**
* `base64`: 将 CLOB 内容编码为 Base64 字符串传输。
* **`blob.handclob.handling.mode` (或 `blob.handling.mode`):** 处理 BLOB 的策略。常用选项:
* `bytes`: 将 BLOB 作为原始字节数组传输(最直接)。
* `base64`: 将 BLOB 内容编码为 Base64 字符串传输。
* **配置示例 (推荐使用 `bytes`):**
```sql
'debezium.source.clob.handling.mode' = 'bytes',
'debezium.source.blob.handling.mode' = 'bytes'
```
* **`debezium.source.max.*.size`:** 检查是否有设置 `max.clob.size` 或 `max.blob.size` 限制,并且这个限制是否小于你实际的数据大小?如果数据大于这个限制,可能会被截断或处理为空。确保该值足够大(例如 `104857600` 表示 100MB)。
* **`debezium.source.snapshot.mode`:** 如果问题只出现在增量同步,而全量快照时数据正常,考虑快照模式下处理大字段的差异(较少见,但需留意)。
* **日志级别:** 将 Flink 任务的日志级别(特别是 `org.apache.flink.cdc` 和 `io.debezium` 包)设置为 `DEBUG` 或 `TRACE`。检查日志中是否有关于处理 CLOB/BLOB 字段的警告或错误信息,例如字符集转换错误、截断提示、反序列化失败等。这是**最重要的诊断信息来源**。
3. **检查 Flink SQL 数据类型映射:**
* 在你的 `CREATE TABLE` 语句(用于 Flink CDC Source)中,确保 CLOB/BLOB 字段被映射到了正确的 Flink SQL 类型。
* 如果使用 `clob.handling.mode=bytes` 或 `blob.handling.mode=bytes`,对应的 Flink SQL 类型应该是 `BYTES`。
* 如果使用 `clob.handling.mode=string`,对应的 Flink SQL 类型应该是 `STRING`。
* 如果使用 `clob.handling.mode=base64` 或 `blob.handling.mode=base64`,对应的 Flink SQL 类型应该是 `STRING` (存储 Base64 字符串)。
* **错误的映射(例如配置为 `bytes` 传输但 Flink SQL 定义为 `STRING`)会导致反序列化失败,通常表现为空值或异常。**
* 示例 Source DDL (使用 `bytes` 模式):
```sql
CREATE TABLE oracle_source (
id INT,
clob_data BYTES, -- 对应 debezium.source.clob.handling.mode=bytes
blob_data BYTES, -- 对应 debezium.source.blob.handling.mode=bytes
other_fields STRING,
...
) WITH (
'connector' = 'oracle-cdc',
...
'debezium.source.clob.handling.mode' = 'bytes',
'debezium.source.blob.handling.mode' = 'bytes',
...
);
```
4. **检查 Doris Sink 配置与数据类型映射:**
* 在你的 Doris Sink `CREATE TABLE` 语句中,确认 `BYTES` 类型的字段映射到了合适的 Doris 类型。
* **Doris 没有直接的 `BYTES` 类型。** 通常有两种选择:
* **`VARCHAR`/`STRING`:** 如果源端使用了 `base64` 模式传输,或者在 Flink 作业中进行了 `base64` 编码转换(`SELECT ..., TO_BASE64(clob_data) AS clob_data_base64, ...`),那么在 Doris 端可以定义为 `VARCHAR` 或 `STRING` 来存储 Base64 字符串。查询时应用端需要再进行 Base64 解码。
* **`STRING`:** 如果源端使用了 `string` 模式传输 CLOB,且内容是纯文本,Doris 端定义为 `STRING`。
* **`BITMAP` (不推荐):** Doris 的 `BITMAP` 类型理论上可以存储二进制,但它主要用于高效的 bitmap 计算,作为通用二进制存储不太合适,且大小有限制。
* **`HLL` (不推荐):** 同 `BITMAP`,用于 HyperLogLog 计算,不适合。
* **目前 Doris (截止 1.2.4+) 最推荐存储原始二进制的方式是使用 `VARCHAR(65533)` 或 `TEXT` 类型,并利用其 `hex` 函数进行十六进制编码存储和解码。** 但这需要额外转换:
* **Flink Sink 转换:** 在 Flink SQL 写入 Doris 之前,将 `BYTES` 字段转换为十六进制字符串:`SELECT ..., HEX(clob_data) AS clob_data_hex, ...`。然后在 Doris Sink DDL 中将该字段定义为 `VARCHAR(16777216)` (或更大,取决于最大字段预估大小) 或 `TEXT`。
* **Doris 查询解码:** 在 Doris 中查询时,使用 `UNHEX(clob_data_hex)` 将十六进制字符串转换回二进制字节。应用端再根据原始数据类型(图片、文档等)处理这些字节。
* **关键点:** 确保 Flink Sink 写入的数据类型(可能是 `BYTES` 或 `STRING`)与 Doris 表定义的字段类型兼容。如果 Doris 字段类型是 `VARCHAR`/`STRING` 而写入的是未经转换的 `BYTES` 对象,很可能导致写入失败或存储为空/乱码。
* **Doris 字段长度限制:** 确认 Doris 中目标字段定义的 `VARCHAR(len)` 长度或 `TEXT` 是否足以容纳转换后的数据(例如 Base64 字符串或 HEX 字符串会比原始二进制大)。Base64 会增大约 33%,HEX 会增大一倍。如果定义的长度不足,数据会被截断导致部分丢失或表现为空。
5. **检查 Flink 任务本身:**
* **转换逻辑:** 如果你的 Flink SQL 作业在 Source 和 Sink 之间有 `INSERT INTO ... SELECT ...` 语句,仔细检查 `SELECT` 部分是否正确处理了 CLOB/BLOB 字段?是否有遗漏、误转换(如错误地调用了 `CAST(... AS STRING)`)或过滤?
* **序列化器/反序列化器:** 确保 Flink 使用的序列化框架(如 Flink CDC 内部使用的 Debezium 反序列化器,Flink 自身的类型序列化)能够正确处理 `BYTES` 类型。这通常由框架自动处理,但配置错误(如上述的数据类型映射错误)会导致问题。
* **网络/内存配置:** 虽然不太可能是首要原因,但如果 CLOB/BLOB 非常大,检查 Flink TaskManager 的 JVM 堆内存 (`taskmanager.memory.task.heap.size`) 和网络缓冲区 (`taskmanager.memory.network.fraction/min/max`) 是否足够,避免 OOM 导致数据丢失或任务失败。检查 Flink UI 的 TaskManager 日志是否有 OOM 错误。
**总结与建议的解决方案:**
1. **首选方案 (推荐):**
* **Source 配置:** `debezium.source.clob.handling.mode = 'bytes'`, `debezium.source.blob.handling.mode = 'bytes'`。确保 `max.clob.size`/`max.blob.size` 足够大。
* **Flink SQL Source DDL:** 将 CLOB/BLOB 字段定义为 `BYTES`。
* **Flink SQL 转换:** 在写入 Doris Sink 的 `SELECT` 语句中,使用 `HEX(clob_data) AS clob_data_hex` 将 `BYTES` 字段转换为十六进制字符串。
* **Doris Sink DDL:** 将 `clob_data_hex` 字段定义为 `TEXT` 或足够大的 `VARCHAR` (例如 `VARCHAR(16777216)`)。如果字段非常多或超大,考虑 Doris 的 `LARGEINT` 或 `STRING` 类型(需确认 Doris 版本支持的最大长度)。
* **Doris 查询:** 使用 `UNHEX(clob_data_hex)` 获取原始二进制数据。
2. **备选方案:**
* **Source 配置:** `debezium.source.clob.handling.mode = 'base64'`, `debezium.source.blob.handling.mode = 'base64'`。
* **Flink SQL Source DDL:** 将 CLOB/BLOB 字段定义为 `STRING` (存储 Base64 字符串)。
* **Flink SQL 转换:** 通常无需额外转换,直接 `SELECT ..., clob_data, ...`。
* **Doris Sink DDL:** 将 `clob_data` 字段定义为 `TEXT` 或足够大的 `VARCHAR`。Base64 字符串是文本,可以直接存储。
* **应用端:** 读取到 Base64 字符串后,需要调用 Base64 解码库还原成二进制。
**务必启用 DEBUG/TRACE 日志,这是定位问题最直接的手段!** 日志中通常会明确记录处理大字段时遇到的异常或警告。
**相关问题:**
1. **除了 `bytes` 和 `base64`,Debezium 处理 Oracle CLOB/BLOB 还有其他模式吗?它们各自的优缺点是什么?**
2. **在 Doris 中存储大二进制对象(如图片、文档)的最佳实践是什么?除了 `TEXT`/`VARCHAR` 配合 `HEX`/`UNHEX` 或 Base64,还有更优的方案吗?**
3. **Flink CDC 同步包含超大 CLOB/BLOB (如 > 100MB) 的 Oracle 表时,需要注意哪些额外的性能调优和配置(如 Flink 内存管理、Debezium 参数、Doris 导入配置)?**
4. **如何监控 Flink CDC 作业中 CLOB/BLOB 字段的传输是否成功以及数据大小?**
5. **如果 CLOB 字段包含的是纯文本,使用 `string` 模式传输到 Doris 的 `STRING` 类型,需要额外注意哪些字符集编码问题?如何确保编码一致?**
[^1]: 引用自内部关于 Flink Checkpoint 配置的说明,用于示例引用格式。
阅读全文
相关推荐

















