]> Humopery - polarsbop.git/commitdiff
add local/s3 write for reduce stage
authorErik Mackdanz <erikmack@gmail.com>
Thu, 30 Nov 2023 22:01:58 +0000 (16:01 -0600)
committerErik Mackdanz <erikmack@gmail.com>
Thu, 30 Nov 2023 22:01:58 +0000 (16:01 -0600)
Cargo.lock
Cargo.toml
scripts/paraexport
src/main.rs

index cd2b42fb9960810d272fc48e8ec277cdef398410..829bfafd9dcd60f5214a9f34242a199194195b58 100644 (file)
@@ -2134,6 +2134,7 @@ dependencies = [
  "aws-sdk-s3",
  "polars",
  "polars-io",
+ "smartstring",
  "tokio",
 ]
 
index 3185406699f3c7850263366bdbb4b75bb9cc80a6..9fbe9d4bd038637b048800f265604c275b86b90f 100644 (file)
@@ -10,4 +10,5 @@ aws-config = { version = "1.0.1", features = ["behavior-version-latest"] }
 aws-sdk-s3 = "1.1.0"
 polars = { version = "0.35.4", features = ["lazy", "fmt", "partition_by", "http", "streaming", "async", "aws", "json", "random"] }
 polars-io = { version = "0.35.4", features = ["csv", "aws", "fmt", "http", "json"] }
+smartstring = "1.0.1"
 tokio = { version = "1.34.0", features = ["full"] }
index bb8c316b94d7deff11c2e5a8eea5a87452b283b3..d58193997a3f92ae51259354d3b3872068d605b7 100755 (executable)
@@ -1,26 +1,24 @@
 #!/bin/sh
 
 # start multiple jobs in parallel
-target/debug/polarsbop mapbucket 0 &
-target/debug/polarsbop mapbucket 1 &
-target/debug/polarsbop mapbucket 2 &
+target/debug/polarsbop mapbucket 0 &
+target/debug/polarsbop mapbucket 1 &
+target/debug/polarsbop mapbucket 2 &
 
-wait %1 %2 %3
+wait %1 %2 %3
 
 # echo mapbucket stage done
 
-target/debug/polarsbop reducebucket 0
+target/debug/polarsbop reducebucket 0 &
+target/debug/polarsbop reducebucket 1 &
+target/debug/polarsbop reducebucket 2 &
+target/debug/polarsbop reducebucket 3 &
+target/debug/polarsbop reducebucket 4 &
+target/debug/polarsbop reducebucket 5 &
+target/debug/polarsbop reducebucket 6 &
+target/debug/polarsbop reducebucket 7 &
+target/debug/polarsbop reducebucket 8 &
 
-# target/debug/polarsbop reducebucket 0 &
-# target/debug/polarsbop reducebucket 1 &
-# target/debug/polarsbop reducebucket 2 &
-# target/debug/polarsbop reducebucket 3 &
-# target/debug/polarsbop reducebucket 4 &
-# target/debug/polarsbop reducebucket 5 &
-# target/debug/polarsbop reducebucket 6 &
-# target/debug/polarsbop reducebucket 7 &
-# target/debug/polarsbop reducebucket 8 &
-
-# wait %4
+wait %1 %2 %3 %4 %5 %6 %7 %8 %9
 
 echo reducebucket stage done
index 312f2632c4e1f1a452b4c7755c894c6ea053b1ee..2d5aa1d6809424d8eda0987ba99d435136e7d28b 100644 (file)
@@ -1,5 +1,6 @@
 use aws_sdk_s3::primitives::ByteStream;
 use polars::prelude::*;
+use smartstring::SmartString;
 use std::error::Error;
 use std::env::args;
 use std::io::Write;
@@ -175,16 +176,50 @@ async fn reduce_bucket(bucket: &str) -> Result<(),Box<dyn Error>> {
     }
 
     // read partitions to DF
-    let bucketdf =
+    // aggregate as median value in this bucket
+    // need an explicit schema since data can be empty
+    let mut colname = SmartString::new();
+    colname.push_str("Value");
+    let mut schema = Schema::new();
+    let _ = schema.with_column(colname, DataType::Float64);
+
+    let mut bucketdf =
        LazyJsonLineReader::new_paths(Arc::new(filepaths))
+       .with_schema(Some(schema.into()))
        .finish()?
        .sort_by_exprs([col("Value")],
                       vec!(false),false,false)
+       .median()
+       .select(&[col("Value")])
        .collect()?;
     println!("bucketdf: {:?}",bucketdf);
 
+    // output locally
+    create_dir_all("out/reducebucket/out")
+       .expect("couldn't create output dir");
+    let outfilepath = format!(
+       "out/reducebucket/out/bucket-{}.json", bucket);
 
-    // todo: some aggregation on my bucket
+    let mut file = File::create(outfilepath.clone())
+       .expect("couldn't create output file");
+
+    JsonWriter::new(&mut file)
+       .finish(&mut bucketdf)?;
+
+    println!("wrote local file {}", outfilepath);
+
+    // open local file for writing to s3
+    let bytes = ByteStream::from_path(outfilepath).await?;
+
+    // write to s3
+    let outs3key = format!("reducebucket/bucket-{}.json",bucket);
+    client.put_object()
+       .bucket("mackdanz-polars-test")
+       .key(outs3key.clone())
+       .set_body(Some(bytes))
+       .send().await?;
+
+    println!("wrote file to S3 {}", outs3key);
 
     Ok(())
 }