Custom Serialization¶
Scaler uses cloudpickle by default. You can provide a custom serializer on the client to control
how functions, arguments, and results are encoded and decoded.
Note
Any libraries used by your serializer must be installed on workers.
Serializer interface¶
Implement two methods:
- serialize(obj: Any) -> bytes¶
  - Parameters: obj – function object, argument object, or task result object
  - Returns: serialized bytes
- deserialize(payload: bytes) -> Any¶
  - Parameters: payload – serialized bytes produced by serialize
  - Returns: reconstructed object
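A minimal implementation of this interface could delegate both directions to pickle. The sketch below uses a plain class (rather than subclassing scaler.Serializer) so it runs standalone; in real code you would inherit from Serializer as shown later on this page:

```python
import pickle
from typing import Any


class PickleSerializer:  # in practice: class PickleSerializer(Serializer)
    """Minimal serializer: delegate both directions to pickle."""

    @staticmethod
    def serialize(obj: Any) -> bytes:
        return pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def deserialize(payload: bytes) -> Any:
        return pickle.loads(payload)


# The contract: deserialize(serialize(x)) must reconstruct x.
payload = PickleSerializer.serialize({"a": [1, 2, 3]})
assert isinstance(payload, bytes)
assert PickleSerializer.deserialize(payload) == {"a": [1, 2, 3]}
```

The only hard requirement is that the pair round-trips: whatever serialize produces, deserialize must turn back into an equivalent object.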
For a call like client.submit(add, 1, 2), serialize is called for:
- The function add
- Argument 1
- Argument 2
- The returned result
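The sequence of calls can be made concrete with a recording serializer. The client/worker wiring below is simulated in-process for illustration; it is not Scaler's actual transport:

```python
import pickle
from typing import Any


class RecordingSerializer:
    """Wraps pickle and records every object passed to serialize."""

    seen: list = []

    @classmethod
    def serialize(cls, obj: Any) -> bytes:
        cls.seen.append(obj)
        return pickle.dumps(obj)

    @classmethod
    def deserialize(cls, payload: bytes) -> Any:
        return pickle.loads(payload)


def add(x, y):
    return x + y


# Simulate client.submit(add, 1, 2): the client serializes the function
# and each argument, the worker deserializes them, runs the call, and
# serializes the result back.
s = RecordingSerializer
blobs = [s.serialize(add), s.serialize(1), s.serialize(2)]
fn, a, b = (s.deserialize(p) for p in blobs)
result_blob = s.serialize(fn(a, b))
assert s.deserialize(result_blob) == 3
assert len(s.seen) == 4  # function, two arguments, and the result
```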
Example implementation¶
This example uses tagged payloads and mixes strategies:
- DataFrames -> Parquet bytes
- Integers -> packed binary
- All other objects -> cloudpickle
import enum
import pickle
import struct
from io import BytesIO
from typing import Any

import pandas as pd
from cloudpickle import cloudpickle

from scaler import Serializer


class ObjType(enum.Enum):
    General = b"G"
    Integer = b"I"
    DataFrame = b"D"


class CustomSerializer(Serializer):
    @staticmethod
    def serialize(obj: Any) -> bytes:
        if isinstance(obj, pd.DataFrame):
            # DataFrames round-trip through an in-memory Parquet buffer
            buffer = BytesIO()
            obj.to_parquet(buffer)
            return ObjType.DataFrame.value + buffer.getvalue()
        if isinstance(obj, int) and not isinstance(obj, bool) and -2**63 <= obj < 2**63:
            # "q" packs a signed 64-bit integer; bools and out-of-range
            # ints fall through to the cloudpickle branch below
            return ObjType.Integer.value + struct.pack("q", obj)
        return ObjType.General.value + cloudpickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def deserialize(payload: bytes) -> Any:
        # The first byte is the type tag; the rest is the encoded object
        obj_type = ObjType(payload[0:1])
        payload = payload[1:]
        if obj_type == ObjType.DataFrame:
            buffer = BytesIO(payload)
            return pd.read_parquet(buffer)
        if obj_type == ObjType.Integer:
            return struct.unpack("q", payload)[0]
        return cloudpickle.loads(payload)
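The tagging scheme itself is independent of pandas and cloudpickle. A stdlib-only sketch of the same idea (with the DataFrame branch dropped and cloudpickle swapped for pickle, so it runs anywhere) shows the round trip:

```python
import pickle
import struct
from typing import Any

TAG_GENERAL, TAG_INTEGER = b"G", b"I"


def serialize(obj: Any) -> bytes:
    # Same tagging scheme as CustomSerializer, minus the DataFrame branch.
    if isinstance(obj, int) and not isinstance(obj, bool) and -2**63 <= obj < 2**63:
        return TAG_INTEGER + struct.pack("q", obj)
    return TAG_GENERAL + pickle.dumps(obj)


def deserialize(payload: bytes) -> Any:
    tag, body = payload[0:1], payload[1:]
    if tag == TAG_INTEGER:
        return struct.unpack("q", body)[0]
    return pickle.loads(body)


assert deserialize(serialize(-42)) == -42          # integer fast path
assert len(serialize(-42)) == 9                    # 1 tag byte + 8 packed bytes
assert deserialize(serialize("hello")) == "hello"  # falls back to pickle
assert deserialize(serialize(True)) is True        # bools skip the int branch
```

Because the tag is read before any decoding happens, each branch stays independent: adding a new type only means choosing a fresh tag byte and a matching encode/decode pair.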