Skip to content
This repository was archived by the owner on Dec 14, 2023. It is now read-only.

Commit f41f7c9

Browse files
committed
Revert "Copy just story_sentences shards"
This reverts commit 314dbcd.
1 parent 2f332be commit f41f7c9

File tree

2 files changed

+311
-12
lines changed

2 files changed

+311
-12
lines changed

apps/move-rows-to-shards/src/main/java/org/mediacloud/mrts/MoveRowsToShardsWorkflowImpl.java

+148 -6
Original file line number | Diff line number | Diff line change
@@ -11,11 +11,153 @@ public class MoveRowsToShardsWorkflowImpl implements MoveRowsToShardsWorkflow {
11 11

12 12
@Override
13 13
public void moveRowsToShards() {
14-
Workflow.newChildWorkflowStub(
15-
StoriesWorkflow.class,
16-
ChildWorkflowOptions.newBuilder()
17-
.setWorkflowId("stories")
18-
.build()
19-
).moveStories();
14+
// First level tables that don't have any dependencies
15+
Promise<Void> authUserRequestDailyCountsPromise = Async.procedure(
16+
Workflow.newChildWorkflowStub(
17+
AuthUserRequestDailyCountsWorkflow.class,
18+
ChildWorkflowOptions.newBuilder()
19+
.setWorkflowId("auth_user_request_daily_counts")
20+
.build()
21+
)::moveAuthUserRequestDailyCounts
22+
);
23+
Promise<Void> mediaStatsPromise = Async.procedure(
24+
Workflow.newChildWorkflowStub(
25+
MediaStatsWorkflow.class,
26+
ChildWorkflowOptions.newBuilder()
27+
.setWorkflowId("media_stats")
28+
.build()
29+
)::moveMediaStats
30+
);
31+
Promise<Void> mediaCoverageGapsPromise = Async.procedure(
32+
Workflow.newChildWorkflowStub(
33+
MediaCoverageGapsWorkflow.class,
34+
ChildWorkflowOptions.newBuilder()
35+
.setWorkflowId("media_coverage_gaps")
36+
.build()
37+
)::moveMediaCoverageGaps
38+
);
39+
Promise<Void> storiesPromise = Async.procedure(
40+
Workflow.newChildWorkflowStub(
41+
StoriesWorkflow.class,
42+
ChildWorkflowOptions.newBuilder()
43+
.setWorkflowId("stories")
44+
.build()
45+
)::moveStories
46+
);
47+
Promise<Void> downloadsPromise = Async.procedure(
48+
Workflow.newChildWorkflowStub(
49+
DownloadsWorkflow.class,
50+
ChildWorkflowOptions.newBuilder()
51+
.setWorkflowId("downloads")
52+
.build()
53+
)::moveDownloads
54+
);
55+
Promise<Void> topicStoriesPromise = Async.procedure(
56+
Workflow.newChildWorkflowStub(
57+
TopicStoriesWorkflow.class,
58+
ChildWorkflowOptions.newBuilder()
59+
.setWorkflowId("topic_stories")
60+
.build()
61+
)::moveTopicStories
62+
);
63+
Promise<Void> topicLinksPromise = Async.procedure(
64+
Workflow.newChildWorkflowStub(
65+
TopicLinksWorkflow.class,
66+
ChildWorkflowOptions.newBuilder()
67+
.setWorkflowId("topic_links")
68+
.build()
69+
)::moveTopicLinks
70+
);
71+
Promise<Void> topicPostsPromise = Async.procedure(
72+
Workflow.newChildWorkflowStub(
73+
TopicPostsWorkflow.class,
74+
ChildWorkflowOptions.newBuilder()
75+
.setWorkflowId("topic_posts")
76+
.build()
77+
)::moveTopicPosts
78+
);
79+
Promise<Void> snapStoriesPromise = Async.procedure(
80+
Workflow.newChildWorkflowStub(
81+
SnapStoriesWorkflow.class,
82+
ChildWorkflowOptions.newBuilder()
83+
.setWorkflowId("snap.stories")
84+
.build()
85+
)::moveSnapStories
86+
);
87+
Promise<Void> snapMediaPromise = Async.procedure(
88+
Workflow.newChildWorkflowStub(
89+
SnapMediaWorkflow.class,
90+
ChildWorkflowOptions.newBuilder()
91+
.setWorkflowId("snap.media")
92+
.build()
93+
)::moveSnapMedia
94+
);
95+
Promise<Void> snapMediaTagsMapPromise = Async.procedure(
96+
Workflow.newChildWorkflowStub(
97+
SnapMediaTagsMapWorkflow.class,
98+
ChildWorkflowOptions.newBuilder()
99+
.setWorkflowId("snap.media_tags_map")
100+
.build()
101+
)::moveSnapMediaTagsMap
102+
);
103+
Promise<Void> snapStoriesTagsMapPromise = Async.procedure(
104+
Workflow.newChildWorkflowStub(
105+
SnapStoriesTagsMapWorkflow.class,
106+
ChildWorkflowOptions.newBuilder()
107+
.setWorkflowId("snap.stories_tags_map")
108+
.build()
109+
)::moveSnapStoriesTagsMap
110+
);
111+
Promise<Void> snapStoryLinksPromise = Async.procedure(
112+
Workflow.newChildWorkflowStub(
113+
SnapStoryLinksWorkflow.class,
114+
ChildWorkflowOptions.newBuilder()
115+
.setWorkflowId("snap.story_links")
116+
.build()
117+
)::moveSnapStoryLinks
118+
);
119+
Promise<Void> snapStoryLinkCountsPromise = Async.procedure(
120+
Workflow.newChildWorkflowStub(
121+
SnapStoryLinkCountsWorkflow.class,
122+
ChildWorkflowOptions.newBuilder()
123+
.setWorkflowId("snap.story_link_counts")
124+
.build()
125+
)::moveSnapStoryLinkCounts
126+
);
127+
Promise<Void> snapMediumLinkCountsPromise = Async.procedure(
128+
Workflow.newChildWorkflowStub(
129+
SnapMediumLinkCountsWorkflow.class,
130+
ChildWorkflowOptions.newBuilder()
131+
.setWorkflowId("snap.medium_link_counts")
132+
.build()
133+
)::moveSnapMediumLinkCounts
134+
);
135+
Promise<Void> snapMediumLinksPromise = Async.procedure(
136+
Workflow.newChildWorkflowStub(
137+
SnapMediumLinksWorkflow.class,
138+
ChildWorkflowOptions.newBuilder()
139+
.setWorkflowId("snap.medium_links")
140+
.build()
141+
)::moveSnapMediumLinks
142+
);
143+
144+
Promise.allOf(
145+
authUserRequestDailyCountsPromise,
146+
mediaStatsPromise,
147+
mediaCoverageGapsPromise,
148+
storiesPromise,
149+
downloadsPromise,
150+
topicStoriesPromise,
151+
topicLinksPromise,
152+
topicPostsPromise,
153+
snapStoriesPromise,
154+
snapMediaPromise,
155+
snapMediaTagsMapPromise,
156+
snapStoriesTagsMapPromise,
157+
snapStoryLinksPromise,
158+
snapStoryLinkCountsPromise,
159+
snapMediumLinkCountsPromise,
160+
snapMediumLinksPromise
161+
).get();
20 162
}
21 163
}

