]> Humopery - polarsbop.git/commitdiff
Box<dyn Error> not custom error
authorErik Mackdanz <erikmack@gmail.com>
Wed, 29 Nov 2023 09:13:05 +0000 (03:13 -0600)
committerErik Mackdanz <erikmack@gmail.com>
Wed, 29 Nov 2023 09:13:05 +0000 (03:13 -0600)
src/main.rs

index b9912bd475b1c3b3dfdd76316b447b02ab741db2..2114bf3acc2ff2f699880b578fce64152ec578f8 100644 (file)
@@ -2,27 +2,10 @@ use aws_sdk_s3::primitives::ByteStream;
 use polars::prelude::*;
 use std::error::Error;
 use std::env::args;
-use std::fmt;
 use std::io::Write;
 use std::fs::{create_dir_all,File};
 
-#[derive(Debug)]
-enum PolarsBOPError {
-    NestedPolarsError(PolarsError),
-    FilesystemError(Box<dyn Error>),
-    S3PutError(Box<dyn Error>),
-    S3GetError(Box<dyn Error>),
-}
-
-impl std::fmt::Display for PolarsBOPError {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-       write!(f, "PolarsBOPError")
-    }
-}
-impl std::error::Error for PolarsBOPError {
-}
-
-async fn map_bucket(partition: &str) -> Result<(),PolarsBOPError> {
+async fn map_bucket(partition: &str) -> Result<(),Box<dyn Error>> {
 
     // set some columns to be parsed as numeric
     let mut schema_override = Schema::new();
@@ -44,16 +27,11 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsBOPError> {
        )));
 
     // read and parse from disk
-    let df = LazyCsvReader::new("data/bop.csv")
+    let mut df = LazyCsvReader::new("data/bop.csv")
        .has_header(true)
        .with_dtype_overwrite(schema_override)
        .with_null_values(null_values)
-       .finish();
-
-    if let Err(e) = df {
-       return Err(PolarsBOPError::NestedPolarsError(e));
-    };
-    let mut df = df.unwrap();
+       .finish()?;
 
     // split list based on arg
     match partition {
@@ -126,38 +104,26 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsBOPError> {
        let mask = col("Value").lt(lit(high));
        df_bucket = df_bucket.filter(mask);
 
-       let df_bucket = df_bucket.collect();
-       if let Err(e) = df_bucket {
-           return Err(PolarsBOPError::NestedPolarsError(e));
-       }
-       let mut df_bucket = df_bucket.unwrap();
+       let mut df_bucket = df_bucket.collect()?;
 
        // some console output
        println!("df_bucket: {:?}",df_bucket);
 
        // write to local filesystem
-       if let Err(e) = JsonWriter::new(&mut file)
-           .finish(&mut df_bucket) {
-               return Err(PolarsBOPError::NestedPolarsError(e));
-           };
+       JsonWriter::new(&mut file)
+           .finish(&mut df_bucket)?;
 
        println!("wrote local file {}", outfilepath);
 
        // open local file for writing to s3
-       let bytes = ByteStream::from_path(outfilepath).await;
-       if let Err(e) = bytes {
-           return Err(PolarsBOPError::FilesystemError(Box::new(e)));
-       }
-       let bytes = bytes.unwrap();
+       let bytes = ByteStream::from_path(outfilepath).await?;
 
        // write to s3
-       if let Err(e) = client.put_object()
+       client.put_object()
            .bucket("mackdanz-polars-test")
            .key(outs3key.clone())
            .set_body(Some(bytes))
-           .send().await {
-               return Err(PolarsBOPError::S3PutError(Box::new(e)));
-           }
+           .send().await?;
 
        println!("wrote file to S3 {}", outs3key);
     }
@@ -165,7 +131,7 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsBOPError> {
     Ok(())
 }
 
-async fn reduce_bucket(bucket: &str) -> Result<(),PolarsBOPError> {
+async fn reduce_bucket(bucket: &str) -> Result<(),Box<dyn Error>> {
     // prep input dir
     let bucketdir = format!("bucket-{}",bucket);
     create_dir_all(format!("out/reducebucket/in/{}",bucketdir.clone()))
@@ -182,11 +148,7 @@ async fn reduce_bucket(bucket: &str) -> Result<(),PolarsBOPError> {
        let downloadfilepath = format!(
            "out/reducebucket/in/bucket-{}/partition-{}.json",
            bucket, partition);
-       let downloadfile = File::create(downloadfilepath);
-       if let Err(e) = downloadfile {
-           return Err(PolarsBOPError::FilesystemError(Box::new(e)));
-       }
-       let mut downloadfile = downloadfile.unwrap();
+       let mut downloadfile = File::create(downloadfilepath)?;
 
        let ins3key =
            format!("mapbucket/bucket-{}/partition-{}.json",
@@ -196,28 +158,16 @@ async fn reduce_bucket(bucket: &str) -> Result<(),PolarsBOPError> {
        let getobjresult = client.get_object()
            .bucket("mackdanz-polars-test")
            .key(ins3key.clone())
-           .send().await;
-
-       if let Err(e) = getobjresult {
-           return Err(PolarsBOPError::S3GetError(Box::new(e)));
-       }
-       let getobjresult = getobjresult.unwrap();
+           .send().await?;
 
        // stream body to local filesystem
        // https://docs.aws.amazon.com/sdk-for-rust/latest/dg/rust_s3_code_examples.html
        let mut body = getobjresult.body;
        loop {
-           let bytes = body.try_next().await;
-           if let Err(e) = bytes {
-               return Err(PolarsBOPError::S3GetError(Box::new(e)));
-           }
-           let bytes = bytes.unwrap();
+           let bytes = body.try_next().await?;
 
            if let Some(b) = bytes {
-               let writeres = downloadfile.write(&b);
-               if let Err(e) = writeres {
-                   return Err(PolarsBOPError::FilesystemError(Box::new(e)));
-               }
+               let _ = downloadfile.write(&b)?;
            } else {
                break;
            }
@@ -232,7 +182,7 @@ async fn reduce_bucket(bucket: &str) -> Result<(),PolarsBOPError> {
 }
 
 #[::tokio::main]
-async fn main() -> Result<(),PolarsBOPError> {
+async fn main() -> Result<(),Box<dyn Error>> {
 
     let mut argsiter = args();
     // first arg is program name