Skip to content

Commit 6c99767

Browse files
feat: add pubsublite-kafka-auth module (#363)
* feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * feat: Create AuthServer/ClientParameters * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent c152761 commit 6c99767

File tree

10 files changed

+450
-27
lines changed

10 files changed

+450
-27
lines changed

README.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,20 @@ If you are using Maven, add this to your pom.xml file:
1919
<dependency>
2020
<groupId>com.google.cloud</groupId>
2121
<artifactId>pubsublite-kafka</artifactId>
22-
<version>1.0.3</version>
22+
<version>1.0.4</version>
2323
</dependency>
2424
```
2525

2626
If you are using Gradle without BOM, add this to your dependencies:
2727

2828
```Groovy
29-
implementation 'com.google.cloud:pubsublite-kafka:1.0.3'
29+
implementation 'com.google.cloud:pubsublite-kafka:1.0.4'
3030
```
3131

3232
If you are using SBT, add this to your dependencies:
3333

3434
```Scala
35-
libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.0.3"
35+
libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.0.4"
3636
```
3737

3838
## Authentication
@@ -160,6 +160,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsublite-ka
160160
| Sample | Source Code | Try it |
161161
| --------------------------- | --------------------------------- | ------ |
162162
| Consumer Example | [source code](https://github.com/googleapis/java-pubsublite-kafka/blob/main/samples/snippets/src/main/java/pubsublite/ConsumerExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-kafka&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/ConsumerExample.java) |
163+
| Kafka Producer Example | [source code](https://github.com/googleapis/java-pubsublite-kafka/blob/main/samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-kafka&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java) |
163164
| Producer Example | [source code](https://github.com/googleapis/java-pubsublite-kafka/blob/main/samples/snippets/src/main/java/pubsublite/ProducerExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-kafka&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/ProducerExample.java) |
164165

165166

kafka_gcp_credentials.py

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import base64
2+
import datetime
3+
import google.auth
4+
import google.auth.transport.urllib3
5+
import http.server
6+
import json
7+
import urllib3
8+
9+
_credentials, _project = google.auth.default()
10+
_http_client = urllib3.PoolManager()
11+
12+
13+
def valid_credentials():
14+
if not _credentials.valid:
15+
_credentials.refresh(
16+
google.auth.transport.urllib3.Request(_http_client))
17+
return _credentials
18+
19+
20+
_HEADER = json.dumps(dict(typ='JWT', alg='GOOG_TOKEN'))
21+
22+
23+
def get_jwt(creds):
24+
return json.dumps(dict(exp=creds.expiry.timestamp(),
25+
iat=datetime.datetime.utcnow().timestamp(),
26+
scope='pubsub',
27+
sub='unused'))
28+
29+
30+
def b64_encode(source):
31+
return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8')
32+
33+
34+
def get_kafka_access_token(creds):
35+
return '.'.join([b64_encode(_HEADER), b64_encode(get_jwt(creds)),
36+
b64_encode(creds.token)])
37+
38+
39+
def build_message():
40+
creds = valid_credentials()
41+
expiry_seconds = (creds.expiry - datetime.datetime.utcnow()).total_seconds()
42+
return json.dumps(
43+
dict(access_token=get_kafka_access_token(creds), token_type='bearer',
44+
expires_in=expiry_seconds))
45+
46+
47+
class AuthHandler(http.server.BaseHTTPRequestHandler):
48+
def _handle(self):
49+
self.send_response(200)
50+
self.send_header('Content-type', 'text/plain')
51+
self.end_headers()
52+
self.wfile.write(build_message().encode('utf-8'))
53+
54+
def do_GET(self):
55+
self._handle()
56+
57+
def do_POST(self):
58+
self._handle()
59+
60+
61+
def run_server():
62+
server_address = ('localhost', 14293)
63+
server = http.server.ThreadingHTTPServer(server_address, AuthHandler)
64+
print("Serving on localhost:14293. This is not accessible outside of the "
65+
"current machine.")
66+
server.serve_forever()
67+
68+
69+
if __name__ == '__main__':
70+
run_server()

pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
<description>Parent POM for Pub/Sub Lite Kafka Integrations</description>
1616
<modules>
1717
<module>pubsublite-kafka</module>
18+
<module>pubsublite-kafka-auth</module>
1819
</modules>
1920
<properties>
2021
<psl.version>1.9.2</psl.version>

pubsublite-kafka-auth/pom.xml

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?xml version="1.0"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<parent>
4+
<groupId>com.google.cloud</groupId>
5+
<artifactId>pubsublite-kafka-parent</artifactId>
6+
<version>1.0.5-SNAPSHOT</version><!-- {x-version-update:pubsublite-kafka:current} -->
7+
</parent>
8+
<modelVersion>4.0.0</modelVersion>
9+
<groupId>com.google.cloud</groupId>
10+
<artifactId>pubsublite-kafka-auth</artifactId>
11+
<version>1.0.5-SNAPSHOT</version><!-- {x-version-update:pubsublite-kafka:current} -->
12+
<packaging>jar</packaging>
13+
<name>Pub/Sub Lite Kafka Auth</name>
14+
<url>https://github.com/googleapis/java-pubsublite-kafka</url>
15+
<description>Kafka Auth Provider for Google Cloud Pub/Sub Lite</description>
16+
<dependencies>
17+
<dependency>
18+
<groupId>com.google.auth</groupId>
19+
<artifactId>google-auth-library-oauth2-http</artifactId>
20+
</dependency>
21+
<dependency>
22+
<groupId>com.google.cloud</groupId>
23+
<artifactId>google-cloud-pubsublite</artifactId>
24+
</dependency>
25+
<dependency>
26+
<groupId>com.google.code.gson</groupId>
27+
<artifactId>gson</artifactId>
28+
</dependency>
29+
<dependency>
30+
<groupId>com.google.guava</groupId>
31+
<artifactId>guava</artifactId>
32+
</dependency>
33+
</dependencies>
34+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.kafka;
18+
19+
import com.google.cloud.pubsublite.CloudRegion;
20+
import com.google.cloud.pubsublite.ProjectId;
21+
import com.google.cloud.pubsublite.ProjectIdOrNumber;
22+
import com.google.cloud.pubsublite.ProjectNumber;
23+
import com.google.cloud.pubsublite.kafka.internal.AuthServer;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
27+
/** A class providing the correct parameters for connecting a Kafka client to Pub/Sub Lite. */
28+
public final class ClientParameters {
29+
public static Map<String, Object> getProducerParams(ProjectId project, CloudRegion region) {
30+
return getProducerParams(ProjectIdOrNumber.of(project), region);
31+
}
32+
33+
public static Map<String, Object> getProducerParams(ProjectNumber project, CloudRegion region) {
34+
return getProducerParams(ProjectIdOrNumber.of(project), region);
35+
}
36+
37+
public static Map<String, Object> getProducerParams(
38+
ProjectIdOrNumber project, CloudRegion region) {
39+
HashMap<String, Object> params = new HashMap<>();
40+
params.put("enable.idempotence", false);
41+
params.put("bootstrap.servers", getEndpoint(region));
42+
params.put("security.protocol", "SASL_SSL");
43+
params.put("sasl.mechanism", "OAUTHBEARER");
44+
params.put("sasl.oauthbearer.token.endpoint.url", "http://localhost:" + AuthServer.PORT);
45+
params.put("sasl.jaas.config", getJaasConfig(project));
46+
params.put(
47+
"sasl.login.callback.handler.class",
48+
"org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");
49+
return params;
50+
}
51+
52+
private static String getEndpoint(CloudRegion region) {
53+
return region.value() + "-kafka-pubsub.googleapis.com:443";
54+
}
55+
56+
private static String getJaasConfig(ProjectIdOrNumber project) {
57+
return String.format(
58+
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=\"unused\" clientSecret=\"unused\" extension_pubsubProject=\"%s\";",
59+
project);
60+
}
61+
62+
private ClientParameters() {}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.kafka.internal;
18+
19+
import static java.nio.charset.StandardCharsets.UTF_8;
20+
import static java.util.Collections.singletonList;
21+
22+
import com.google.auth.oauth2.AccessToken;
23+
import com.google.auth.oauth2.GoogleCredentials;
24+
import com.google.common.collect.ImmutableMap;
25+
import com.google.gson.Gson;
26+
import com.sun.net.httpserver.HttpServer;
27+
import java.io.IOException;
28+
import java.net.InetAddress;
29+
import java.net.InetSocketAddress;
30+
import java.time.Duration;
31+
import java.time.Instant;
32+
import java.util.Base64;
33+
34+
public class AuthServer {
35+
public static int PORT = 14293;
36+
public static InetSocketAddress ADDRESS =
37+
new InetSocketAddress(InetAddress.getLoopbackAddress(), PORT);
38+
39+
private static final String HEADER =
40+
new Gson().toJson(ImmutableMap.of("typ", "JWT", "alg", "GOOG_TOKEN"));
41+
42+
static {
43+
spawnDaemon();
44+
}
45+
46+
private static String b64Encode(String data) {
47+
return Base64.getUrlEncoder().encodeToString(data.getBytes(UTF_8));
48+
}
49+
50+
private static String getJwt(AccessToken token) {
51+
return new Gson()
52+
.toJson(
53+
ImmutableMap.of(
54+
"exp",
55+
token.getExpirationTime().toInstant().getEpochSecond(),
56+
"iat",
57+
Instant.now().getEpochSecond(),
58+
"scope",
59+
"pubsub",
60+
"sub",
61+
"unused"));
62+
}
63+
64+
private static String getKafkaAccessToken(AccessToken token) {
65+
return String.join(
66+
".", b64Encode(HEADER), b64Encode(getJwt(token)), b64Encode(token.getTokenValue()));
67+
}
68+
69+
private static String getResponse(GoogleCredentials creds) throws IOException {
70+
creds.refreshIfExpired();
71+
AccessToken token = creds.getAccessToken();
72+
long exipiresInSeconds =
73+
Duration.between(Instant.now(), token.getExpirationTime().toInstant()).getSeconds();
74+
return new Gson()
75+
.toJson(
76+
ImmutableMap.of(
77+
"access_token",
78+
getKafkaAccessToken(token),
79+
"token_type",
80+
"bearer",
81+
"expires_in",
82+
Long.toString(exipiresInSeconds)));
83+
}
84+
85+
private static void spawnDaemon() {
86+
// Run spawn() in a daemon thread so the created threads are themselves daemons.
87+
Thread thread = new Thread(AuthServer::spawn);
88+
thread.setDaemon(true);
89+
thread.start();
90+
}
91+
92+
private static void spawn() {
93+
try {
94+
GoogleCredentials creds =
95+
GoogleCredentials.getApplicationDefault()
96+
.createScoped("https://www.googleapis.com/auth/cloud-platform");
97+
HttpServer server = HttpServer.create(ADDRESS, 0);
98+
server.createContext(
99+
"/",
100+
handler -> {
101+
try {
102+
byte[] response = getResponse(creds).getBytes(UTF_8);
103+
handler.getResponseHeaders().put("Content-type", singletonList("text/plain"));
104+
handler.sendResponseHeaders(200, response.length);
105+
handler.getResponseBody().write(response);
106+
handler.close();
107+
} catch (Exception e) {
108+
e.printStackTrace(System.err);
109+
throw new RuntimeException(e);
110+
}
111+
});
112+
server.start();
113+
} catch (Exception e) {
114+
e.printStackTrace(System.err);
115+
throw new RuntimeException(e);
116+
}
117+
}
118+
}

samples/snapshot/pom.xml

+16
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,28 @@
4545
<artifactId>pubsublite-kafka</artifactId>
4646
<version>1.0.5-SNAPSHOT</version>
4747
</dependency>
48+
<dependency>
49+
<groupId>com.google.cloud</groupId>
50+
<artifactId>pubsublite-kafka-auth</artifactId>
51+
<version>1.0.5-SNAPSHOT</version>
52+
</dependency>
53+
<dependency>
54+
<groupId>com.fasterxml.jackson.core</groupId>
55+
<artifactId>jackson-databind</artifactId>
56+
<version>2.14.1</version>
57+
</dependency>
4858
<!-- {x-version-update-end} -->
4959
<dependency>
5060
<groupId>org.apache.kafka</groupId>
5161
<artifactId>kafka-clients</artifactId>
5262
<version>3.3.1</version>
5363
</dependency>
64+
<dependency>
65+
<groupId>org.slf4j</groupId>
66+
<artifactId>slf4j-simple</artifactId>
67+
<version>1.7.36</version>
68+
<scope>test</scope>
69+
</dependency>
5470
<dependency>
5571
<groupId>junit</groupId>
5672
<artifactId>junit</artifactId>

samples/snippets/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@
4747
<artifactId>pubsublite-kafka</artifactId>
4848
<version>1.0.3</version>
4949
</dependency>
50+
<dependency>
51+
<groupId>com.google.cloud</groupId>
52+
<artifactId>pubsublite-kafka-auth</artifactId>
53+
<version>1.0.5-SNAPSHOT</version>
54+
</dependency>
5055
<dependency>
5156
<groupId>org.apache.kafka</groupId>
5257
<artifactId>kafka-clients</artifactId>

0 commit comments

Comments
 (0)