From: Erik Mackdanz Date: Wed, 29 Nov 2023 04:36:09 +0000 (-0600) Subject: for reduce, download partitions to local filesystem X-Git-Url: https://git.humopery.space/?a=commitdiff_plain;h=dfb483564d76c989313acd3d3cc36d0cc1519191;p=polarsbop.git for reduce, download partitions to local filesystem Also implement a custom error --- diff --git a/Cargo.lock b/Cargo.lock index 3bc30aa..cd2b42f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -348,9 +348,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fbfa248f7f966d73e325dbc85851a5500042b6d96e3c3b535a8527707f36fe4" +checksum = "3e37ca17d25fe1e210b6d4bdf59b81caebfe99f986201a1228cb5061233b4b13" dependencies = [ "futures-util", "pin-project-lite", @@ -455,9 +455,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d27c3235d4972ed976b5c1a82286e7c4457f618f3c2ae6d4ae44f081dd24575" +checksum = "c6cebff0d977b6b6feed2fd07db52aac58ba3ccaf26cdd49f1af4add5061bef9" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -471,9 +471,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2fc32035dc0636a8583cf0c6dd7f1e6d5404103b836d26228b8730907a88d9f" +checksum = "d7f48b3f27ddb40ab19892a5abda331f403e3cb877965e4e51171447807104af" dependencies = [ "base64-simd", "bytes", diff --git a/src/main.rs b/src/main.rs index 788a01f..b9912bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,28 @@ use aws_sdk_s3::primitives::ByteStream; use polars::prelude::*; +use std::error::Error; use std::env::args; +use std::fmt; +use std::io::Write; use std::fs::{create_dir_all,File}; -async fn map_bucket(partition: &str) -> Result<(),PolarsError> { +#[derive(Debug)] +enum 
PolarsBOPError { + NestedPolarsError(PolarsError), + FilesystemError(Box<dyn Error>), + S3PutError(Box<dyn Error>), + S3GetError(Box<dyn Error>), +} + +impl std::fmt::Display for PolarsBOPError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "PolarsBOPError") + } +} +impl std::error::Error for PolarsBOPError { +} + +async fn map_bucket(partition: &str) -> Result<(),PolarsBOPError> { // set some columns to be parsed as numeric let mut schema_override = Schema::new(); @@ -25,11 +44,16 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsError> { ))); // read and parse from disk - let mut df = LazyCsvReader::new("data/bop.csv") + let df = LazyCsvReader::new("data/bop.csv") .has_header(true) .with_dtype_overwrite(schema_override) .with_null_values(null_values) - .finish()?; + .finish(); + + if let Err(e) = df { + return Err(PolarsBOPError::NestedPolarsError(e)); + }; + let mut df = df.unwrap(); // split list based on arg match partition { @@ -102,24 +126,38 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsError> { let mask = col("Value").lt(lit(high)); df_bucket = df_bucket.filter(mask); - let mut df_bucket = df_bucket.collect()?; + let df_bucket = df_bucket.collect(); + if let Err(e) = df_bucket { + return Err(PolarsBOPError::NestedPolarsError(e)); + } + let mut df_bucket = df_bucket.unwrap(); // some console output println!("df_bucket: {:?}",df_bucket); // write to local filesystem - JsonWriter::new(&mut file) - .finish(&mut df_bucket)?; + if let Err(e) = JsonWriter::new(&mut file) + .finish(&mut df_bucket) { + return Err(PolarsBOPError::NestedPolarsError(e)); + }; println!("wrote local file {}", outfilepath); + // open local file for writing to s3 + let bytes = ByteStream::from_path(outfilepath).await; + if let Err(e) = bytes { + return Err(PolarsBOPError::FilesystemError(Box::new(e))); + } + let bytes = bytes.unwrap(); + // write to s3 - let _ = client.put_object() + if let Err(e) = client.put_object() .bucket("mackdanz-polars-test") 
.key(outs3key.clone()) - .set_body(Some(ByteStream::from_path(outfilepath) - .await.unwrap())) - .send().await; + .set_body(Some(bytes)) + .send().await { + return Err(PolarsBOPError::S3PutError(Box::new(e))); + } println!("wrote file to S3 {}", outs3key); } @@ -127,12 +165,74 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsError> { Ok(()) } -async fn reduce_bucket(bucket: &str) -> Result<(),PolarsError> { +async fn reduce_bucket(bucket: &str) -> Result<(),PolarsBOPError> { + // prep input dir + let bucketdir = format!("bucket-{}",bucket); + create_dir_all(format!("out/reducebucket/in/{}",bucketdir.clone())) + .expect("couldn't create input dir"); + + // prep S3 client + let config = aws_config::load_from_env().await; + let client = aws_sdk_s3::Client::new(&config); + + // there are three input files for each income price bucket + for partition in [0,1,2] { + println!("downloading partition {}",partition); + + let downloadfilepath = format!( + "out/reducebucket/in/bucket-{}/partition-{}.json", + bucket, partition); + let downloadfile = File::create(downloadfilepath); + if let Err(e) = downloadfile { + return Err(PolarsBOPError::FilesystemError(Box::new(e))); + } + let mut downloadfile = downloadfile.unwrap(); + + let ins3key = + format!("mapbucket/bucket-{}/partition-{}.json", + bucket,partition); + + // call S3 get_object + let getobjresult = client.get_object() + .bucket("mackdanz-polars-test") + .key(ins3key.clone()) + .send().await; + + if let Err(e) = getobjresult { + return Err(PolarsBOPError::S3GetError(Box::new(e))); + } + let getobjresult = getobjresult.unwrap(); + + // stream body to local filesystem + // https://docs.aws.amazon.com/sdk-for-rust/latest/dg/rust_s3_code_examples.html + let mut body = getobjresult.body; + loop { + let bytes = body.try_next().await; + if let Err(e) = bytes { + return Err(PolarsBOPError::S3GetError(Box::new(e))); + } + let bytes = bytes.unwrap(); + + if let Some(b) = bytes { + let writeres = downloadfile.write(&b); 
+ if let Err(e) = writeres { + return Err(PolarsBOPError::FilesystemError(Box::new(e))); + } + } else { + break; + } + } + } + + // todo: read partitions to DF + // todo: concat partitions in bucket + // todo: some aggregation on my bucket + Ok(()) } #[::tokio::main] -async fn main() -> Result<(),PolarsError> { +async fn main() -> Result<(),PolarsBOPError> { let mut argsiter = args(); // first arg is program name @@ -161,4 +261,3 @@ async fn main() -> Result<(),PolarsError> { } } -