From 0c1890a58fe051839882fed3b3f2ba32e4d008e0 Mon Sep 17 00:00:00 2001 From: Erik Mackdanz Date: Tue, 28 Nov 2023 00:06:22 -0600 Subject: [PATCH] output to buckets based on Primary Income --- src/main.rs | 86 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 27 deletions(-) diff --git a/src/main.rs b/src/main.rs index f028e12..4738231 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,15 +11,6 @@ async fn main() -> Result<(),PolarsError> { let partition = argsiter.next() .expect("single argument required, 0 or 1 or 2"); - // prep output file - create_dir_all("out") - .expect("couldn't create out dir"); - let outfilename = format!("output-{}.json",partition); - let outfilepath = format!("out/{}",outfilename); - // println!("{}",outfilename); - let mut file = File::create(outfilepath.clone()) - .expect("couldn't create output file"); - // set some columns to be parsed as numeric let mut schema_override = Schema::new(); let _ = schema_override.with_column("2022".into(),DataType::Float64); @@ -66,35 +57,76 @@ async fn main() -> Result<(),PolarsError> { } // values not status, selected cols - let mut df = df + let df = df .filter(col("Attribute").eq(lit("Value"))) + .filter(col("Indicator Code").eq(lit("BXIP_BP6_USD"))) .select(&[col("Country Name"),col("Indicator Name"), col("Indicator Code"),col("2022")]) .rename(["2022"],["Value"]) .sort_by_exprs([col("Country Name"),col("Indicator Name")], vec!(false,false),false,false) - .with_streaming(true) - .collect()?; - - // some console output - println!("df: {:?}",df); + .with_streaming(true); - // write to local filesystem - JsonWriter::new(&mut file) - .finish(&mut df)?; + // value ranges for Primary Income + let buckets = [ + ("0", 0f64, 15_000_000f64), + ("1", 15_000_000f64, 50_000_000f64), + ("2", 50_000_000f64, 200_000_000f64), + ("3", 200_000_000f64, 500_000_000f64), + ("4", 500_000_000f64, 1_000_000_000f64), + ("5", 1_000_000_000f64, 100_000_000_000f64), + ("6", 100_000_000_000f64, 500_000_000_000f64), + ("7", 500_000_000_000f64, 1_000_000_000_000f64), + ("8", 1_000_000_000_000f64, 4_000_000_000_000f64) + ]; - println!("wrote local file"); - - // write to s3 let config = aws_config::load_from_env().await; let client = aws_sdk_s3::Client::new(&config); - let _ = client.put_object() - .bucket("mackdanz-polars-test") - .key(outfilename) - .set_body(Some(aws_sdk_s3::primitives::ByteStream::from_path(outfilepath).await.unwrap())) - .send().await; - println!("wrote file to S3"); + for (bindex, low, high) in buckets { + + // prep output file + let bucketdir = format!("bucket-{}",bindex); + create_dir_all(format!("out/{}",bucketdir.clone())) + .expect("couldn't create out dir"); + + let outfilename = format!("partition-{}.json",partition); + let outs3key = format!("{}/{}",bucketdir,outfilename); + let outfilepath = format!("out/{}/{}",bucketdir,outfilename); + + // println!("{}",outfilepath); + let mut file = File::create(outfilepath.clone()) + .expect("couldn't create output file"); + + // work with a copy + let mut df_bucket = df.clone(); + + // filter for bucket range + let mask = col("Value").gt_eq(lit(low)); + df_bucket = df_bucket.filter(mask); + let mask = col("Value").lt(lit(high)); + df_bucket = df_bucket.filter(mask); + + let mut df_bucket = df_bucket.collect()?; + + // some console output + println!("df_bucket: {:?}",df_bucket); + + // write to local filesystem + JsonWriter::new(&mut file) + .finish(&mut df_bucket)?; + + println!("wrote local file {}", outfilepath); + + // write to s3 + let _ = client.put_object() + .bucket("mackdanz-polars-test") + .key(outs3key.clone()) + .set_body(Some(aws_sdk_s3::primitives::ByteStream::from_path(outfilepath).await.unwrap())) + .send().await; + + println!("wrote file to S3 {}", outs3key); + } Ok(()) } -- 2.52.0