
Process Data

getFilteredData(df, impr_acc=100, cpu_cores=max(1, int(cpu_count() / 2)))

Parallel, two-stage cleansing of raw impression data that

  1. drops points whose GNSS accuracy (impression_acc) exceeds the user-specified threshold, and
  2. removes physically implausible jumps using scikit-mobility’s skmob.preprocessing.filtering.filter (max_speed_kmh=200 by default).

The work is split into load-balanced buckets and processed concurrently with multiprocessing.Pool.
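
The per-bucket work is done by the package's filterData helper (its source is not shown on this page). Purely as an illustration of the two stages above, a per-bucket filter might look like the sketch below; the helper name filter_chunk and the exact scikit-mobility arguments are assumptions, not the package's implementation.

```python
import pandas as pd
from skmob import TrajDataFrame
from skmob.preprocessing import filtering


def filter_chunk(chunk: pd.DataFrame, impr_acc: int = 100) -> TrajDataFrame:
    """Illustrative two-stage filter for one bucket (hypothetical helper)."""
    # Stage 1: drop points whose GNSS accuracy is worse than the threshold.
    chunk = chunk[chunk["impression_acc"] <= impr_acc]

    # Stage 2: remove physically implausible jumps with scikit-mobility's speed filter.
    tdf = TrajDataFrame(
        chunk, latitude="lat", longitude="lng", user_id="uid", datetime="datetime"
    )
    return filtering.filter(tdf, max_speed_kmh=200.0)
```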

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | Point-level impressions with at least the columns ["uid", "lat", "lng", "datetime", "impression_acc"] (plus any additional attributes you want to keep). | required |
| impr_acc | int | Maximum allowed GNSS accuracy in metres. Points with a larger impression_acc are discarded. | 100 |
| cpu_cores | int | Number of CPU cores to devote to multiprocessing. By default, half of the available logical cores (but at least 1). | max(1, int(cpu_count() / 2)) |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| TrajDataFrame | TrajDataFrame | A scikit-mobility TrajDataFrame containing only points that pass both the accuracy and speed filters, with its original columns preserved. |

Example

clean_traj = getFilteredData(raw_df, impr_acc=50, cpu_cores=8)
print(clean_traj.shape)

Source code in meowmotion/process_data.py
def getFilteredData(
    df: pd.DataFrame,
    impr_acc: Optional[int] = 100,
    cpu_cores: Optional[int] = max(1, int(cpu_count() / 2)),
) -> TrajDataFrame:
    """
    Parallel, two–stage cleansing of raw impression data that

    1. **drops points whose GNSS accuracy** (``impression_acc``) exceeds the
       user-specified threshold, and
    2. **removes physically implausible jumps** using scikit-mob’s
       :pyfunc:`skmob.preprocessing.filtering.filter`
       (``max_speed_kmh=200`` by default).

    The work is split into load-balanced buckets and processed concurrently
    with :pyclass:`multiprocessing.Pool`.

    Args:
        df (pd.DataFrame):
            Point-level impressions with *at least* the columns
            ``["uid", "lat", "lng", "datetime", "impression_acc"]``
            (plus any additional attributes you want to keep).
        impr_acc (int, optional):
            Maximum allowed GNSS accuracy in **metres**. Points with a larger
            ``impression_acc`` are discarded. Defaults to ``100``.
        cpu_cores (int, optional):
            Number of CPU cores to devote to multiprocessing. By default, half
            of the available logical cores (but at least 1).

    Returns:
        TrajDataFrame:
            A scikit-mob ``TrajDataFrame`` containing only points that pass
            both the accuracy and speed filters, with its original columns
            preserved.

    Example:
        >>> clean_traj = getFilteredData(raw_df, impr_acc=50, cpu_cores=8)
        >>> print(clean_traj.shape)
    """

    print(f"{datetime.now()}: Filtering data based on impression accuracy={impr_acc}")
    print(f"{datetime.now()}: Creating buckets for multiprocessing")
    tdf_collection = getLoadBalancedBuckets(df, cpu_cores)
    args = [(tdf, impr_acc) for tdf in tdf_collection if not tdf.empty]
    print(f"{datetime.now()}: Filtering Started...")
    with Pool(cpu_cores) as pool:
        results = pool.starmap(
            filterData, args
        )  # Filtering the data based on Impression Accuracy and Speed between GPS points

    del tdf_collection  # Deleting the data to free up the memory
    traj_df = pd.concat(
        [*results]
    )  # Concatenating the filtered data from all the processes
    del results  # Deleting the results to free up the memory
    print(f"{datetime.now()}: Filtering Finished\n\n\n")

    return traj_df

getLoadBalancedBuckets(tdf, bucket_size)

Partition a user-level DataFrame into bucket_size sub-DataFrames whose total row counts (i.e. number of “impressions”) are as evenly balanced as possible. Each bucket can then be processed in parallel on its own CPU core.

Algorithm

  1. Count the number of rows (“impressions”) for every unique uid.
  2. Sort users in descending order of impression count.
  3. Greedily assign each user to the bucket that currently has the smallest total number of impressions (load-balancing heuristic).
  4. Build one DataFrame per bucket containing only the rows for the users assigned to that bucket.
  5. Return the list of non-empty bucket DataFrames.
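
Steps 2 and 3 are a standard greedy balancing heuristic: assign the heaviest remaining user to the currently lightest bucket. A condensed, self-contained sketch of just that assignment step (independent of the implementation shown further below):

```python
import pandas as pd


def greedy_user_buckets(tdf: pd.DataFrame, bucket_size: int) -> dict:
    """Map bucket number -> list of uids, balancing total impressions (sketch only)."""
    counts = tdf.groupby("uid").size().sort_values(ascending=False)
    buckets = {i: [] for i in range(1, bucket_size + 1)}  # uids assigned to each bucket
    loads = {i: 0 for i in range(1, bucket_size + 1)}     # running impression totals
    for uid, n_impressions in counts.items():
        lightest = min(loads, key=loads.get)  # bucket with the smallest load so far
        buckets[lightest].append(uid)
        loads[lightest] += n_impressions
    return buckets
```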

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tdf | DataFrame | A DataFrame that must contain a "uid" column plus any other fields. Each row represents one GPS impression or point. | required |
| bucket_size | int | The desired number of buckets, typically equal to the number of CPU cores you plan to use with multiprocessing.Pool. | required |

Returns:

| Type | Description |
| --- | --- |
| list[pd.DataFrame] | A list whose length is ≤ bucket_size. Each element is a DataFrame containing a disjoint subset of users such that the cumulative row counts across buckets are approximately balanced. Empty buckets are omitted. |

Example

buckets = getLoadBalancedBuckets(raw_points_df, bucket_size=8)
for i, bucket_df in enumerate(buckets, start=1):
    print(f"Bucket {i}: {len(bucket_df):,} rows "
          f"({bucket_df['uid'].nunique()} users)")

Note

The function is designed for embarrassingly parallel workloads where each user’s data can be processed independently (e.g. feature extraction or filtering).
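
For instance, the returned buckets can be handed straight to a process pool. The snippet below is a sketch only: process_user_chunk and the "points.csv" input are hypothetical stand-ins for whatever per-user computation and data you have.

```python
from multiprocessing import Pool

import pandas as pd

from meowmotion.process_data import getLoadBalancedBuckets


def process_user_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
    # Placeholder per-bucket work, e.g. filtering or feature extraction.
    return chunk


if __name__ == "__main__":
    raw_points_df = pd.read_csv("points.csv")  # hypothetical point-level data with a "uid" column
    buckets = getLoadBalancedBuckets(raw_points_df, bucket_size=8)
    with Pool(processes=8) as pool:
        results = pool.map(process_user_chunk, buckets)
    combined = pd.concat(results, ignore_index=True)
```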

Source code in meowmotion/process_data.py
def getLoadBalancedBuckets(tdf: pd.DataFrame, bucket_size: int) -> list:
    """
    Partition a user-level DataFrame into *bucket_size* sub-DataFrames whose
    total row counts (i.e. number of “impressions”) are as evenly balanced as
    possible.  Each bucket can then be processed in parallel on its own CPU
    core.

    Algorithm
    ---------
    1. Count the number of rows (“impressions”) for every unique ``uid``.
    2. Sort users in descending order of impression count.
    3. Greedily assign each user to the bucket that currently has the
       **smallest** total number of impressions (*load-balancing heuristic*).
    4. Build one DataFrame per bucket containing only the rows for the users
       assigned to that bucket.
    5. Return the list of non-empty bucket DataFrames.

    Args:
        tdf (pd.DataFrame):
            A DataFrame that **must contain a ``"uid"`` column** plus any other
            fields.  Each row represents one GPS impression or point.
        bucket_size (int):
            The desired number of buckets—typically equal to the number of CPU
            cores you plan to use with :pyclass:`multiprocessing.Pool`.

    Returns:
        list[pd.DataFrame]:
            A list whose length is **≤ *bucket_size***.  Each element is a
            DataFrame containing a disjoint subset of users such that the
            cumulative row counts across buckets are approximately balanced.
            Empty buckets are omitted.

    Example:
        >>> buckets = getLoadBalancedBuckets(raw_points_df, bucket_size=8)
        >>> for i, bucket_df in enumerate(buckets, start=1):
        ...     print(f"Bucket {i}: {len(bucket_df):,} rows "
        ...           f"({bucket_df['uid'].nunique()} users)")

    Note:
        The function is designed for *embarrassingly parallel* workloads where
        each user’s data can be processed independently (e.g. feature
        extraction or filtering).
    """

    print(f"{datetime.now()}: Getting unique users")
    unique_users = tdf["uid"].unique()  # Getting Unique Users in the data
    print(f"{datetime.now()}: Number of unique users: {len(unique_users)}")
    print(f"{datetime.now()}: Creating sets")
    num_impr_df = (
        pd.DataFrame(tdf.groupby("uid").size(), columns=["num_impressions"])
        .reset_index()
        .sort_values(by=["num_impressions"], ascending=False)
    )  # Creating a DataFrame containing Unique UID and Total number of impressions that Unique UID has in the data.
    # Dictionary of UID buckets. Each bucket corresponds to one CPU core: if bucket 1
    # contains 10 UIDs, the data of those 10 UIDs will be processed on core 1.
    buckets = {}
    bucket_sums = {}  # Tracks the current load (number of impressions) in each bucket.

    for i in range(1, bucket_size + 1):
        buckets[i] = []  # Initializing empty buckets
        bucket_sums[i] = 0  # Load in each bucket is zero initially

    # Allocate users to buckets
    for _, row in num_impr_df.iterrows():
        user, impressions = (
            row["uid"],
            row["num_impressions"],
        )  # Getting the UID and the number of impressions of that UID
        # Find the bucket with the minimum sum of impressions
        min_bucket = min(
            bucket_sums, key=bucket_sums.get
        )  # Getting the bucket with the minimum load. Initially, all the buckets have zero load.
        # Add user to this bucket
        buckets[min_bucket].append(user)  # Adding UID to the minimum bucket
        # Update the sum of impressions for this bucket
        bucket_sums[
            min_bucket
        ] += impressions  # Updating the load value of the bucket. For example, UID 1 was added to Bucket 1 and UID 1 had 1000 impressions (records). So, load of bucket 1 is 1000 now.

    print(f"{datetime.now()}: Creating seperate dataframes")
    tdf_collection = (
        []
    )  # List of collection of DataFrames. This list will contain the number of DataFrames=number of CPU Cores. Each DataFrame will be processed in a seperate core as a seperate process.
    for i in range(1, bucket_size + 1):
        bucket_df = tdf[tdf["uid"].isin(buckets[i])].copy()
        if not bucket_df.empty:  # Skip buckets that received no users
            tdf_collection.append(bucket_df)
    return tdf_collection

readJsonFiles(root, month_file)

Load a month's worth of impression records stored as gzipped JSON-Lines inside a ZIP archive and return them as a tidy DataFrame.

Data-at-Rest Format

The function expects the following directory / file structure:

```
root/
    2023-01.zip              # <- month_file argument
        2023-01-01-00.json.gz
        2023-01-01-01.json.gz
        ...
        2023-01-31-23.json.gz
```

  • Each .json.gz file is a JSON-Lines file (one JSON object per line).
  • Every JSON object is expected to contain at least these keys:
      - impression_acc (float) – GNSS accuracy (metres)
      - device_iid_hash (str) – Anonymised user or device ID
      - impression_lng (float) – Longitude in WGS-84
      - impression_lat (float) – Latitude in WGS-84
      - timestamp (str/int) – ISO-8601 string or Unix epoch (ms)

The loader iterates through each .json.gz in the archive, parses every line, and extracts the subset of fields listed above.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| root | str | Path to the directory that contains month_file (e.g. "/data/impressions"). | required |
| month_file | str | Name of the ZIP archive to read (e.g. "2023-01.zip" or "london_2024-06.zip"). | required |

Returns:

| Type | Description |
| --- | --- |
| pandas.DataFrame | Columns → ["impression_acc", "uid", "lng", "lat", "datetime"]. One row per JSON object across all .json.gz files in the archive. |

Example

>>> df = readJsonFiles("/data/impressions", "2023-01.zip")
>>> df.head()
     impression_acc             uid        lng        lat             datetime
0              6.5  a1b2c3d4e5f6g7h8  -0.12776   51.50735  2023-01-01T00:00:10Z
1              4.8  h8g7f6e5d4c3b2a1  -0.12800   51.50720  2023-01-01T00:00:11Z
...
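
Each call reads a single monthly archive, so several months are typically concatenated before filtering. A small usage sketch (the archive names and directory are assumptions):

```python
import pandas as pd

from meowmotion.process_data import getFilteredData, readJsonFiles

months = ["2023-01.zip", "2023-02.zip", "2023-03.zip"]  # hypothetical archive names
frames = [readJsonFiles("/data/impressions", m) for m in months]
raw_df = pd.concat(frames, ignore_index=True)
clean_traj = getFilteredData(raw_df, impr_acc=100)
```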

Source code in meowmotion/process_data.py
def readJsonFiles(root: str, month_file: str) -> pd.DataFrame:
    """
    Load a month-worth of impression records stored as *gzipped JSON-Lines*
    inside a ZIP archive and return them as a tidy DataFrame.

    Data-at-Rest Format
    -------------------
    The function expects the following directory / file structure:

    ``root/
        2023-01.zip              # <- month_file argument
            2023-01-01-00.json.gz
            2023-01-01-01.json.gz
            ...
            2023-01-31-23.json.gz
    ```

    * Each ``.json.gz`` file is a **JSON-Lines** file (one JSON object per line).
    * Every JSON object is expected to contain at least these keys:

      - ``impression_acc``  (float) – GNSS accuracy (metres)
      - ``device_iid_hash`` (str)   – Anonymised user or device ID
      - ``impression_lng``  (float) – Longitude in WGS-84
      - ``impression_lat``  (float) – Latitude  in WGS-84
      - ``timestamp``       (str/int) – ISO-8601 string *or* Unix epoch (ms)

    The loader iterates through each ``.json.gz`` in the archive, parses every
    line, and extracts the subset of fields listed above.

    Args:
        root (str):
            Path to the directory that contains *month_file* (e.g.
            ``"/data/impressions"``).
        month_file (str):
            Name of the ZIP archive to read
            (e.g. ``"2023-01.zip"`` or ``"london_2024-06.zip"``).

    Returns:
        pandas.DataFrame:
            Columns → ``["impression_acc", "uid", "lng", "lat", "datetime"]``
            One row per JSON object across all ``.json.gz`` files in the archive.

    Example:
        >>> df = readJsonFiles("/data/impressions", "2023-01.zip")
        >>> df.head()
             impression_acc             uid        lng        lat             datetime
        0              6.5  a1b2c3d4e5f6g7h8  -0.12776   51.50735   2023-01-01T00:00:10Z
        1              4.8  h8g7f6e5d4c3b2a1  -0.12800   51.50720   2023-01-01T00:00:11Z
        ...
    """

    print(f"{datetime.now()}: Processing {month_file}")

    data = []
    month_zip_file = f"{root}/{month_file}"

    with zipfile.ZipFile(month_zip_file, "r") as zf:
        for gz_file in zf.namelist():
            print(f"{datetime.now()}: Processing {gz_file}")
            with zf.open(gz_file) as f, gzip.GzipFile(fileobj=f, mode="r") as g:
                for line in io.TextIOWrapper(g, encoding="utf-8"):
                    json_obj = json.loads(line.strip())
                    data.append(
                        {
                            "impression_acc": json_obj.get("impression_acc"),
                            "uid": json_obj.get("device_iid_hash"),
                            "lng": json_obj.get("impression_lng"),
                            "lat": json_obj.get("impression_lat"),
                            "datetime": json_obj.get("timestamp"),
                        }
                    )

    df = pd.DataFrame(data)
    print(f"{datetime.now()}: {month_file} processed.")

    return df

saveFile(path, fname, df)

Write a pandas DataFrame to a CSV file, creating the target directory if it does not already exist.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str | Folder in which to store the file (e.g. "outputs/predictions"). | required |
| fname | str | Name of the CSV file to create (e.g. "trip_points.csv"). | required |
| df | DataFrame | The DataFrame to be saved. | required |

Returns:

| Type | Description |
| --- | --- |
| None | None |

Example

saveFile("outputs", "clean_points.csv", clean_df)

→ file written to outputs/clean_points.csv

Source code in meowmotion/process_data.py
def saveFile(path: str, fname: str, df: pd.DataFrame) -> None:
    """
    Write a pandas DataFrame to a **CSV** file, creating the target directory
    if it does not already exist.

    Args:
        path (str): Folder in which to store the file
            (e.g. ``"outputs/predictions"``).
        fname (str): Name of the CSV file to create
            (e.g. ``"trip_points.csv"``).
        df (pd.DataFrame): The DataFrame to be saved.

    Returns:
        None

    Example:
        >>> saveFile("outputs", "clean_points.csv", clean_df)
        # → file written to outputs/clean_points.csv
    """

    if not os.path.exists(path):
        os.makedirs(path)
    df.to_csv(join(path, fname), index=False)
    return None

spatialJoin(df, shape, lng_col, lat_col, loc_type)

Spatially joins point data (supplied in df) to polygon features (supplied in shape) and appends the polygon’s code and name as new columns that are prefixed with the provided loc_type.

Workflow
  1. Convert each (lng_col, lat_col) pair into a Shapely Point and wrap df into a GeoDataFrame (CRS = EPSG 4326).
  2. Perform a left, intersects-based spatial join with shape.
  3. Rename "geo_code" → f"{loc_type}_geo_code" and "name" → f"{loc_type}_name".
  4. Drop internal join artefacts (index_right and the point geometry) and return a plain pandas DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | Point-level DataFrame containing longitude and latitude columns specified by lng_col and lat_col. | required |
| shape | GeoDataFrame | Polygon layer with at least the columns ["geo_code", "name", "geometry"] (e.g. LSOAs, census tracts). Must be in CRS WGS-84 (EPSG 4326) or convertible to it. | required |
| lng_col | str | Name of the longitude column in df. | required |
| lat_col | str | Name of the latitude column in df. | required |
| loc_type | str | Prefix for the new columns, commonly "origin" or "destination". | required |

Returns:

| Type | Description |
| --- | --- |
| pd.DataFrame | A copy of df with two new columns: f"{loc_type}_geo_code" and f"{loc_type}_name". Rows that do not intersect any polygon will contain NaN in these columns. |

Example

enriched_df = spatialJoin(
    df=trip_points,
    shape=lsoa_gdf,
    lng_col="org_lng",
    lat_col="org_lat",
    loc_type="origin",
)
enriched_df[["origin_geo_code", "origin_name"]].head()
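
Because loc_type only controls the column prefix, the join is typically applied once per trip end. A sketch continuing the example above, assuming the trip table also carries hypothetical dest_lng / dest_lat columns:

```python
# dest_lng / dest_lat are assumed destination coordinate columns.
trips = spatialJoin(trip_points, lsoa_gdf, lng_col="org_lng", lat_col="org_lat", loc_type="origin")
trips = spatialJoin(trips, lsoa_gdf, lng_col="dest_lng", lat_col="dest_lat", loc_type="destination")
trips[["origin_geo_code", "destination_geo_code"]].head()
```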

Source code in meowmotion/process_data.py
def spatialJoin(
    df: pd.DataFrame,
    shape: gpd.GeoDataFrame,
    lng_col: str,
    lat_col: str,
    loc_type: str,  # 'origin' or 'destination'
):
    """
    Spatially joins point data (supplied in *df*) to polygon features
    (supplied in *shape*) and appends the polygon’s code and name as new
    columns that are prefixed with the provided *loc_type*.

    Workflow:
        1. Convert each ``(lng_col, lat_col)`` pair into a Shapely
           :class:`Point` and wrap *df* into a GeoDataFrame (CRS = EPSG 4326).
        2. Perform a left, *intersects*-based spatial join with *shape*.
        3. Rename ``"geo_code" → f"{loc_type}_geo_code"`` and
           ``"name" → f"{loc_type}_name"``.
        4. Drop internal join artefacts (``index_right`` and the point
           ``geometry``) and return a plain pandas DataFrame.

    Args:
        df (pd.DataFrame):
            Point-level DataFrame containing longitude and latitude columns
            specified by *lng_col* and *lat_col*.
        shape (gpd.GeoDataFrame):
            Polygon layer with at least the columns
            ``["geo_code", "name", "geometry"]`` (e.g. LSOAs, census tracts).
            Must be in CRS WGS-84 (EPSG 4326) or convertible as such.
        lng_col (str): Name of the longitude column in *df*.
        lat_col (str): Name of the latitude column in *df*.
        loc_type (str): Prefix for the new columns—commonly ``"origin"`` or
            ``"destination"``.

    Returns:
        pd.DataFrame: A copy of *df* with two new columns:

        * ``f"{loc_type}_geo_code"``
        * ``f"{loc_type}_name"``

        Rows that do not intersect any polygon will contain ``NaN`` in these
        columns.

    Example:
        >>> enriched_df = spatialJoin(
        ...     df=trip_points,
        ...     shape=lsoa_gdf,
        ...     lng_col="org_lng",
        ...     lat_col="org_lat",
        ...     loc_type="origin"
        ... )
        >>> enriched_df[["origin_geo_code", "origin_name"]].head()
    """

    geometry = [Point(xy) for xy in zip(df[lng_col], df[lat_col])]
    geo_df = gpd.GeoDataFrame(df, geometry=geometry, crs="EPSG:4326")
    geo_df.sindex  # Build the spatial index up front to speed up the join
    geo_df = gpd.sjoin(
        geo_df,
        shape[["geo_code", "name", "geometry"]],
        how="left",
        predicate="intersects",
    )
    col_rename_dict = {
        "geo_code": f"{loc_type}_geo_code",
        "name": f"{loc_type}_name",
    }
    geo_df = geo_df.rename(columns=col_rename_dict)
    geo_df = geo_df.drop(columns=["index_right", "geometry"])
    geo_df = geo_df.reset_index(drop=True)
    geo_df = pd.DataFrame(geo_df)
    return geo_df