"aws-sdk-s3",
"polars",
"polars-io",
+ "smartstring",
"tokio",
]
aws-sdk-s3 = "1.1.0"
polars = { version = "0.35.4", features = ["lazy", "fmt", "partition_by", "http", "streaming", "async", "aws", "json", "random"] }
polars-io = { version = "0.35.4", features = ["csv", "aws", "fmt", "http", "json"] }
+smartstring = "1.0.1"
tokio = { version = "1.34.0", features = ["full"] }
#!/bin/sh
# start multiple jobs in parallel
-# target/debug/polarsbop mapbucket 0 &
-# target/debug/polarsbop mapbucket 1 &
-# target/debug/polarsbop mapbucket 2 &
+target/debug/polarsbop mapbucket 0 &
+target/debug/polarsbop mapbucket 1 &
+target/debug/polarsbop mapbucket 2 &
-# wait %1 %2 %3
+wait %1 %2 %3
# echo mapbucket stage done
-target/debug/polarsbop reducebucket 0
+target/debug/polarsbop reducebucket 0 &
+target/debug/polarsbop reducebucket 1 &
+target/debug/polarsbop reducebucket 2 &
+target/debug/polarsbop reducebucket 3 &
+target/debug/polarsbop reducebucket 4 &
+target/debug/polarsbop reducebucket 5 &
+target/debug/polarsbop reducebucket 6 &
+target/debug/polarsbop reducebucket 7 &
+target/debug/polarsbop reducebucket 8 &
-# target/debug/polarsbop reducebucket 0 &
-# target/debug/polarsbop reducebucket 1 &
-# target/debug/polarsbop reducebucket 2 &
-# target/debug/polarsbop reducebucket 3 &
-# target/debug/polarsbop reducebucket 4 &
-# target/debug/polarsbop reducebucket 5 &
-# target/debug/polarsbop reducebucket 6 &
-# target/debug/polarsbop reducebucket 7 &
-# target/debug/polarsbop reducebucket 8 &
-
-# wait %4
+wait %1 %2 %3 %4 %5 %6 %7 %8 %9
echo reducebucket stage done
use aws_sdk_s3::primitives::ByteStream;
use polars::prelude::*;
+use smartstring::SmartString;
use std::error::Error;
use std::env::args;
use std::io::Write;
}
// read partitions to DF
- let bucketdf =
+ // aggregate as median value in this bucket
+ // need an explicit schema since data can be empty
+ let mut colname = SmartString::new();
+ colname.push_str("Value");
+ let mut schema = Schema::new();
+ let _ = schema.with_column(colname, DataType::Float64);
+
+ let mut bucketdf =
LazyJsonLineReader::new_paths(Arc::new(filepaths))
+ .with_schema(Some(schema.into()))
.finish()?
.sort_by_exprs([col("Value")],
vec!(false),false,false)
+ .median()
+ .select(&[col("Value")])
.collect()?;
println!("bucketdf: {:?}",bucketdf);
+ // output locally
+ create_dir_all("out/reducebucket/out")
+ .expect("couldn't create output dir");
+ let outfilepath = format!(
+ "out/reducebucket/out/bucket-{}.json", bucket);
- // todo: some aggregation on my bucket
+ let mut file = File::create(outfilepath.clone())
+ .expect("couldn't create output file");
+
+ JsonWriter::new(&mut file)
+ .finish(&mut bucketdf)?;
+
+ println!("wrote local file {}", outfilepath);
+
+    // read the local file into a byte stream for upload to S3
+ let bytes = ByteStream::from_path(outfilepath).await?;
+
+ // write to s3
+ let outs3key = format!("reducebucket/bucket-{}.json",bucket);
+ client.put_object()
+ .bucket("mackdanz-polars-test")
+ .key(outs3key.clone())
+ .set_body(Some(bytes))
+ .send().await?;
+
+ println!("wrote file to S3 {}", outs3key);
Ok(())
}