apps/move-rows-to-shards/src/main/java/org/mediacloud/mrts/tables/StoriesWorkflowImpl.java

+163 -6
Original file line number | Diff line number | Diff line change
@@ -12,11 +12,168 @@ public class StoriesWorkflowImpl extends TableMoveWorkflow implements StoriesWor
12 12

13 13
@Override
14 14
public void moveStories() {
15-
Workflow.newChildWorkflowStub(
16-
StorySentencesWorkflow.class,
17-
ChildWorkflowOptions.newBuilder()
18-
.setWorkflowId("story_sentences")
19-
.build()
20-
).moveStorySentences();
15+
this.moveTable(
16+
"unsharded_public.stories",
17+
"stories_id",
18+
// 2,119,319,121 in source table
19+
3_000_000,
20+
List.of(String.format("""
21+
WITH deleted_rows AS (
22+
DELETE FROM unsharded_public.stories
23+
WHERE stories_id BETWEEN %s AND %s
24+
RETURNING
25+
stories_id,
26+
media_id,
27+
url,
28+
guid,
29+
title,
30+
normalized_title_hash,
31+
description,
32+
publish_date,
33+
collect_date,
34+
full_text_rss,
35+
language
36+
)
37+
INSERT INTO sharded_public.stories (
38+
stories_id,
39+
media_id,
40+
url,
41+
guid,
42+
title,
43+
normalized_title_hash,
44+
description,
45+
publish_date,
46+
collect_date,
47+
full_text_rss,
48+
language
49+
)
50+
SELECT
51+
stories_id::BIGINT,
52+
media_id::BIGINT,
53+
url::TEXT,
54+
guid::TEXT,
55+
title,
56+
normalized_title_hash,
57+
description,
58+
publish_date,
59+
collect_date,
60+
full_text_rss,
61+
language
62+
FROM deleted_rows
63+
""", START_ID_MARKER, END_ID_MARKER))
64+
);
65+
66+
Promise<Void> storiesApSyndicatedPromise = Async.procedure(
67+
Workflow.newChildWorkflowStub(
68+
StoriesApSyndicatedWorkflow.class,
69+
ChildWorkflowOptions.newBuilder()
70+
.setWorkflowId("stories_ap_syndicated")
71+
.build()
72+
)::moveStoriesApSyndicated
73+
);
74+
Promise<Void> storyUrlsPromise = Async.procedure(
75+
Workflow.newChildWorkflowStub(
76+
StoryUrlsWorkflow.class,
77+
ChildWorkflowOptions.newBuilder()
78+
.setWorkflowId("story_urls")
79+
.build()
80+
)::moveStoryUrls
81+
);
82+
Promise<Void> feedsStoriesMapPromise = Async.procedure(
83+
Workflow.newChildWorkflowStub(
84+
FeedsStoriesMapWorkflow.class,
85+
ChildWorkflowOptions.newBuilder()
86+
.setWorkflowId("feeds_stories_map")
87+
.build()
88+
)::moveFeedsStoriesMap
89+
);
90+
Promise<Void> storiesTagsMapPromise = Async.procedure(
91+
Workflow.newChildWorkflowStub(
92+
StoriesTagsMapWorkflow.class,
93+
ChildWorkflowOptions.newBuilder()
94+
.setWorkflowId("stories_tags_map")
95+
.build()
96+
)::moveStoriesTagsMap
97+
);
98+
Promise<Void> storySentencesPromise = Async.procedure(
99+
Workflow.newChildWorkflowStub(
100+
StorySentencesWorkflow.class,
101+
ChildWorkflowOptions.newBuilder()
102+
.setWorkflowId("story_sentences")
103+
.build()
104+
)::moveStorySentences
105+
);
106+
Promise<Void> solrImportStoriesPromise = Async.procedure(
107+
Workflow.newChildWorkflowStub(
108+
SolrImportStoriesWorkflow.class,
109+
ChildWorkflowOptions.newBuilder()
110+
.setWorkflowId("solr_import_stories")
111+
.build()
112+
)::moveSolrImportStories
113+
);
114+
Promise<Void> solrImportedStoriesPromise = Async.procedure(
115+
Workflow.newChildWorkflowStub(
116+
SolrImportedStoriesWorkflow.class,
117+
ChildWorkflowOptions.newBuilder()
118+
.setWorkflowId("solr_imported_stories")
119+
.build()
120+
)::moveSolrImportedStories
121+
);
122+
Promise<Void> topicMergedStoriesMapPromise = Async.procedure(
123+
Workflow.newChildWorkflowStub(
124+
TopicMergedStoriesMapWorkflow.class,
125+
ChildWorkflowOptions.newBuilder()
126+
.setWorkflowId("topic_merged_stories_map")
127+
.build()
128+
)::moveTopicMergedStoriesMap
129+
);
130+
Promise<Void> storyStatisticsPromise = Async.procedure(
131+
Workflow.newChildWorkflowStub(
132+
StoryStatisticsWorkflow.class,
133+
ChildWorkflowOptions.newBuilder()
134+
.setWorkflowId("story_statistics")
135+
.build()
136+
)::moveStoryStatistics
137+
);
138+
Promise<Void> processedStoriesPromise = Async.procedure(
139+
Workflow.newChildWorkflowStub(
140+
ProcessedStoriesWorkflow.class,
141+
ChildWorkflowOptions.newBuilder()
142+
.setWorkflowId("processed_stories")
143+
.build()
144+
)::moveProcessedStories
145+
);
146+
Promise<Void> scrapedStoriesPromise = Async.procedure(
147+
Workflow.newChildWorkflowStub(
148+
ScrapedStoriesWorkflow.class,
149+
ChildWorkflowOptions.newBuilder()
150+
.setWorkflowId("scraped_stories")
151+
.build()
152+
)::moveScrapedStories
153+
);
154+
Promise<Void> storyEnclosuresPromise = Async.procedure(
155+
Workflow.newChildWorkflowStub(
156+
StoryEnclosuresWorkflow.class,
157+
ChildWorkflowOptions.newBuilder()
158+
.setWorkflowId("story_enclosures")
159+
.build()
160+
)::moveStoryEnclosures
161+
);
162+
163+
// Move tables that depend on "stories"
164+
Promise.allOf(
165+
storiesApSyndicatedPromise,
166+
storyUrlsPromise,
167+
feedsStoriesMapPromise,
168+
storiesTagsMapPromise,
169+
storySentencesPromise,
170+
solrImportStoriesPromise,
171+
solrImportedStoriesPromise,
172+
topicMergedStoriesMapPromise,
173+
storyStatisticsPromise,
174+
processedStoriesPromise,
175+
scrapedStoriesPromise,
176+
storyEnclosuresPromise
177+
).get();
21 178
}
22 179
}

0 commit comments

Comments
 (0)