polarsbop.git/commitdiff
for reduce, download partitions to local filesystem
authorErik Mackdanz <erikmack@gmail.com>
Wed, 29 Nov 2023 04:36:09 +0000 (22:36 -0600)
committerErik Mackdanz <erikmack@gmail.com>
Wed, 29 Nov 2023 04:36:09 +0000 (22:36 -0600)
Also implement a custom error

Cargo.lock
src/main.rs

index 3bc30aa3cf48a57fe3f54a3ad6cebd89dabd2757..cd2b42fb9960810d272fc48e8ec277cdef398410 100644 (file)
@@ -348,9 +348,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-async"
-version = "1.0.1"
+version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1fbfa248f7f966d73e325dbc85851a5500042b6d96e3c3b535a8527707f36fe4"
+checksum = "3e37ca17d25fe1e210b6d4bdf59b81caebfe99f986201a1228cb5061233b4b13"
 dependencies = [
  "futures-util",
  "pin-project-lite",
@@ -455,9 +455,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-runtime-api"
-version = "1.0.1"
+version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4d27c3235d4972ed976b5c1a82286e7c4457f618f3c2ae6d4ae44f081dd24575"
+checksum = "c6cebff0d977b6b6feed2fd07db52aac58ba3ccaf26cdd49f1af4add5061bef9"
 dependencies = [
  "aws-smithy-async",
  "aws-smithy-types",
@@ -471,9 +471,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-types"
-version = "1.0.1"
+version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d2fc32035dc0636a8583cf0c6dd7f1e6d5404103b836d26228b8730907a88d9f"
+checksum = "d7f48b3f27ddb40ab19892a5abda331f403e3cb877965e4e51171447807104af"
 dependencies = [
  "base64-simd",
  "bytes",
index 788a01fe7e204858ef37b5fdc9de9bd52b149938..b9912bd475b1c3b3dfdd76316b447b02ab741db2 100644 (file)
@@ -1,9 +1,28 @@
 use aws_sdk_s3::primitives::ByteStream;
 use polars::prelude::*;
+use std::error::Error;
 use std::env::args;
+use std::fmt;
+use std::io::Write;
 use std::fs::{create_dir_all,File};
 
-async fn map_bucket(partition: &str) -> Result<(),PolarsError> {
+#[derive(Debug)]
+enum PolarsBOPError {
+    NestedPolarsError(PolarsError),
+    FilesystemError(Box<dyn Error>),
+    S3PutError(Box<dyn Error>),
+    S3GetError(Box<dyn Error>),
+}
+
+impl std::fmt::Display for PolarsBOPError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+       write!(f, "PolarsBOPError")
+    }
+}
+impl std::error::Error for PolarsBOPError {
+}
+
+async fn map_bucket(partition: &str) -> Result<(),PolarsBOPError> {
 
     // set some columns to be parsed as numeric
     let mut schema_override = Schema::new();
@@ -25,11 +44,16 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsError> {
        )));
 
     // read and parse from disk
-    let mut df = LazyCsvReader::new("data/bop.csv")
+    let df = LazyCsvReader::new("data/bop.csv")
        .has_header(true)
        .with_dtype_overwrite(schema_override)
        .with_null_values(null_values)
-       .finish()?;
+       .finish();
+
+    if let Err(e) = df {
+       return Err(PolarsBOPError::NestedPolarsError(e));
+    };
+    let mut df = df.unwrap();
 
     // split list based on arg
     match partition {
@@ -102,24 +126,38 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsError> {
        let mask = col("Value").lt(lit(high));
        df_bucket = df_bucket.filter(mask);
 
-       let mut df_bucket = df_bucket.collect()?;
+       let df_bucket = df_bucket.collect();
+       if let Err(e) = df_bucket {
+           return Err(PolarsBOPError::NestedPolarsError(e));
+       }
+       let mut df_bucket = df_bucket.unwrap();
 
        // some console output
        println!("df_bucket: {:?}",df_bucket);
 
        // write to local filesystem
-       JsonWriter::new(&mut file)
-           .finish(&mut df_bucket)?;
+       if let Err(e) = JsonWriter::new(&mut file)
+           .finish(&mut df_bucket) {
+               return Err(PolarsBOPError::NestedPolarsError(e));
+           };
 
        println!("wrote local file {}", outfilepath);
 
+       // open local file for writing to s3
+       let bytes = ByteStream::from_path(outfilepath).await;
+       if let Err(e) = bytes {
+           return Err(PolarsBOPError::FilesystemError(Box::new(e)));
+       }
+       let bytes = bytes.unwrap();
+
        // write to s3
-       let _ = client.put_object()
+       if let Err(e) = client.put_object()
            .bucket("mackdanz-polars-test")
            .key(outs3key.clone())
-           .set_body(Some(ByteStream::from_path(outfilepath)
-                          .await.unwrap()))
-           .send().await;
+           .set_body(Some(bytes))
+           .send().await {
+               return Err(PolarsBOPError::S3PutError(Box::new(e)));
+           }
 
        println!("wrote file to S3 {}", outs3key);
     }
@@ -127,12 +165,74 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsError> {
     Ok(())
 }
 
-async fn reduce_bucket(bucket: &str) -> Result<(),PolarsError> {
+async fn reduce_bucket(bucket: &str) -> Result<(),PolarsBOPError> {
+    // prep input dir
+    let bucketdir = format!("bucket-{}",bucket);
+    create_dir_all(format!("out/reducebucket/in/{}",bucketdir.clone()))
+       .expect("couldn't create input dir");
+
+    // prep S3 client
+    let config = aws_config::load_from_env().await;
+    let client = aws_sdk_s3::Client::new(&config);
+
+    // there are three input files for each income price bucket
+    for partition in [0,1,2] {
+       println!("downloading partition {}",partition);
+
+       let downloadfilepath = format!(
+           "out/reducebucket/in/bucket-{}/partition-{}.json",
+           bucket, partition);
+       let downloadfile = File::create(downloadfilepath);
+       if let Err(e) = downloadfile {
+           return Err(PolarsBOPError::FilesystemError(Box::new(e)));
+       }
+       let mut downloadfile = downloadfile.unwrap();
+
+       let ins3key =
+           format!("mapbucket/bucket-{}/partition-{}.json",
+                   bucket,partition);
+
+       // call S3 get_object
+       let getobjresult = client.get_object()
+           .bucket("mackdanz-polars-test")
+           .key(ins3key.clone())
+           .send().await;
+
+       if let Err(e) = getobjresult {
+           return Err(PolarsBOPError::S3GetError(Box::new(e)));
+       }
+       let getobjresult = getobjresult.unwrap();
+
+       // stream body to local filesystem
+       // https://docs.aws.amazon.com/sdk-for-rust/latest/dg/rust_s3_code_examples.html
+       let mut body = getobjresult.body;
+       loop {
+           let bytes = body.try_next().await;
+           if let Err(e) = bytes {
+               return Err(PolarsBOPError::S3GetError(Box::new(e)));
+           }
+           let bytes = bytes.unwrap();
+
+           if let Some(b) = bytes {
+               let writeres = downloadfile.write(&b);
+               if let Err(e) = writeres {
+                   return Err(PolarsBOPError::FilesystemError(Box::new(e)));
+               }
+           } else {
+               break;
+           }
+       }
+    }
+
+    // todo: read partitions to DF
+    // todo: concat partitions in bucket
+    // todo: some aggregation on my bucket
+
     Ok(())
 }
 
 #[::tokio::main]
-async fn main() -> Result<(),PolarsError> {
+async fn main() -> Result<(),PolarsBOPError> {
 
     let mut argsiter = args();
     // first arg is program name
@@ -161,4 +261,3 @@ async fn main() -> Result<(),PolarsError> {
     }
 
 }
-