From b566773e677a843e50f4093184a6f450000d9854 Mon Sep 17 00:00:00 2001 From: Erik Mackdanz Date: Wed, 29 Nov 2023 03:13:05 -0600 Subject: [PATCH] Box not custom error --- src/main.rs | 80 ++++++++++------------------------------------------- 1 file changed, 15 insertions(+), 65 deletions(-) diff --git a/src/main.rs b/src/main.rs index b9912bd..2114bf3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,27 +2,10 @@ use aws_sdk_s3::primitives::ByteStream; use polars::prelude::*; use std::error::Error; use std::env::args; -use std::fmt; use std::io::Write; use std::fs::{create_dir_all,File}; -#[derive(Debug)] -enum PolarsBOPError { - NestedPolarsError(PolarsError), - FilesystemError(Box), - S3PutError(Box), - S3GetError(Box), -} - -impl std::fmt::Display for PolarsBOPError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "PolarsBOPError") - } -} -impl std::error::Error for PolarsBOPError { -} - -async fn map_bucket(partition: &str) -> Result<(),PolarsBOPError> { +async fn map_bucket(partition: &str) -> Result<(),Box> { // set some columns to be parsed as numeric let mut schema_override = Schema::new(); @@ -44,16 +27,11 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsBOPError> { ))); // read and parse from disk - let df = LazyCsvReader::new("data/bop.csv") + let mut df = LazyCsvReader::new("data/bop.csv") .has_header(true) .with_dtype_overwrite(schema_override) .with_null_values(null_values) - .finish(); - - if let Err(e) = df { - return Err(PolarsBOPError::NestedPolarsError(e)); - }; - let mut df = df.unwrap(); + .finish()?; // split list based on arg match partition { @@ -126,38 +104,26 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsBOPError> { let mask = col("Value").lt(lit(high)); df_bucket = df_bucket.filter(mask); - let df_bucket = df_bucket.collect(); - if let Err(e) = df_bucket { - return Err(PolarsBOPError::NestedPolarsError(e)); - } - let mut df_bucket = df_bucket.unwrap(); + let mut df_bucket = df_bucket.collect()?; // some console output println!("df_bucket: {:?}",df_bucket); // write to local filesystem - if let Err(e) = JsonWriter::new(&mut file) - .finish(&mut df_bucket) { - return Err(PolarsBOPError::NestedPolarsError(e)); - }; + JsonWriter::new(&mut file) + .finish(&mut df_bucket)?; println!("wrote local file {}", outfilepath); // open local file for writing to s3 - let bytes = ByteStream::from_path(outfilepath).await; - if let Err(e) = bytes { - return Err(PolarsBOPError::FilesystemError(Box::new(e))); - } - let bytes = bytes.unwrap(); + let bytes = ByteStream::from_path(outfilepath).await?; // write to s3 - if let Err(e) = client.put_object() + client.put_object() .bucket("mackdanz-polars-test") .key(outs3key.clone()) .set_body(Some(bytes)) - .send().await { - return Err(PolarsBOPError::S3PutError(Box::new(e))); - } + .send().await?; println!("wrote file to S3 {}", outs3key); } @@ -165,7 +131,7 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsBOPError> { Ok(()) } -async fn reduce_bucket(bucket: &str) -> Result<(),PolarsBOPError> { +async fn reduce_bucket(bucket: &str) -> Result<(),Box> { // prep input dir let bucketdir = format!("bucket-{}",bucket); create_dir_all(format!("out/reducebucket/in/{}",bucketdir.clone())) @@ -182,11 +148,7 @@ async fn reduce_bucket(bucket: &str) -> Result<(),PolarsBOPError> { let downloadfilepath = format!( "out/reducebucket/in/bucket-{}/partition-{}.json", bucket, partition); - let downloadfile = File::create(downloadfilepath); - if let Err(e) = downloadfile { - return Err(PolarsBOPError::FilesystemError(Box::new(e))); - } - let mut downloadfile = downloadfile.unwrap(); + let mut downloadfile = File::create(downloadfilepath)?; let ins3key = format!("mapbucket/bucket-{}/partition-{}.json", @@ -196,28 +158,16 @@ async fn reduce_bucket(bucket: &str) -> Result<(),PolarsBOPError> { let getobjresult = client.get_object() .bucket("mackdanz-polars-test") .key(ins3key.clone()) - .send().await; - - if let Err(e) = getobjresult { - return Err(PolarsBOPError::S3GetError(Box::new(e))); - } - let getobjresult = getobjresult.unwrap(); + .send().await?; // stream body to local filesystem // https://docs.aws.amazon.com/sdk-for-rust/latest/dg/rust_s3_code_examples.html let mut body = getobjresult.body; loop { - let bytes = body.try_next().await; - if let Err(e) = bytes { - return Err(PolarsBOPError::S3GetError(Box::new(e))); - } - let bytes = bytes.unwrap(); + let bytes = body.try_next().await?; if let Some(b) = bytes { - let writeres = downloadfile.write(&b); - if let Err(e) = writeres { - return Err(PolarsBOPError::FilesystemError(Box::new(e))); - } + let _ = downloadfile.write(&b)?; } else { break; } @@ -232,7 +182,7 @@ async fn reduce_bucket(bucket: &str) -> Result<(),PolarsBOPError> { } #[::tokio::main] -async fn main() -> Result<(),PolarsBOPError> { +async fn main() -> Result<(),Box> { let mut argsiter = args(); // first arg is program name -- 2.52.0