"""
A helper for converting COCO to / from KW18 format.
KW18 File Format
https://docs.google.com/spreadsheets/d/1DFCwoTKnDv8qfy3raM7QXtir2Fjfj9j8-z8px5Bu0q8/edit#gid=10
The kw18.trk files are text files, space delimited; each row is one
frame of one track and all rows have the same number of columns. The fields are:
.. code ::
01) track_ID : identifies the track
02) num_frames: number of frames in the track
03) frame_id : frame number for this track sample
04) loc_x : X-coordinate of the track (image/ground coords)
05) loc_y : Y-coordinate of the track (image/ground coords)
06) vel_x : X-velocity of the object (image/ground coords)
07) vel_y : Y-velocity of the object (image/ground coords)
08) obj_loc_x : X-coordinate of the object (image coords)
09) obj_loc_y : Y-coordinate of the object (image coords)
10) bbox_min_x : minimum X-coordinate of bounding box (image coords)
11) bbox_min_y : minimum Y-coordinate of bounding box (image coords)
12) bbox_max_x : maximum X-coordinate of bounding box (image coords)
13) bbox_max_y : maximum Y-coordinate of bounding box (image coords)
14) area : area of object (pixels)
15) world_loc_x : X-coordinate of object in world
16) world_loc_y : Y-coordinate of object in world
17) world_loc_z : Z-coordinate of object in world
18) timestamp : timestamp of frame (frames)
For the location and velocity of object centroids, use fields 4-7.
Bounding box is specified using coordinates of the top-left and bottom
right corners. Fields 15-17 may be ignored.
The kw19.trk and kw20.trk files, when present, add the following field(s):
19) object class: estimated class of the object, either 1 (person), 2
(vehicle), or 3 (other).
20) Activity ID -- refer to activities.txt for index and list of activities.
"""
import kwarray
import numpy as np
class KW18(kwarray.DataFrameArray):
    """
    A DataFrame like object that stores KW18 column data

    Each row is one frame of one track. Column names and their order are
    given by :attr:`DEFAULT_COLUMNS`.

    Example:
        >>> import kwcoco
        >>> from kwcoco.kw18 import KW18
        >>> coco_dset = kwcoco.CocoDataset.demo('shapes')
        >>> kw18_dset = KW18.from_coco(coco_dset)
        >>> print(kw18_dset.pandas())
    """

    # Define the ordering of the kw18 columns
    DEFAULT_COLUMNS = [
        'track_id',  # 1
        'track_length',  # 2
        'frame_number',  # 3
        'tracking_plane_loc_x', 'tracking_plane_loc_y',  # 4-5
        'velocity_x', 'velocity_y',  # 6-7
        'image_loc_x', 'image_loc_y',  # 8-9
        'img_bbox_tl_x', 'img_bbox_tl_y',  # 10-13
        'img_bbox_br_x', 'img_bbox_br_y',
        'area',  # 14
        'world_loc_x', 'world_loc_y', 'world_loc_z',  # 15-17
        'timestamp',  # 18
        # kw18 can have more than 18 columns.
        'confidence',  # 19
        'object_type_id',  # 20
        'activity_type_id',  # 21
    ]

    def __init__(self, data):
        """
        Args:
            data : the kw18 data frame.
        """
        super().__init__(data)

    @classmethod
    def demo(cls):
        """
        Build a demo table by converting the kwcoco ``'shapes8'`` demo
        dataset with :meth:`from_coco`.
        """
        import kwcoco
        coco_dset = kwcoco.CocoDataset.demo('shapes8')
        return cls.from_coco(coco_dset)

    @classmethod
    def from_coco(cls, coco_dset):
        """
        Build a kw18 table with one row per annotation of a coco dataset.

        The annotation ``image_id`` is used as the kw18 frame number.
        Annotations without a ``track_id`` (missing or ``None``) are assigned
        fresh ids that follow the largest existing one. Velocities are filled
        with 0; world location and timestamp are filled with -1. A missing
        ``score`` or ``category_id`` is recorded as -1.

        Args:
            coco_dset: object whose ``dataset['annotations']`` is a list of
                annotation dictionaries, each with a ``bbox`` in xywh format
                and an ``image_id``.

        Returns:
            KW18: the converted table (without an ``activity_type_id``
            column, because no value is available for it).
        """
        import kwimage
        # Seed with every known column so the final dict keeps kw18 ordering;
        # columns that are never filled are dropped again below.
        raw = {col: None for col in cls.DEFAULT_COLUMNS}
        anns = coco_dset.dataset['annotations']

        # reshape is a no-op for non-empty input; it keeps a 2D (N, 4) array
        # when there are no annotations so the unpacking below still works.
        xywh = np.array([ann['bbox'] for ann in anns]).reshape(-1, 4)
        boxes = kwimage.Boxes(xywh, 'xywh')
        tlbr = boxes.to_ltrb()
        cxywh = tlbr.to_cxywh()
        tl_x, tl_y, br_x, br_y = tlbr.data.T
        cx = cxywh.data[:, 0]
        cy = cxywh.data[:, 1]

        # Create track ids if not given. An explicit None is treated the same
        # as an absent key (np.isnan cannot handle an object array with None).
        track_ids = np.array([
            np.nan if ann.get('track_id', None) is None else ann['track_id']
            for ann in anns
        ])
        missing = np.isnan(track_ids)
        valid_track_ids = track_ids[~missing]
        if len(valid_track_ids) == 0:
            next_track_id = 1
        else:
            next_track_id = valid_track_ids.max() + 1
        num_need = np.sum(missing)
        new_track_ids = np.arange(next_track_id, next_track_id + num_need)
        track_ids[missing] = new_track_ids
        track_ids = track_ids.astype(int)

        scores = np.array([ann.get('score', -1) for ann in anns])
        image_ids = np.array([ann['image_id'] for ann in anns])
        cids = np.array([ann.get('category_id', -1) for ann in anns])

        num = len(anns)
        raw['track_id'] = track_ids
        # Placeholder; the real per-track lengths are filled in below
        raw['track_length'] = np.full(num, fill_value=-1)
        raw['frame_number'] = image_ids

        raw['tracking_plane_loc_x'] = cx
        raw['tracking_plane_loc_y'] = cy

        raw['velocity_x'] = np.full(num, fill_value=0)
        raw['velocity_y'] = np.full(num, fill_value=0)

        raw['image_loc_x'] = cx
        raw['image_loc_y'] = cy

        raw['img_bbox_tl_x'] = tl_x
        raw['img_bbox_tl_y'] = tl_y
        raw['img_bbox_br_x'] = br_x
        raw['img_bbox_br_y'] = br_y

        raw['area'] = boxes.area.ravel()

        raw['world_loc_x'] = np.full(num, fill_value=-1)
        raw['world_loc_y'] = np.full(num, fill_value=-1)
        raw['world_loc_z'] = np.full(num, fill_value=-1)

        raw['timestamp'] = np.full(num, fill_value=-1)

        raw['confidence'] = scores
        raw['object_type_id'] = cids

        # Drop the columns that were never populated
        raw = {k: v for k, v in raw.items() if v is not None}

        # The length of a track is the number of rows that share its id
        track_ids, groupxs = kwarray.group_indices(raw['track_id'])
        for groupx in groupxs:
            raw['track_length'][groupx] = len(groupx)

        self = cls(raw)
        return self

    def to_coco(self, image_paths=None, video_name=None):
        """
        Translates a kw18 file to a CocoDataset.

        Note:
            kw18 does not contain complete information, and as such
            the returned coco dataset may need to be augmented.

        Args:
            image_paths (Dict[int, str] | None):
                if specified, maps frame numbers to image file paths.
                Frames without an entry get a placeholder file name.

            video_name (str | None):
                if specified records the name of the video this kw18 belongs
                to. Defaults to ``'unknown_kw18_video'``.

        Returns:
            kwcoco.CocoDataset: a dataset with a single video, one image per
            unique frame number, and one annotation per kw18 row.

        TODO:
            - [X] allow kwargs to specify path to frames / videos

        Example:
            >>> from kwcoco.kw18 import KW18
            >>> import ubelt as ub
            >>> import kwimage
            >>> import kwcoco
            >>> # Prep test data - autogen a demo kw18 and write it to disk
            >>> dpath = ub.Path.appdir('kwcoco/kw18').ensuredir()
            >>> kw18_fpath = ub.Path(dpath) / 'test.kw18'
            >>> KW18.demo().dump(kw18_fpath)
            >>> #
            >>> # Load the kw18 file
            >>> self = KW18.load(kw18_fpath)
            >>> # Pretend that these image correspond to kw18 frame numbers
            >>> frame_names= kwcoco.CocoDataset.demo('shapes8').images().lookup('file_name')
            >>> frame_ids = sorted(set(self['frame_number']))
            >>> image_paths = dict(zip(frame_ids, frame_names))
            >>> #
            >>> # Convert the kw18 to kwcoco and specify paths to images
            >>> coco_dset = self.to_coco(image_paths=image_paths, video_name='dummy.mp4')
            >>> #
            >>> # Now we can draw images
            >>> canvas = coco_dset.draw_image(1)
            >>> # xdoctest: +REQUIRES(--draw)
            >>> kwimage.imwrite('foo.jpg', canvas)
            >>> # Draw all images
            >>> for gid in coco_dset.imgs.keys():
            >>>     canvas = coco_dset.draw_image(gid)
            >>>     fpath = dpath / 'gid_{}.jpg'.format(gid)
            >>>     print('write fpath = {!r}'.format(fpath))
            >>>     kwimage.imwrite(fpath, canvas)
        """
        import kwcoco
        import ubelt as ub
        dset = kwcoco.CocoDataset()

        # kw18s don't have category names, so use ids as proxies
        # NOTE(review): 'object_type_id' is one of the optional columns;
        # this lookup presumably fails for tables without it - confirm.
        unique_category_ids = sorted(set(self['object_type_id']))
        for cid in unique_category_ids:
            dset.ensure_category('class_{}'.format(cid), id=cid)

        unique_frame_idxs = ub.argunique(self['frame_number'])

        # kw18 files correspond to one video
        vidid = 1
        if video_name is None:
            video_name = 'unknown_kw18_video'
        dset.add_video(id=vidid, name=video_name)

        # Index frames of the video
        for idx in unique_frame_idxs:
            frame_num = self['frame_number'][idx]
            timestamp = self['timestamp'][idx]
            if image_paths and frame_num in image_paths:
                file_name = image_paths[frame_num]
            else:
                file_name = '<unknown_image_{}>'.format(frame_num)
            # The frame number doubles as the image id
            dset.add_image(
                id=frame_num,
                file_name=file_name,
                video_id=vidid,
                frame_index=frame_num,
                timestamp=timestamp
            )

        for rx, row in self.iterrows():
            # kw18 stores box corners; coco wants top-left plus width/height
            tl_x = row['img_bbox_tl_x']
            tl_y = row['img_bbox_tl_y']
            br_x = row['img_bbox_br_x']
            br_y = row['img_bbox_br_y']
            w = br_x - tl_x
            h = br_y - tl_y
            bbox = [tl_x, tl_y, w, h]
            world_loc = (row['world_loc_x'], row['world_loc_y'], row['world_loc_z'])
            velocity = (row['velocity_x'], row['velocity_y'])
            kw = {}
            if 'confidence' in row:
                kw['score'] = row['confidence']
            dset.add_annotation(
                id=rx,
                image_id=row['frame_number'],
                category_id=row['object_type_id'],
                track_id=row['track_id'],
                bbox=bbox,
                area=row['area'],
                velocity=velocity,
                world_loc=world_loc,
                **kw)
        return dset

    @classmethod
    def load(cls, file):
        """
        Read a kw18 table from a path or an open text file.

        The input is parsed as rows of values separated by one or more
        spaces; anything after a ``#`` is treated as a comment. Columns are
        named positionally using :attr:`DEFAULT_COLUMNS`. Empty input yields
        an empty table.

        Args:
            file (str | PathLike | IO): anything accepted by
                ``pandas.read_csv``.

        Returns:
            KW18: the loaded table.

        Example:
            >>> from kwcoco.kw18 import KW18
            >>> import ubelt as ub
            >>> dpath = ub.Path.appdir('kwcoco/kw18').ensuredir()
            >>> kw18_fpath = dpath / 'test_load.kw18'
            >>> KW18.demo().dump(kw18_fpath)
            >>> self = KW18.load(kw18_fpath)
            >>> print(self.pandas())
        """
        import pandas as pd
        try:
            EmptyDataError = pd.errors.EmptyDataError
        except AttributeError:
            # Fallback location of the exception in older pandas versions
            EmptyDataError = pd.io.common.EmptyDataError
        try:
            df = pd.read_csv(
                file, sep=' +', comment='#', header=None, engine='python')
        except EmptyDataError:
            df = pd.DataFrame()

        # read_csv produced positional column labels; map them to kw18 names
        renamer = dict(zip(df.columns, cls.DEFAULT_COLUMNS))
        raw = df.rename(columns=renamer)
        raw = _ensure_kw18_column_order(raw)
        self = cls(raw)
        return self

    @classmethod
    def loads(cls, text):
        """
        Parse a kw18 table from a string (see :meth:`load`).

        Args:
            text (str): the contents of a kw18 file.

        Returns:
            KW18: the loaded table.

        Example:
            >>> self = KW18.demo()
            >>> text = self.dumps()
            >>> self2 = KW18.loads(text)
            >>> empty = KW18.loads('')
        """
        import io
        file = io.StringIO(text)
        self = cls.load(file)
        return self

    def dump(self, file):
        """
        Write the table in kw18 text format.

        A ``#``-prefixed line with the column names is written first,
        followed by one space-separated line per row.

        Args:
            file (str | PathLike | IO): a path to (over)write, or an open
                writable text file.
        """
        import os
        if isinstance(file, (str, os.PathLike)):
            with open(file, 'w') as fp:
                self.dump(fp)
        else:
            df = self.pandas()
            # Write column header
            file.write('#' + ' '.join(df.columns) + '\n')
            df.to_csv(file, sep=' ', mode='a', index=False, header=False)

    def dumps(self):
        """
        Return the kw18 text that :meth:`dump` would write.

        Returns:
            str: the serialized table.

        Example:
            >>> self = KW18.demo()
            >>> text = self.dumps()
            >>> print(text)
        """
        import io
        file = io.StringIO()
        self.dump(file)
        return file.getvalue()
