lonesword | 1 year ago
- Write a pandas_udf function in pyspark.
- Partition your data into smaller bits so that the pandas_udf does not get too much data at the same time.
Something like:
```
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType

@f.pandas_udf(IntegerType())
def ingest(doc: pd.Series) -> pd.Series:  # doc is a pandas Series now
    # your processing goes here -> write to DB etc.
    # return a Series of zeros just to make spark happy
    return pd.Series([0] * len(doc))

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3 path")
df = df.repartition(1000)  # bump up this number if you run into memory issues
df = df.withColumn("foo", ingest(f.col("doc_column")))
```
Now the trick is, you can limit how much data is given to your pandas_udf by repartitioning your data. The more partitions, the smaller the pd.Series that your pandas_udf gets. There's also the `spark.sql.execution.arrow.maxRecordsPerBatch` config that you can set in spark to cap memory consumption per batch.
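To illustrate the batching idea, here's a pure-Python sketch (no Spark needed, `record_batches` is just a made-up helper) of how a partition's rows get chunked into Arrow record batches under a `maxRecordsPerBatch`-style cap:

```python
def record_batches(rows, max_records_per_batch):
    """Mimic how a partition is split into fixed-size record batches."""
    for i in range(0, len(rows), max_records_per_batch):
        yield rows[i : i + max_records_per_batch]

rows = list(range(10_000))
sizes = [len(b) for b in record_batches(rows, 3_000)]
print(sizes)  # → [3000, 3000, 3000, 1000]
```

Each batch here is what one invocation of the pandas_udf would see as its pd.Series, so lowering the cap (or raising the partition count) directly shrinks peak memory per call.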
^ Probably overkill to bring spark into the equation, but this is one way to do it.
You can use a normal udf (i.e. `f.udf()`) instead of a pandas_udf, but apparently that's slower due to java <-> python serialization.
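A rough sketch of that difference in plain Python (no Spark, both functions are made up for illustration): a normal udf is invoked once per row, so the java <-> python round trip happens per row, while a pandas_udf is invoked once per batch:

```python
import pandas as pd

calls = {"plain": 0, "batched": 0}

def plain_udf(x: str) -> int:
    # like f.udf(): one call (and one serialization round trip) per row
    calls["plain"] += 1
    return len(x)

def batched_udf(batch: pd.Series) -> pd.Series:
    # like a pandas_udf: one call per batch of rows
    calls["batched"] += 1
    return batch.str.len()

rows = ["a", "bb", "ccc", "dddd"]
per_row = [plain_udf(r) for r in rows]
per_batch = batched_udf(pd.Series(rows)).tolist()

print(per_row, calls["plain"])     # same results, four calls
print(per_batch, calls["batched"])  # same results, one call
```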
fifilura | 1 year ago
I just wanted to mention that AWS Athena eats 15G parquet files for breakfast.
It is trivial to map the file into Athena.
But you can't connect it to anything other than file output. It can still help you, for example, rewrite the file into smaller chunks, or choose another output format such as CSV (although arbitrary email content in a CSV feels like a setup for parsing errors).
The benefit is that there is virtually no setup cost. And processing cost for a 15G file will be just a few cents.
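For instance, chunking a big parquet file can be done with an Athena CTAS query. A hedged sketch (table, column, and bucket names are all made up; bucketing by a column splits the output into many smaller files):

```python
# CTAS: CREATE TABLE AS SELECT, which writes its result back to S3.
# "emails", "message_id", and the S3 paths are hypothetical.
ctas = """
CREATE TABLE emails_chunked
WITH (
    format = 'PARQUET',
    external_location = 's3://my-bucket/emails-chunked/',
    bucketed_by = ARRAY['message_id'],
    bucket_count = 100
) AS
SELECT * FROM emails
"""

# Run it from the Athena console, or submit it programmatically, e.g.:
#   boto3.client("athena").start_query_execution(
#       QueryString=ctas,
#       ResultConfiguration={"OutputLocation": "s3://my-bucket/athena-results/"},
#   )
print(ctas.strip().splitlines()[0])
```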