From 675cbf8b8210de339341b7a22c60d885e890dca2 Mon Sep 17 00:00:00 2001 From: wilhelmagren Date: Fri, 17 May 2024 01:07:25 +0200 Subject: [PATCH 1/2] [feat] AsyncArrowWriter and tokio --- src/cli.rs | 8 +++-- src/converter.rs | 57 +++++++++++++++++++----------------- src/main.rs | 5 ++-- src/writer.rs | 76 +++++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 114 insertions(+), 32 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 616b8e2..8570e06 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -233,7 +233,7 @@ enum Commands { } impl Cli { - pub fn run(&self) -> Result<()> { + pub async fn run(&self) -> Result<()> { let n_threads: usize = get_available_threads(self.n_threads); match &self.command { @@ -251,8 +251,10 @@ impl Cli { .with_num_threads(n_threads) .with_buffer_size(*buffer_size) .with_thread_channel_capacity(*thread_channel_capacity) - .build()? - .convert()?; + .build() + .await? + .convert() + .await?; } #[cfg(feature = "rayon")] Commands::CConvert { diff --git a/src/converter.rs b/src/converter.rs index e975293..d960e8d 100644 --- a/src/converter.rs +++ b/src/converter.rs @@ -22,7 +22,7 @@ // SOFTWARE. // // File created: 2024-02-17 -// Last updated: 2024-05-15 +// Last updated: 2024-05-17 // use arrow::array::ArrayRef; @@ -52,7 +52,7 @@ use crate::error::{Result, SetupError}; use crate::mocker::NUM_CHARS_FOR_NEWLINE; use crate::schema::FixedSchema; use crate::slicer::Slicer; -use crate::writer::{ParquetWriter, Writer}; +use crate::writer::{AsyncParquetWriter, ParquetWriter, Writer}; /// The size of the converter read buffer (in bytes). pub(crate) static CONVERTER_SLICE_BUFFER_SIZE: usize = 512 * 1024 * 1024; @@ -70,11 +70,10 @@ pub(crate) static CONVERTER_THREAD_CHANNEL_CAPACITY: usize = 128; /// for, the buffer will simply allocate more memory when needed. 
pub(crate) static CONVERTER_LINE_BREAKS_BUFFER_SIZE: usize = 128 * 1024 * 1024; -#[derive(Debug)] pub struct Converter { in_file: File, schema: FixedSchema, - writer: Box, + writer: AsyncParquetWriter, slicer: Slicer, n_threads: usize, multithreaded: bool, @@ -93,7 +92,7 @@ impl Converter { } } - pub fn convert(&mut self) -> Result<()> { + pub async fn convert(&mut self) -> Result<()> { if self.multithreaded { #[cfg(feature = "rayon")] info!( @@ -106,10 +105,10 @@ impl Converter { "Converting in standard multithreaded mode using {} threads.", self.n_threads ); - self.convert_multithreaded()? + self.convert_multithreaded().await? } else { info!("Converting in single-threaded mode."); - self.convert_single_threaded()? + self.convert_single_threaded().await? } Ok(()) @@ -118,7 +117,7 @@ impl Converter { /// /// # Panics /// If could not move cursor back. - fn convert_multithreaded(&mut self) -> Result<()> { + async fn convert_multithreaded(&mut self) -> Result<()> { let bytes_to_read: usize = self.in_file.metadata()?.len() as usize; info!( @@ -185,7 +184,7 @@ impl Converter { &mut worker_line_break_indices, ); - self.spawn_convert_threads(&buffer, &line_break_indices, &worker_line_break_indices)?; + self.spawn_convert_threads(&buffer, &line_break_indices, &worker_line_break_indices).await?; bytes_processed += buffer_capacity - n_bytes_left_after_line_break; bytes_overlapped += n_bytes_left_after_line_break; @@ -205,7 +204,7 @@ impl Converter { #[cfg(debug_assertions)] debug!("Finishing and closing writer."); - self.writer.finish()?; + self.writer.finish().await?; #[cfg(feature = "rayon")] info!( @@ -230,7 +229,7 @@ impl Converter { } #[cfg(feature = "rayon")] - fn spawn_convert_threads( + async fn spawn_convert_threads( &mut self, buffer: &Vec, line_breaks: &[usize], @@ -263,7 +262,7 @@ impl Converter { #[cfg(debug_assertions)] debug!("Starting master writer thread."); - master_thread_write(receiver, &mut self.writer)?; + master_thread_write(receiver, &mut 
self.writer).await?; Ok(()) } @@ -332,7 +331,7 @@ impl Converter { /// We use [`libc::memset`] to directly write to and 'reset' the buffer in memory. /// This can cause a Segmentation fault if e.g. the memory is not allocated properly, /// or we are trying to write outside of the allocated memory area. - fn convert_single_threaded(&mut self) -> Result<()> { + async fn convert_single_threaded(&mut self) -> Result<()> { let bytes_to_read: usize = self.in_file.metadata()?.len() as usize; info!( @@ -425,7 +424,7 @@ impl Converter { let batch = RecordBatch::try_from_iter(builder_write_buffer)?; - self.writer.write_batch(&batch)?; + self.writer.write_batch(&batch).await?; #[cfg(debug_assertions)] debug!("Bytes processed: {}", bytes_processed); @@ -433,7 +432,7 @@ impl Converter { debug!("Remaining bytes: {}", remaining_bytes); } - self.writer.finish()?; + self.writer.finish().await?; info!("Done converting in single-threaded mode!"); @@ -503,12 +502,12 @@ pub fn worker_thread_convert( drop(channel); } -pub fn master_thread_write( +pub async fn master_thread_write( channel: channel::Receiver, - writer: &mut Box, + writer: &mut AsyncParquetWriter, ) -> Result<()> { for record_batch in channel { - writer.write_batch(&record_batch)?; + writer.write_batch(&record_batch).await?; drop(record_batch); } @@ -575,7 +574,7 @@ impl ConverterBuilder { /// /// # Panics /// ... 
- pub fn build(self) -> Result { + pub async fn build(self) -> Result { let in_file: File = match self.in_file { Some(p) => File::open(p)?, None => { @@ -637,13 +636,19 @@ impl ConverterBuilder { let arrow_schema: ArrowSchemaRef = schema.as_arrow_schema_ref(); - let writer: Box = Box::new( - ParquetWriter::builder() - .with_out_file(out_file) - .with_properties(properties) - .with_arrow_schema(arrow_schema) - .build()?, - ); + /* + let writer: ParquetWriter = ParquetWriter::builder() + .with_out_file(out_file) + .with_properties(properties) + .with_arrow_schema(arrow_schema) + .build()?; + */ + let writer: AsyncParquetWriter = AsyncParquetWriter::builder() + .with_out_file(out_file) + .with_properties(properties) + .with_arrow_schema(arrow_schema) + .build() + .await?; let slicer = Slicer::builder().num_threads(n_threads).build()?; diff --git a/src/main.rs b/src/main.rs index e046e74..9e43f21 100644 --- a/src/main.rs +++ b/src/main.rs @@ -53,7 +53,8 @@ mod writer; use cli::Cli; /// Run the evolution program, parsing any CLI arguments using [`clap`]. -fn main() { +#[tokio::main] +async fn main() { let cli = Cli::parse(); match logger::try_init_logging() { @@ -64,7 +65,7 @@ fn main() { Err(e) => error!("Could not set up env logging: {:?}", e), }; - match cli.run() { + match cli.run().await { Ok(_) => info!("All done! Bye. 👋🥳"), Err(e) => error!("{}", e), } diff --git a/src/writer.rs b/src/writer.rs index 99bcd0e..338fccf 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -22,13 +22,16 @@ // SOFTWARE. 
 //
 // File created: 2024-05-05
-// Last updated: 2024-05-15
+// Last updated: 2024-05-17
 //
 
 use arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use arrow::record_batch::RecordBatch;
 use parquet::arrow::ArrowWriter;
+use parquet::arrow::async_writer::AsyncArrowWriter;
 use parquet::file::properties::WriterProperties as ArrowWriterProperties;
+use tokio::fs::File as AsyncFile;
+use tokio::fs::OpenOptions as AsyncOpenOptions;
 
 use std::fmt::Debug;
 use std::fs::{File, OpenOptions};
@@ -179,6 +182,77 @@ impl Writer for FixedLengthFileWriter {
     }
 }
 
+pub(crate) struct AsyncParquetWriter {
+    inner: AsyncArrowWriter<AsyncFile>,
+}
+
+impl AsyncParquetWriter {
+    pub fn builder() -> AsyncParquetWriterBuilder {
+        AsyncParquetWriterBuilder {
+            ..Default::default()
+        }
+    }
+
+    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
+        self.inner.write(batch).await?;
+        Ok(())
+    }
+
+    pub async fn finish(&mut self) -> Result<()> {
+        self.inner.finish().await?;
+        Ok(())
+    }
+}
+
+#[derive(Default)]
+pub(crate) struct AsyncParquetWriterBuilder {
+    out_file: Option<PathBuf>,
+    schema: Option<ArrowSchemaRef>,
+    properties: Option<ArrowWriterProperties>,
+}
+
+impl AsyncParquetWriterBuilder {
+    pub fn with_out_file(mut self, path: PathBuf) -> Self {
+        self.out_file = Some(path);
+        self
+    }
+
+    pub fn with_properties(mut self, properties: ArrowWriterProperties) -> Self {
+        self.properties = Some(properties);
+        self
+    }
+
+    pub fn with_arrow_schema(mut self, schema: ArrowSchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    pub async fn build(self) -> Result<AsyncParquetWriter> {
+        let out_file: PathBuf = self
+            .out_file
+            .ok_or("Required field 'out_file' is missing or None.")?;
+
+        let writer_file: AsyncFile = AsyncOpenOptions::new()
+            .create(true)
+            .write(true)
+            .truncate(true)
+            .open(out_file)
+            .await?;
+
+        let properties: ArrowWriterProperties = self
+            .properties
+            .ok_or("Required field 'properties' is missing or None.")?;
+
+        let schema: ArrowSchemaRef = self
+            .schema
+            .ok_or("Required field 'schema' is missing or None.")?;
+
+        let inner: AsyncArrowWriter<AsyncFile> =
+            AsyncArrowWriter::try_new(writer_file, schema, Some(properties))?;
+
+        Ok(AsyncParquetWriter { inner })
+    }
+}
 #[derive(Debug)]
 pub(crate) struct ParquetWriter {
     inner: ArrowWriter<File>,

From 91c997b4e790886877729f4bf5e8c94d410afba5 Mon Sep 17 00:00:00 2001
From: wilhelmagren
Date: Fri, 17 May 2024 01:07:33 +0200
Subject: [PATCH 2/2] [build] add tokio deps

---
 Cargo.toml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index a1b757c..cfbe8b2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -42,8 +42,9 @@ atoi_simd = { version = "0.15.6", optional = true }
 arrow2 = "0.18.0"
 libc = "0.2.154"
 arrow = "51.0.0"
-parquet = "51.0.0"
+parquet = { version = "51.0.0", features = ["async"] }
 padder = { version = "1.2.0", features = ["serde"] }
+tokio = { version = "1.37.0", features = ["fs", "macros", "rt-multi-thread"] }
 
 [dev-dependencies]
 glob = "0.3.1"