Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,17 @@ Result<std::unique_ptr<NameMapping>> NameMappingFromJson(const nlohmann::json& j
return NameMapping::Make(std::move(mapped_fields));
}

/// \brief Apply schema changes to a name mapping supplied as JSON text.
///
/// The text is parsed, converted to a NameMapping, passed through UpdateMapping with
/// `updates` and `adds`, and the resulting mapping is serialized back to a string.
/// The error of the first failing step is propagated to the caller.
Result<std::string> UpdateMappingFromJsonString(
    std::string_view mapping_json,
    const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates,
    const std::multimap<int32_t, int32_t>& adds) {
  // FromJsonString is handed an owning std::string built from the view.
  ICEBERG_ASSIGN_OR_RAISE(auto parsed, FromJsonString(std::string(mapping_json)));
  ICEBERG_ASSIGN_OR_RAISE(auto existing, NameMappingFromJson(parsed));
  ICEBERG_ASSIGN_OR_RAISE(auto revised, UpdateMapping(*existing, updates, adds));

  const nlohmann::json revised_json = ToJson(*revised);
  return ToJsonString(revised_json);
}

nlohmann::json ToJson(const TableIdentifier& identifier) {
nlohmann::json json;
json[kNamespace] = identifier.ns.levels;
Expand Down
14 changes: 14 additions & 0 deletions src/iceberg/json_serde_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@

#pragma once

#include <map>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>

#include <nlohmann/json_fwd.hpp>

Expand Down Expand Up @@ -347,6 +352,15 @@ ICEBERG_EXPORT nlohmann::json ToJson(const NameMapping& name_mapping);
ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> NameMappingFromJson(
const nlohmann::json& json);

/// \brief Update a name mapping given as a JSON string and return the updated JSON.
///
/// Parses `mapping_json` into a NameMapping, applies UpdateMapping with `updates` and
/// `adds`, and serializes the resulting mapping back to a JSON string.
///
/// \param mapping_json JSON text of the current name mapping.
/// \param updates Map from field ID to the updated field definition.
/// \param adds Multimap from parent field ID to the IDs of the fields added under that
/// parent.
/// \return The updated mapping as a JSON string, or an error if parsing, the mapping
/// update, or serialization fails.
ICEBERG_EXPORT Result<std::string> UpdateMappingFromJsonString(
std::string_view mapping_json,
const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates,
const std::multimap<int32_t, int32_t>& adds);

/// \brief Serializes a `TableIdentifier` object to JSON.
///
/// \param identifier The `TableIdentifier` object to be serialized.
Expand Down
163 changes: 163 additions & 0 deletions src/iceberg/name_mapping.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,159 @@ class CreateMappingVisitor {
}
};

// Rewrites an existing name mapping so that it reflects a set of schema changes:
//  - a field whose ID appears in `updates` gains the updated field's name,
//  - fields listed in `adds` receive a freshly created mapping under their parent,
//  - a name that is now assigned to a different field ID is dropped from the sibling
//    that previously carried it.
class UpdateMappingVisitor {
 public:
  // `updates` maps a field ID to its updated definition; `adds` maps a parent field ID
  // to the IDs of the fields added under it. Both are stored by reference.
  UpdateMappingVisitor(
      const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates,
      const std::multimap<int32_t, int32_t>& adds)
      : updates_(updates), adds_(adds) {}

  // Entry point: rewrites every field of `mapping`, then appends the additions that
  // are registered for the top level.
  Result<std::unique_ptr<MappedFields>> VisitMapping(const NameMapping& mapping) {
    // Parent ID under which top-level additions are looked up in `adds_`.
    constexpr int32_t kTopLevelParentId = -1;

    auto visited = VisitFields(mapping.AsMappedFields());
    ICEBERG_RETURN_UNEXPECTED(visited);
    return AddNewFields(std::move(*visited), kTopLevelParentId);
  }

 private:
  // Rewrites one level of sibling fields (recursing into their nested mappings) and
  // then strips names that an update at this level handed to a different sibling.
  Result<std::unique_ptr<MappedFields>> VisitFields(const MappedFields& fields) {
    std::vector<MappedField> visited;
    visited.reserve(fields.Size());

    // Updated name -> ID of the field that owns it after the update.
    std::unordered_map<std::string, int32_t> renamed_to;

    for (const auto& original : fields.fields()) {
      auto rewritten = VisitField(original);
      ICEBERG_RETURN_UNEXPECTED(rewritten);
      visited.push_back(std::move(*rewritten));

      const auto& id = visited.back().field_id;
      if (!id.has_value()) {
        continue;
      }
      if (auto hit = updates_.find(*id); hit != updates_.end()) {
        renamed_to.emplace(std::string(hit->second->name()), *id);
      }
    }

    for (auto& entry : visited) {
      entry = RemoveReassignedNames(entry, renamed_to);
    }

    return MappedFields::Make(std::move(visited));
  }

  // Produces the rewritten form of a single field: its name set (plus the updated name,
  // if any), its rewritten nested mapping, and any fields added beneath it.
  Result<MappedField> VisitField(const MappedField& field) {
    std::unordered_set<std::string> names = field.names;
    std::unique_ptr<MappedFields> children;

    if (field.nested_mapping != nullptr) {
      auto visited_children = VisitFields(*field.nested_mapping);
      ICEBERG_RETURN_UNEXPECTED(visited_children);
      children = std::move(*visited_children);
    }

    // Updates and additions are keyed by field ID, so a field without one is only
    // rewritten recursively.
    if (field.field_id.has_value()) {
      const int32_t id = *field.field_id;

      if (auto hit = updates_.find(id); hit != updates_.end()) {
        names.insert(std::string(hit->second->name()));
      }

      auto extended = AddNewFields(std::move(children), id);
      ICEBERG_RETURN_UNEXPECTED(extended);
      children = std::move(*extended);
    }

    return MappedField{
        .names = std::move(names),
        .field_id = field.field_id,
        .nested_mapping = std::move(children),
    };
  }

