From: Erik Mackdanz Date: Tue, 28 Nov 2023 12:36:26 +0000 (-0600) Subject: add mapbucket command arg X-Git-Url: https://git.humopery.space/?a=commitdiff_plain;h=ea46b4b148916d8b430616d27dd5f56b77414ad1;p=polarsbop.git add mapbucket command arg --- diff --git a/scripts/paraexport b/scripts/paraexport index be33a5d..903569e 100755 --- a/scripts/paraexport +++ b/scripts/paraexport @@ -1,9 +1,9 @@ #!/bin/sh # start multiple jobs in parallel -target/debug/polarsbop 0 & -target/debug/polarsbop 1 & -target/debug/polarsbop 2 & +target/debug/polarsbop mapbucket 0 & +target/debug/polarsbop mapbucket 1 & +target/debug/polarsbop mapbucket 2 & wait %1 %2 %3 diff --git a/src/main.rs b/src/main.rs index 4738231..93bcac4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,9 @@ +use aws_sdk_s3::primitives::ByteStream; use polars::prelude::*; use std::env::args; use std::fs::{create_dir_all,File}; -#[::tokio::main] -async fn main() -> Result<(),PolarsError> { - - // expect first arg to be a file partition = 0, 1, 2 - let mut argsiter = args(); - let _ = argsiter.next(); - let partition = argsiter.next() - .expect("single argument required, 0 or 1 or 2"); +async fn map_bucket(partition: &str) -> Result<(),PolarsError> { // set some columns to be parsed as numeric let mut schema_override = Schema::new(); @@ -38,7 +32,7 @@ async fn main() -> Result<(),PolarsError> { .finish()?; // split list based on arg - match partition.as_str() { + match partition { "0" => { let mask = col("Country Name").lt(lit("Ethiopia")); df = df.filter(mask); @@ -80,6 +74,7 @@ async fn main() -> Result<(),PolarsError> { ("8", 1_000_000_000_000f64, 4_000_000_000_000f64) ]; + // prep S3 client let config = aws_config::load_from_env().await; let client = aws_sdk_s3::Client::new(&config); @@ -120,13 +115,39 @@ async fn main() -> Result<(),PolarsError> { // write to s3 let _ = client.put_object() - .bucket("mackdanz-polars-test") - .key(outs3key.clone()) - .set_body(Some(aws_sdk_s3::primitives::ByteStream::from_path(outfilepath).await.unwrap())) - .send().await; + .bucket("mackdanz-polars-test") + .key(outs3key.clone()) + .set_body(Some(ByteStream::from_path(outfilepath) + .await.unwrap())) + .send().await; println!("wrote file to S3 {}", outs3key); } Ok(()) } + +#[::tokio::main] +async fn main() -> Result<(),PolarsError> { + + let mut argsiter = args(); + // first arg is program name + let _ = argsiter.next(); + let command = argsiter.next() + .expect("no command arg"); + + match command.as_str() { + "mapbucket" => { + // expect next arg to be a file partition = 0, 1, 2 + let partition = argsiter.next() + .expect("single argument required, 0 or 1 or 2"); + + map_bucket(partition.as_str()).await + }, + _ => { + panic!("invalid command"); + } + } + +} +