def _ensure_kw18_column_order(df):
    """
    Check that a data frame has the kw18 columns and put them in kw18 order.

    The first 18 entries of ``KW18.DEFAULT_COLUMNS`` are mandatory. The
    remaining entries are optional and are only expected when ``df`` already
    contains them (note: the post 18th column spec is not well defined in
    general). A data frame with no rows is given the expected columns
    before validation.

    Args:
        df (pandas.DataFrame): table whose columns are named like the
            entries of ``KW18.DEFAULT_COLUMNS``.

    Returns:
        pandas.DataFrame: ``df`` reindexed to the expected column order.

    Raises:
        ValueError: if an expected column is absent, or if ``df`` has a
            column that is not an expected kw18 column.

    Example:
        >>> import pandas as pd
        >>> df = pd.DataFrame(columns=KW18.DEFAULT_COLUMNS[0:18])
        >>> _ensure_kw18_column_order(df)
        >>> df = pd.DataFrame(columns=KW18.DEFAULT_COLUMNS[0:19])
        >>> _ensure_kw18_column_order(df)
        >>> df = pd.DataFrame(columns=KW18.DEFAULT_COLUMNS[0:18] + KW18.DEFAULT_COLUMNS[20:21])
        >>> assert np.all(_ensure_kw18_column_order(df).columns == df.columns)
    """
    num_required = 18
    required = list(KW18.DEFAULT_COLUMNS[:num_required])
    # Optional columns are kept (in their canonical order) only if present
    optional_present = [
        name for name in KW18.DEFAULT_COLUMNS[num_required:]
        if name in df.columns
    ]
    expected = required + optional_present

    if not len(df):
        # Ensure empty data frames have columns
        df = df.reindex(columns=expected)

    absent = [name for name in expected if name not in df.columns]
    if absent:
        raise ValueError('missing_cols = {!r}'.format(absent))

    extra = [name for name in df.columns if name not in expected]
    if extra:
        raise ValueError('unknown_cols = {!r}'.format(extra))

    return df.reindex(columns=expected)