let partition = argsiter.next()
.expect("single argument required, 0 or 1 or 2");
- // prep output file
- create_dir_all("out")
- .expect("couldn't create out dir");
- let outfilename = format!("output-{}.json",partition);
- let outfilepath = format!("out/{}",outfilename);
- // println!("{}",outfilename);
- let mut file = File::create(outfilepath.clone())
- .expect("couldn't create output file");
-
// set some columns to be parsed as numeric
let mut schema_override = Schema::new();
let _ = schema_override.with_column("2022".into(),DataType::Float64);
}
// values not status, selected cols
- let mut df = df
+ let df = df
.filter(col("Attribute").eq(lit("Value")))
+ .filter(col("Indicator Code").eq(lit("BXIP_BP6_USD")))
.select(&[col("Country Name"),col("Indicator Name"),
col("Indicator Code"),col("2022")])
.rename(["2022"],["Value"])
.sort_by_exprs([col("Country Name"),col("Indicator Name")],
vec!(false,false),false,false)
- .with_streaming(true)
- .collect()?;
-
- // some console output
- println!("df: {:?}",df);
+ .with_streaming(true);
- // write to local filesystem
- JsonWriter::new(&mut file)
- .finish(&mut df)?;
+ // value ranges for Primary Income
+ let buckets = [
+ ("0", 0f64, 15_000_000f64),
+ ("1", 15_000_000f64, 50_000_000f64),
+ ("2", 50_000_000f64, 200_000_000f64),
+ ("3", 200_000_000f64, 500_000_000f64),
+ ("4", 500_000_000f64, 1_000_000_000f64),
+ ("5", 1_000_000_000f64, 100_000_000_000f64),
+ ("6", 100_000_000_000f64, 500_000_000_000f64),
+ ("7", 500_000_000_000f64, 1_000_000_000_000f64),
+ ("8", 1_000_000_000_000f64, 4_000_000_000_000f64)
+ ];
- println!("wrote local file");
-
- // write to s3
let config = aws_config::load_from_env().await;
let client = aws_sdk_s3::Client::new(&config);
- let _ = client.put_object()
- .bucket("mackdanz-polars-test")
- .key(outfilename)
- .set_body(Some(aws_sdk_s3::primitives::ByteStream::from_path(outfilepath).await.unwrap()))
- .send().await;
- println!("wrote file to S3");
+ for (bindex, low, high) in buckets {
+
+ // prep output file
+ let bucketdir = format!("bucket-{}",bindex);
+ create_dir_all(format!("out/{}",bucketdir.clone()))
+ .expect("couldn't create out dir");
+
+ let outfilename = format!("partition-{}.json",partition);
+ let outs3key = format!("{}/{}",bucketdir,outfilename);
+ let outfilepath = format!("out/{}/{}",bucketdir,outfilename);
+
+ // println!("{}",outfilepath);
+ let mut file = File::create(outfilepath.clone())
+ .expect("couldn't create output file");
+
+ // work with a copy
+ let mut df_bucket = df.clone();
+
+ // filter for bucket range
+ let mask = col("Value").gt_eq(lit(low));
+ df_bucket = df_bucket.filter(mask);
+ let mask = col("Value").lt(lit(high));
+ df_bucket = df_bucket.filter(mask);
+
+ let mut df_bucket = df_bucket.collect()?;
+
+ // some console output
+ println!("df_bucket: {:?}",df_bucket);
+
+ // write to local filesystem
+ JsonWriter::new(&mut file)
+ .finish(&mut df_bucket)?;
+
+ println!("wrote local file {}", outfilepath);
+
+ // write to s3
+ let _ = client.put_object()
+ .bucket("mackdanz-polars-test")
+ .key(outs3key.clone())
+ .set_body(Some(aws_sdk_s3::primitives::ByteStream::from_path(outfilepath).await.unwrap()))
+ .send().await;
+
+ println!("wrote file to S3 {}", outs3key);
+ }
Ok(())
}