Humopery - polarsbop.git/commitdiff
output to buckets based on Primary Income
author Erik Mackdanz <erikmack@gmail.com>
Tue, 28 Nov 2023 06:06:22 +0000 (00:06 -0600)
committer Erik Mackdanz <erikmack@gmail.com>
Tue, 28 Nov 2023 06:06:22 +0000 (00:06 -0600)
src/main.rs

index f028e12b3a4a5fb63176c4dd66be8e98867963d3..4738231faa94bf1e0a16791e337818759853df8c 100644 (file)
@@ -11,15 +11,6 @@ async fn main() -> Result<(),PolarsError> {
     let partition = argsiter.next()
        .expect("single argument required, 0 or 1 or 2");
 
-    // prep output file
-    create_dir_all("out")
-       .expect("couldn't create out dir");
-    let outfilename = format!("output-{}.json",partition);
-    let outfilepath = format!("out/{}",outfilename);
-    // println!("{}",outfilename);
-    let mut file = File::create(outfilepath.clone())
-       .expect("couldn't create output file");
-
     // set some columns to be parsed as numeric
     let mut schema_override = Schema::new();
     let _ = schema_override.with_column("2022".into(),DataType::Float64);
@@ -66,35 +57,76 @@ async fn main() -> Result<(),PolarsError> {
     }
 
     // values not status, selected cols
-    let mut df = df
+    let df = df
        .filter(col("Attribute").eq(lit("Value")))
+       .filter(col("Indicator Code").eq(lit("BXIP_BP6_USD")))
        .select(&[col("Country Name"),col("Indicator Name"),
                  col("Indicator Code"),col("2022")])
        .rename(["2022"],["Value"])
        .sort_by_exprs([col("Country Name"),col("Indicator Name")],
                       vec!(false,false),false,false)
-       .with_streaming(true)
-       .collect()?;
-
-    // some console output
-    println!("df: {:?}",df);
+       .with_streaming(true);
 
-    // write to local filesystem
-    JsonWriter::new(&mut file)
-       .finish(&mut df)?;
+    // value ranges for Primary Income
+    let buckets = [
+       ("0",                 0f64,        15_000_000f64),
+       ("1",        15_000_000f64,        50_000_000f64),
+       ("2",        50_000_000f64,       200_000_000f64),
+       ("3",       200_000_000f64,       500_000_000f64),
+       ("4",       500_000_000f64,     1_000_000_000f64),
+       ("5",     1_000_000_000f64,   100_000_000_000f64),
+       ("6",   100_000_000_000f64,   500_000_000_000f64),
+       ("7",   500_000_000_000f64, 1_000_000_000_000f64),
+       ("8", 1_000_000_000_000f64, 4_000_000_000_000f64)
+    ];
 
-    println!("wrote local file");
-
-    // write to s3
     let config = aws_config::load_from_env().await;
     let client = aws_sdk_s3::Client::new(&config);
-    let _ = client.put_object()
-       .bucket("mackdanz-polars-test")
-       .key(outfilename)
-       .set_body(Some(aws_sdk_s3::primitives::ByteStream::from_path(outfilepath).await.unwrap()))
-       .send().await;
 
-    println!("wrote file to S3");
+    for (bindex, low, high) in buckets {
+
+       // prep output file
+       let bucketdir = format!("bucket-{}",bindex);
+       create_dir_all(format!("out/{}",bucketdir.clone()))
+           .expect("couldn't create out dir");
+
+       let outfilename = format!("partition-{}.json",partition);
+       let outs3key = format!("{}/{}",bucketdir,outfilename);
+       let outfilepath = format!("out/{}/{}",bucketdir,outfilename);
+
+       // println!("{}",outfilepath);
+       let mut file = File::create(outfilepath.clone())
+           .expect("couldn't create output file");
+
+       // work with a copy
+       let mut df_bucket = df.clone();
+
+       // filter for bucket range
+       let mask = col("Value").gt_eq(lit(low));
+       df_bucket = df_bucket.filter(mask);
+       let mask = col("Value").lt(lit(high));
+       df_bucket = df_bucket.filter(mask);
+
+       let mut df_bucket = df_bucket.collect()?;
+
+       // some console output
+       println!("df_bucket: {:?}",df_bucket);
+
+       // write to local filesystem
+       JsonWriter::new(&mut file)
+           .finish(&mut df_bucket)?;
+
+       println!("wrote local file {}", outfilepath);
+
+       // write to s3
+       let _ = client.put_object()
+               .bucket("mackdanz-polars-test")
+               .key(outs3key.clone())
+               .set_body(Some(aws_sdk_s3::primitives::ByteStream::from_path(outfilepath).await.unwrap()))
+               .send().await;
+
+       println!("wrote file to S3 {}", outs3key);
+    }
 
     Ok(())
 }