From 01479e65075633bcd32b3e7acb929587e14208c6 Mon Sep 17 00:00:00 2001 From: Erik Mackdanz Date: Thu, 30 Nov 2023 16:01:58 -0600 Subject: [PATCH] add local/s3 write for reduce stage --- Cargo.lock | 1 + Cargo.toml | 1 + scripts/paraexport | 30 ++++++++++++++---------------- src/main.rs | 39 +++++++++++++++++++++++++++++++++++++-- 4 files changed, 53 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd2b42f..829bfaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2134,6 +2134,7 @@ dependencies = [ "aws-sdk-s3", "polars", "polars-io", + "smartstring", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 3185406..9fbe9d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,4 +10,5 @@ aws-config = { version = "1.0.1", features = ["behavior-version-latest"] } aws-sdk-s3 = "1.1.0" polars = { version = "0.35.4", features = ["lazy", "fmt", "partition_by", "http", "streaming", "async", "aws", "json", "random"] } polars-io = { version = "0.35.4", features = ["csv", "aws", "fmt", "http", "json"] } +smartstring = "1.0.1" tokio = { version = "1.34.0", features = ["full"] } diff --git a/scripts/paraexport b/scripts/paraexport index bb8c316..d581939 100755 --- a/scripts/paraexport +++ b/scripts/paraexport @@ -1,26 +1,24 @@ #!/bin/sh # start multiple jobs in parallel -# target/debug/polarsbop mapbucket 0 & -# target/debug/polarsbop mapbucket 1 & -# target/debug/polarsbop mapbucket 2 & +target/debug/polarsbop mapbucket 0 & +target/debug/polarsbop mapbucket 1 & +target/debug/polarsbop mapbucket 2 & -# wait %1 %2 %3 +wait %1 %2 %3 # echo mapbucket stage done -target/debug/polarsbop reducebucket 0 +target/debug/polarsbop reducebucket 0 & +target/debug/polarsbop reducebucket 1 & +target/debug/polarsbop reducebucket 2 & +target/debug/polarsbop reducebucket 3 & +target/debug/polarsbop reducebucket 4 & +target/debug/polarsbop reducebucket 5 & +target/debug/polarsbop reducebucket 6 & +target/debug/polarsbop reducebucket 7 & +target/debug/polarsbop reducebucket 8 & -# target/debug/polarsbop reducebucket 0 & -# target/debug/polarsbop reducebucket 1 & -# target/debug/polarsbop reducebucket 2 & -# target/debug/polarsbop reducebucket 3 & -# target/debug/polarsbop reducebucket 4 & -# target/debug/polarsbop reducebucket 5 & -# target/debug/polarsbop reducebucket 6 & -# target/debug/polarsbop reducebucket 7 & -# target/debug/polarsbop reducebucket 8 & - -# wait %4 +wait %1 %2 %3 %4 %5 %6 %7 %8 %9 echo reducebucket stage done diff --git a/src/main.rs b/src/main.rs index 312f263..2d5aa1d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use aws_sdk_s3::primitives::ByteStream; use polars::prelude::*; +use smartstring::SmartString; use std::error::Error; use std::env::args; use std::io::Write; @@ -175,16 +176,50 @@ async fn reduce_bucket(bucket: &str) -> Result<(),Box> { } // read partitions to DF - let bucketdf = + // aggregate as median value in this bucket + // need an explicit schema since data can be empty + let mut colname = SmartString::new(); + colname.push_str("Value"); + let mut schema = Schema::new(); + let _ = schema.with_column(colname, DataType::Float64); + + let mut bucketdf = LazyJsonLineReader::new_paths(Arc::new(filepaths)) + .with_schema(Some(schema.into())) .finish()? .sort_by_exprs([col("Value")], vec!(false),false,false) + .median() + .select(&[col("Value")]) .collect()?; println!("bucketdf: {:?}",bucketdf); + // output locally + create_dir_all("out/reducebucket/out") + .expect("couldn't create output dir"); + let outfilepath = format!( + "out/reducebucket/out/bucket-{}.json", bucket); - // todo: some aggregation on my bucket + let mut file = File::create(outfilepath.clone()) + .expect("couldn't create output file"); + + JsonWriter::new(&mut file) + .finish(&mut bucketdf)?; + + println!("wrote local file {}", outfilepath); + + // open local file for writing to s3 + let bytes = ByteStream::from_path(outfilepath).await?; + + // write to s3 + let outs3key = format!("reducebucket/bucket-{}.json",bucket); + client.put_object() + .bucket("mackdanz-polars-test") + .key(outs3key.clone()) + .set_body(Some(bytes)) + .send().await?; + + println!("wrote file to S3 {}", outs3key); Ok(()) } -- 2.52.0