From 2312de802c7cc90ecb2a9354b2a9d242cb5393ec Mon Sep 17 00:00:00 2001 From: Erik Mackdanz Date: Tue, 28 Nov 2023 06:50:15 -0600 Subject: [PATCH] stub reduce_bucket --- scripts/paraexport | 26 +++++++++++++++++++++----- src/main.rs | 17 ++++++++++++++--- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/scripts/paraexport b/scripts/paraexport index 903569e..bb8c316 100755 --- a/scripts/paraexport +++ b/scripts/paraexport @@ -1,10 +1,26 @@ #!/bin/sh # start multiple jobs in parallel -target/debug/polarsbop mapbucket 0 & -target/debug/polarsbop mapbucket 1 & -target/debug/polarsbop mapbucket 2 & +# target/debug/polarsbop mapbucket 0 & +# target/debug/polarsbop mapbucket 1 & +# target/debug/polarsbop mapbucket 2 & -wait %1 %2 %3 +# wait %1 %2 %3 -echo done +# echo mapbucket stage done + +target/debug/polarsbop reducebucket 0 + +# target/debug/polarsbop reducebucket 0 & +# target/debug/polarsbop reducebucket 1 & +# target/debug/polarsbop reducebucket 2 & +# target/debug/polarsbop reducebucket 3 & +# target/debug/polarsbop reducebucket 4 & +# target/debug/polarsbop reducebucket 5 & +# target/debug/polarsbop reducebucket 6 & +# target/debug/polarsbop reducebucket 7 & +# target/debug/polarsbop reducebucket 8 & + +# wait %4 + +echo reducebucket stage done diff --git a/src/main.rs b/src/main.rs index 93bcac4..788a01f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -82,12 +82,12 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsError> { // prep output file let bucketdir = format!("bucket-{}",bindex); - create_dir_all(format!("out/{}",bucketdir.clone())) + create_dir_all(format!("out/mapbucket/{}",bucketdir.clone())) .expect("couldn't create out dir"); let outfilename = format!("partition-{}.json",partition); - let outs3key = format!("{}/{}",bucketdir,outfilename); - let outfilepath = format!("out/{}/{}",bucketdir,outfilename); + let outs3key = format!("mapbucket/{}/{}",bucketdir,outfilename); + let outfilepath = format!("out/mapbucket/{}/{}",bucketdir,outfilename); // println!("{}",outfilepath); let mut file = File::create(outfilepath.clone()) @@ -127,6 +127,10 @@ async fn map_bucket(partition: &str) -> Result<(),PolarsError> { Ok(()) } +async fn reduce_bucket(bucket: &str) -> Result<(),PolarsError> { + Ok(()) +} + #[::tokio::main] async fn main() -> Result<(),PolarsError> { @@ -144,6 +148,13 @@ async fn main() -> Result<(),PolarsError> { map_bucket(partition.as_str()).await }, + "reducebucket" => { + // expect next arg to be a bucket partition = 0-8 + let bucket = argsiter.next() + .expect("single argument required, 0 - 8"); + + reduce_bucket(bucket.as_str()).await + }, _ => { panic!("invalid command"); } -- 2.52.0