Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integrations/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ path = "tests/behavior/main.rs"
async-trait = "0.1"
bytes = "1"
futures = "0.3"
object_store = "0.11"
object_store = "0.12"
opendal = { version = "0.53.0", path = "../../core", default-features = false }
pin-project = "1.1"
send_wrapper = { version = "0.6", features = ["futures"], optional = true }
tokio = { version = "1", default-features = false }

[dev-dependencies]
anyhow = "1.0.86"
datafusion = "46.0.1"
datafusion = "47.0.0"
libtest-mimic = "0.8.1"
opendal = { version = "0.53.0", path = "../../core", features = [
"services-memory",
Expand Down
28 changes: 15 additions & 13 deletions integrations/object_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl ObjectStore for OpendalStore {
let meta = ObjectMeta {
location: location.clone(),
last_modified: meta.last_modified().unwrap_or_default(),
size: meta.content_length() as usize,
size: meta.content_length(),
e_tag: meta.etag().map(|x| x.to_string()),
version: meta.version().map(|x| x.to_string()),
};
Expand Down Expand Up @@ -302,7 +302,7 @@ impl ObjectStore for OpendalStore {
};

let stream = reader
.into_bytes_stream(read_range.start as u64..read_range.end as u64)
.into_bytes_stream(read_range.start..read_range.end)
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?
Expand Down Expand Up @@ -330,16 +330,14 @@ impl ObjectStore for OpendalStore {
Ok(())
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
// object_store `Path` always removes trailing slash
// need to add it back
let path = prefix.map_or("".into(), |x| format!("{}/", x));

let lister_fut = self.inner.lister_with(&path).recursive(true);
let fut = async move {
let stream = self
.inner
.lister_with(&path)
.recursive(true)
let stream = lister_fut
.await
.map_err(|err| format_object_store_error(err, &path))?;

Expand All @@ -359,13 +357,17 @@ impl ObjectStore for OpendalStore {
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
let path = prefix.map_or("".into(), |x| format!("{}/", x));
let offset = offset.clone();

// clone self for 'static lifetime
// clone self is cheap
let this = self.clone();

let fut = async move {
let list_with_start_after = self.inner.info().full_capability().list_with_start_after;
let mut fut = self.inner.lister_with(&path).recursive(true);
let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
let mut fut = this.inner.lister_with(&path).recursive(true);

// Use native start_after support if possible.
if list_with_start_after {
Expand All @@ -377,7 +379,7 @@ impl ObjectStore for OpendalStore {
.map_err(|err| format_object_store_error(err, &path))?
.then(move |entry| {
let path = path.clone();

let this = this.clone();
async move {
let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
let (path, metadata) = entry.into_parts();
Expand All @@ -388,7 +390,7 @@ impl ObjectStore for OpendalStore {
return Ok(object_meta);
}

let metadata = self
let metadata = this
.inner
.stat(&path)
.await
Expand Down Expand Up @@ -652,7 +654,7 @@ mod tests {

let meta = object_store.head(&path).await.unwrap();

assert_eq!(meta.size, all_bytes.len());
assert_eq!(meta.size, all_bytes.len() as u64);

assert_eq!(
object_store
Expand Down
2 changes: 1 addition & 1 deletion integrations/object_store/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
ObjectMeta {
location: path.into(),
last_modified: meta.last_modified().unwrap_or_default(),
size: meta.content_length() as usize,
size: meta.content_length(),
e_tag: meta.etag().map(|x| x.to_string()),
version: meta.version().map(|x| x.to_string()),
}
Expand Down
8 changes: 4 additions & 4 deletions integrations/object_store/tests/behavior/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub async fn test_basic_get(store: OpendalStore) -> Result<()> {

let ret = store.get(&location).await?;

assert_eq!(0..value.len(), ret.range);
assert_eq!(0..value.len() as u64, ret.range);
let data = ret.bytes().await?;
assert_eq!(value, data);

Expand All @@ -94,7 +94,7 @@ pub async fn test_head(store: OpendalStore) -> Result<()> {

let meta = store.head(&location).await?;

assert_eq!(meta.size, value.len());
assert_eq!(meta.size, value.len() as u64);
assert_eq!(meta.location, location);

store.delete(&location).await?;
Expand Down Expand Up @@ -160,7 +160,7 @@ pub async fn test_get_opts_with_invalid_range(store: OpendalStore) -> Result<()>
..Default::default()
};
let ret = store.get_opts(&location, opts).await?;
assert_eq!(ret.range, 0..value.len());
assert_eq!(ret.range, 0..value.len() as u64);
assert_eq!(ret.bytes().await?, value);

// the offset of the range is greater than the size of the object
Expand All @@ -177,7 +177,7 @@ pub async fn test_get_opts_with_invalid_range(store: OpendalStore) -> Result<()>
..Default::default()
};
let ret = store.get_opts(&location, opts).await?;
assert_eq!(ret.range, 0..value.len());
assert_eq!(ret.range, 0..value.len() as u64);
assert_eq!(ret.bytes().await?, value);

store.delete(&location).await?;
Expand Down
Loading