]> Humopery - polarsbop.git/commitdiff
add mapbucket command arg
authorErik Mackdanz <erikmack@gmail.com>
Tue, 28 Nov 2023 12:36:26 +0000 (06:36 -0600)
committerErik Mackdanz <erikmack@gmail.com>
Tue, 28 Nov 2023 12:36:26 +0000 (06:36 -0600)
scripts/paraexport
src/main.rs

index be33a5dc7b3eba3d85043f90497d1401f61023c0..903569e64e34e0ba1e3481c51e934f2cf01e8610 100755 (executable)
@@ -1,9 +1,9 @@
 #!/bin/sh
 
 # start multiple jobs in parallel
-target/debug/polarsbop 0 &
-target/debug/polarsbop 1 &
-target/debug/polarsbop 2 &
+target/debug/polarsbop mapbucket 0 &
+target/debug/polarsbop mapbucket 1 &
+target/debug/polarsbop mapbucket 2 &
 
 wait %1 %2 %3
 
index 4738231faa94bf1e0a16791e337818759853df8c..93bcac406571c0dec9e718803092587546a0369b 100644 (file)
@@ -1,15 +1,9 @@
+use aws_sdk_s3::primitives::ByteStream;
 use polars::prelude::*;
 use std::env::args;
 use std::fs::{create_dir_all,File};
 
-#[::tokio::main]
-async fn main() -> Result<(),PolarsError> {
-
-    // expect first arg to be a file partition = 0, 1, 2
-    let mut argsiter = args();
-    let _ = argsiter.next();
-    let partition = argsiter.next()
-       .expect("single argument required, 0 or 1 or 2");
+async fn map_bucket(partition: &str) -> Result<(),PolarsError> {
 
     // set some columns to be parsed as numeric
     let mut schema_override = Schema::new();
@@ -38,7 +32,7 @@ async fn main() -> Result<(),PolarsError> {
        .finish()?;
 
     // split list based on arg
-    match partition.as_str() {
+    match partition {
        "0" => {
            let mask = col("Country Name").lt(lit("Ethiopia"));
            df = df.filter(mask);
@@ -80,6 +74,7 @@ async fn main() -> Result<(),PolarsError> {
        ("8", 1_000_000_000_000f64, 4_000_000_000_000f64)
     ];
 
+    // prep S3 client
     let config = aws_config::load_from_env().await;
     let client = aws_sdk_s3::Client::new(&config);
 
@@ -120,13 +115,39 @@ async fn main() -> Result<(),PolarsError> {
 
        // write to s3
        let _ = client.put_object()
-               .bucket("mackdanz-polars-test")
-               .key(outs3key.clone())
-               .set_body(Some(aws_sdk_s3::primitives::ByteStream::from_path(outfilepath).await.unwrap()))
-               .send().await;
+           .bucket("mackdanz-polars-test")
+           .key(outs3key.clone())
+           .set_body(Some(ByteStream::from_path(outfilepath)
+                          .await.unwrap()))
+           .send().await;
 
        println!("wrote file to S3 {}", outs3key);
     }
 
     Ok(())
 }
+
+#[::tokio::main]
+async fn main() -> Result<(),PolarsError> {
+
+    let mut argsiter = args();
+    // first arg is program name
+    let _ = argsiter.next();
+    let command = argsiter.next()
+       .expect("no command arg");
+
+    match command.as_str() {
+       "mapbucket" => {
+           // expect next arg to be a file partition = 0, 1, 2
+           let partition = argsiter.next()
+               .expect("single argument required, 0 or 1 or 2");
+
+           map_bucket(partition.as_str()).await
+       },
+       _ => {
+           panic!("invalid command");
+       }
+    }
+
+}
+