  // Appends mappings for the fields added under `parent_id` to `mapping`. An added ID
  // without an entry in `updates_` is skipped. When nothing is added, `mapping` is
  // returned unchanged (it may be null).
  Result<std::unique_ptr<MappedFields>> AddNewFields(
      std::unique_ptr<MappedFields> mapping, int32_t parent_id) {
    std::vector<const SchemaField*> additions;
    const auto [first, last] = adds_.equal_range(parent_id);
    for (auto it = first; it != last; ++it) {
      if (auto hit = updates_.find(it->second); hit != updates_.end()) {
        additions.push_back(hit->second.get());
      }
    }

    if (additions.empty()) {
      return std::move(mapping);
    }

    // Build a mapping entry (including nested entries) for every added field.
    CreateMappingVisitor creator;
    std::vector<MappedField> created;
    for (const SchemaField* added : additions) {
      auto nested = VisitType(*added->type(), [&creator](const auto& type) {
        return creator.Visit(type);
      });
      ICEBERG_RETURN_UNEXPECTED(nested);

      created.push_back(MappedField{
          .names = {std::string(added->name())},
          .field_id = added->field_id(),
          .nested_mapping = std::move(*nested),
      });
    }

    const bool nothing_existing = (mapping == nullptr) || (mapping->Size() == 0);
    if (nothing_existing) {
      return MappedFields::Make(std::move(created));
    }

    // Name -> ID of the added field that now owns that name.
    std::unordered_map<std::string, int32_t> claimed;
    for (const SchemaField* added : additions) {
      claimed.emplace(std::string(added->name()), added->field_id());
    }

    // Existing fields come first (minus any name an added field took over), followed
    // by the newly created ones.
    std::vector<MappedField> merged;
    merged.reserve(mapping->Size() + created.size());
    for (const auto& existing : mapping->fields()) {
      merged.push_back(RemoveReassignedNames(existing, claimed));
    }
    for (auto& fresh : created) {
      merged.push_back(std::move(fresh));
    }

    return MappedFields::Make(std::move(merged));
  }

  // Returns a copy of `field` without the names that `assignments` gives to another
  // field ID. A field without an ID loses every name present in `assignments`.
  static MappedField RemoveReassignedNames(
      const MappedField& field,
      const std::unordered_map<std::string, int32_t>& assignments) {
    const auto owned_by_other = [&field, &assignments](const std::string& name) {
      const auto claim = assignments.find(name);
      if (claim == assignments.end()) {
        return false;
      }
      if (!field.field_id.has_value()) {
        return true;
      }
      return claim->second != *field.field_id;
    };

    std::unordered_set<std::string> kept = field.names;
    std::erase_if(kept, owned_by_other);

    return MappedField{
        .names = std::move(kept),
        .field_id = field.field_id,
        .nested_mapping = field.nested_mapping,
    };
  }

  const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates_;
  const std::multimap<int32_t, int32_t>& adds_;
};

} // namespace

Result<std::unique_ptr<NameMapping>> CreateMapping(const Schema& schema) {
Expand All @@ -335,4 +488,14 @@ Result<std::unique_ptr<NameMapping>> CreateMapping(const Schema& schema) {
return NameMapping::Make(std::move(*result));
}

// Runs UpdateMappingVisitor over `mapping` with the given `updates` and `adds`, then
// wraps the rewritten fields in a new NameMapping. A visitor error is returned as-is.
Result<std::unique_ptr<NameMapping>> UpdateMapping(
    const NameMapping& mapping,
    const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates,
    const std::multimap<int32_t, int32_t>& adds) {
  UpdateMappingVisitor rewriter{updates, adds};

  auto updated_fields = rewriter.VisitMapping(mapping);
  ICEBERG_RETURN_UNEXPECTED(updated_fields);

  return NameMapping::Make(std::move(*updated_fields));
}

} // namespace iceberg
10 changes: 5 additions & 5 deletions src/iceberg/name_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#pragma once

#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <span>
Expand Down Expand Up @@ -143,16 +144,15 @@ ICEBERG_EXPORT std::string ToString(const NameMapping& mapping);
/// \return A new NameMapping instance initialized with the schema's fields and names.
ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> CreateMapping(const Schema& schema);

/// \brief Update a name-based mapping using changes to a schema.
/// \param mapping a name-based mapping
/// \param updates a map from field ID to updated field definitions
/// \param adds a map from parent field ID to the IDs of the nested fields to be added;
/// top-level additions are looked up under parent ID -1
/// \return an updated mapping with names added to renamed fields and the mapping extended
/// for new fields
ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> UpdateMapping(
const NameMapping& mapping,
const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates,
const std::multimap<int32_t, int32_t>& adds);

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ if(ICEBERG_BUILD_BUNDLE)
SOURCES
expire_snapshots_test.cc
fast_append_test.cc
name_mapping_update_test.cc
snapshot_manager_test.cc
transaction_test.cc
update_location_test.cc
Expand Down
Loading
Loading