import kwarray
import numpy as np
import ubelt as ub
from scipy.spatial.distance import pdist
[docs]
class Boids(ub.NiceRepr):
"""
Efficient numpy based backend for generating boid positions.
BOID = bird-oid object
References:
https://www.youtube.com/watch?v=mhjuuHl6qHM
https://medium.com/better-programming/boids-simulating-birds-flock-behavior-in-python-9fff99375118
https://en.wikipedia.org/wiki/Boids
Example:
>>> from kwcoco.demo.boids import * # NOQA
>>> num_frames = 10
>>> num_objects = 3
>>> rng = None
>>> self = Boids(num=num_objects, rng=rng).initialize()
>>> paths = self.paths(num_frames)
>>> #
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> plt = kwplot.autoplt()
>>> from mpl_toolkits.mplot3d import Axes3D # NOQA
>>> ax = plt.gca(projection='3d')
>>> ax.cla()
>>> #
>>> for path in paths:
>>> time = np.arange(len(path))
>>> ax.plot(time, path.T[0] * 1, path.T[1] * 1, ',-')
>>> ax.set_xlim(0, num_frames)
>>> ax.set_ylim(-.01, 1.01)
>>> ax.set_zlim(-.01, 1.01)
>>> ax.set_xlabel('time')
>>> ax.set_ylabel('u-pos')
>>> ax.set_zlabel('v-pos')
>>> kwplot.show_if_requested()
import xdev
_ = xdev.profile_now(self.compute_forces)()
_ = xdev.profile_now(self.update_neighbors)()
Ignore:
self = Boids(num=5, rng=0).initialize()
self.pos
fig = kwplot.figure(fnum=10, do_clf=True)
ax = fig.gca()
verts = np.array([[0, 0], [1, 0], [0.5, 2]])
com = verts.mean(axis=0)
verts = (verts - com) * 0.02
import kwimage
poly = kwimage.Polygon(exterior=verts)
def rotate(poly, theta):
sin_ = np.sin(theta)
cos_ = np.cos(theta)
rot_ = np.array(((cos_, -sin_),
(sin_, cos_),))
return poly.warp(rot_)
for _ in xdev.InteractiveIter(list(range(10000))):
self.step()
ax.cla()
import math
for rx in range(len(self.pos)):
x, y = self.pos[rx]
dx, dy = (self.vel[rx] / np.linalg.norm(self.vel[rx], axis=0))
theta = (np.arctan2(dy, dx) - math.tau / 4)
boid_poly = rotate(poly, theta).translate(self.pos[rx])
color = 'red' if rx == 0 else 'blue'
boid_poly.draw(ax=ax, color=color)
tip = boid_poly.data['exterior'].data[2]
tx, ty = tip
s = 100.0
vel = self.vel[rx]
acc = self.acc[rx]
com = self.acc[rx]
spsteer = self.sep_steering[rx]
cmsteer = self.com_steering[rx]
alsteer = self.align_steering[rx]
avsteer = self.avoid_steering[rx]
# plt.arrow(tip[0], tip[1], s * vel[0], s * vel[1], color='green')
plt.arrow(tip[0], tip[1], s * acc[1], s * acc[1], color='purple')
plt.arrow(tip[0], tip[1], s * spsteer[0], s * spsteer[1], color='dodgerblue')
plt.arrow(tip[0], tip[1], s * cmsteer[0], s * cmsteer[1], color='orange')
plt.arrow(tip[0], tip[1], s * alsteer[0], s * alsteer[1], color='pink')
plt.arrow(tip[0], tip[1], s * avsteer[0], s * avsteer[1], color='black')
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
xdev.InteractiveIter.draw()
rx = 0
Example:
>>> # Test determenism
>>> from kwcoco.demo.boids import * # NOQA
>>> num_frames = 2
>>> num_objects = 1
>>> rng = 4532
>>> self = Boids(num=num_objects, rng=rng).initialize()
>>> #print(ub.hash_data(self.pos))
>>> #print(ub.hash_data(self.vel))
>>> #print(ub.hash_data(self.acc))
>>> tocheck = []
>>> for i in range(100):
>>> self = Boids(num=num_objects, rng=rng).initialize()
>>> self.step()
>>> self.step()
>>> self.step()
>>> tocheck.append(self.pos.copy())
>>> assert ub.allsame(list(map(ub.hash_data, tocheck)))
"""
def __init__(self, num, dims=2, rng=None, **kwargs):
self.rng = kwarray.ensure_rng(rng)
self.num = num
self.dims = dims
# self.config = {
# 'perception_thresh': 0.08,
# 'max_speed': 0.005,
# 'max_force': 0.0003,
# }
self.config = {
'perception_thresh': 0.2,
'max_speed': 0.01,
'max_force': 0.001,
'damping': 0.99,
}
self.config.update(ub.dict_isect(kwargs, self.config))
self.pos = None
self.vel = None
self.acc = None
def __nice__(self):
return '{}'.format(self.num)
[docs]
def initialize(self):
# Generate random starting positions, velocities, and accelerations
self.pos = self.rng.rand(self.num, self.dims)
self.vel = self.rng.randn(self.num, self.dims) * self.config['max_speed']
self.acc = self.rng.randn(self.num, self.dims) * self.config['max_force']
return self
[docs]
def update_neighbors(self):
# TODO: this should be done with a fast spatial index, but
# unfortunately I don't see any existing implementations that make it
# easy to support moving points.
utriu_dists = pdist(self.pos)
utriu_flags = utriu_dists < self.config['perception_thresh']
utriu_rx, utriu_cx = np.triu_indices(len(self.pos), k=1)
utriu_neighb_rxs = utriu_rx[utriu_flags]
utriu_neighb_cxs = utriu_cx[utriu_flags]
neighb_rxs = np.r_[utriu_neighb_rxs, utriu_neighb_cxs]
neighb_cxs = np.r_[utriu_neighb_cxs, utriu_neighb_rxs]
group_rxs, groupxs = kwarray.group_indices(neighb_rxs)
group_cxs = kwarray.apply_grouping(neighb_cxs, groupxs)
rx_to_neighb_cxs = ub.dzip(group_rxs, group_cxs)
# n = len(self.pos)
# rx_to_neighb_utriu_idxs = {}
# for rx, cxs in rx_to_neighb_cxs.items():
# rxs = np.full_like(cxs, fill_value=rx)
# multi_index = (rxs, cxs)
# utriu_idxs = triu_condense_multi_index(
# multi_index, dims=(n, n), symetric=True)
# rx_to_neighb_utriu_idxs[rx] = utriu_idxs
# self.utriu_dists = utriu_dists
self.rx_to_neighb_cxs = rx_to_neighb_cxs
# self.rx_to_neighb_utriu_idxs = rx_to_neighb_utriu_idxs
# Compute speed and direction of every boid
self.speeds = np.linalg.norm(self.vel, axis=1)
self.dirs = self.vel / np.maximum(self.speeds[:, None], 1e-9)
[docs]
def compute_forces(self):
self.update_neighbors()
max_speed = self.config['max_speed']
# Randomly drop perception of neighbors
# neighbors[self.rng.rand(*neighbors.shape) > 0.3] = 0
num = len(self.pos)
align_steering = np.zeros((num, 2))
com_steering = np.zeros((num, 2))
sep_steering = np.zeros((num, 2))
for rx in self.rx_to_neighb_cxs.keys():
cxs = self.rx_to_neighb_cxs[rx]
# Alignment
# Each boid wants its direction to agree with the
# the average direction of its neighbors
if 1:
neigh_vel = self.vel[cxs]
ave_vel = neigh_vel.mean(axis=0)
ave_speed = np.linalg.norm(ave_vel)
if ave_speed > 0:
ave_dir = (ave_vel / ave_speed) * max_speed
else:
ave_dir = ave_vel
align_steering_ = ave_dir - self.vel[rx]
align_steering[rx] = align_steering_
# Cohesion
# Each boid wants to be in the center of its neighbors
if 1:
center_of_mass = self.pos[cxs].mean(axis=0)
com_vec = center_of_mass - self.pos[rx]
com_dist = np.linalg.norm(com_vec)
if com_dist > 0:
com_dir = (com_vec / com_dist) * max_speed
else:
com_dir = com_vec
com_steering_ = com_dir - self.vel[rx]
com_steering[rx] = com_steering_
# Separation
# Each boid does not want to be too close to its neighbors
if 1:
neigh_vec = self.pos[rx] - self.pos[cxs]
# utriu_idxs = self.rx_to_neighb_utriu_idxs[rx]
# neigh_dist1 = self.utriu_dists[utriu_idxs][:, None]
# neigh_dist = neigh_dist1
neigh_dist2 = np.linalg.norm(neigh_vec, axis=1, keepdims=1)
neigh_dist = neigh_dist2
assert neigh_dist.max() <= self.config['perception_thresh']
flags = neigh_dist.ravel() > 0
# neigh_dir = neigh_vec[flags]
# neigh_dir = neigh_vec[flags] / neigh_dist[flags]
neigh_dir = neigh_vec[flags] / (neigh_dist[flags] ** 2)
ave_neigh_dir = neigh_dir.mean(axis=0)
sep_steering_ = ave_neigh_dir - self.vel[rx]
sep_steering[rx] = sep_steering_
def dist_point_to_line(line_pts, pt):
pt1, pt2 = line_pts
x1, y1 = pt1
x2, y2 = pt2
x0, y0 = pt
numer = abs((y2 - y1) * x0 - (x2 - x1) * y0 + x2 * y1 - y2 * x1)
denom = np.sqrt((y2 - y1) ** 2 + (x2 - x1) ** 2)
dist = numer / denom
return dist
# align_steering[
# com_steering = clamp_mag(com_steering, self.config['max_force'], axis=None)
align_steering = clamp_mag(align_steering, 0.33 * self.config['max_force'], axis=None)
com_steering = clamp_mag(com_steering, 0.33 * self.config['max_force'], axis=None)
# Separation and obstical avoidance should override alignment and COM
sep_steering = clamp_mag(sep_steering, 1.0 * self.config['max_force'], axis=None)
# Add some small random movement
rand_steering = clamp_mag(self.rng.randn(*self.pos.shape), 0.08 * self.config['max_force'], axis=None)
self.sep_steering = sep_steering
self.com_steering = com_steering
self.align_steering = align_steering
self.rand_steering = rand_steering
steering = sum([
com_steering,
align_steering,
sep_steering,
rand_steering,
])
if 1:
# # Edge avoidance
# # Each boid does not want to hit an edge
avoid_steering = np.zeros_like(self.pos)
edge_thresh = self.config['perception_thresh'] * 1
edges = {
'bot': np.array([(0, 0), (1, 0)]),
'left': np.array([(0, 0), (0, 1)]),
'top': np.array([(0, 1), (1, 1)]),
'right': np.array([(1, 0), (1, 1)]),
}
for edge_name, edge in edges.items():
e1, e2 = edge
edge_pt = closest_point_on_line_segment(self.pos, e1, e2)
edge_vec = self.pos - edge_pt
edge_dist = np.linalg.norm(edge_vec, axis=1, keepdims=1)
flags = ((edge_dist < edge_thresh) & (edge_dist > 0)).ravel()
rxs = np.where(flags)[0]
if len(rxs):
avoid_vec = edge_vec[rxs]
avoid_dist = edge_dist[rxs]
avoid_vec = avoid_vec / (avoid_dist ** 3)
avoid_steering_ = avoid_vec - self.vel[rxs]
avoid_steering[rxs] += avoid_steering_
avoid_steering = clamp_mag(avoid_steering, 1.0 * self.config['max_force'], axis=None)
steering += avoid_steering
self.avoid_steering = avoid_steering
return steering
[docs]
def boundary_conditions(self):
# Clamp positions
lower_boundry_violators = (self.pos < 0)
upper_boundry_violators = (self.pos > 1)
if np.any(lower_boundry_violators):
self.pos[lower_boundry_violators] = 0
self.vel[lower_boundry_violators] *= -1.0 # bounce
self.acc[lower_boundry_violators] *= -1.0
if np.any(upper_boundry_violators):
self.pos[upper_boundry_violators] = 1
self.vel[upper_boundry_violators] *= -1.0
self.acc[upper_boundry_violators] *= -1.0
[docs]
def step(self):
"""
Update positions, velocities, and accelerations
"""
self.boundary_conditions()
force = self.compute_forces()
self.acc += force
self.pos += self.vel
self.vel += self.acc
self.acc = clamp_mag(self.acc, self.config['max_force'], axis=1)
self.vel = clamp_mag(self.vel, self.config['max_speed'], axis=1)
# Dampen acceleration
self.acc[:] *= max(1, min(0, (1 - self.config['damping'])))
# self.acc[:] = 0
self.boundary_conditions()
return self.pos
[docs]
def paths(self, num_steps):
positions = []
for _ in range(num_steps):
pos = self.step().copy()
positions.append(pos)
paths = np.concatenate([p[:, None] for p in positions], axis=1)
return paths
[docs]
def clamp_mag(vec, mag, axis=None):
"""
vec = np.random.rand(10, 2)
mag = 1.0
axis = 1
new_vec = clamp_mag(vec, mag, axis)
np.linalg.norm(new_vec, axis=axis)
"""
norm = np.linalg.norm(vec, axis=axis, keepdims=True)
flags = norm > mag
flags = np.squeeze(flags, axis)
vec[flags] = (vec[flags] / norm[flags]) * mag
return vec
[docs]
def triu_condense_multi_index(multi_index, dims, symetric=False):
r"""
Like np.ravel_multi_index but returns positions in an upper triangular
condensed square matrix
Examples:
multi_index (Tuple[ArrayLike]):
indexes for each dimension into the square matrix
dims (Tuple[int]):
shape of each dimension in the square matrix (should all be the
same)
symetric (bool):
if True, converts lower triangular indices to their upper
triangular location. This may cause a copy to occur.
References:
https://stackoverflow.com/a/36867493/887074
https://numpy.org/doc/stable/reference/generated/numpy.ravel_multi_index.html#numpy.ravel_multi_index
Examples:
>>> dims = (3, 3)
>>> symetric = True
>>> multi_index = (np.array([0, 0, 1]), np.array([1, 2, 2]))
>>> condensed_idxs = triu_condense_multi_index(multi_index, dims, symetric=symetric)
>>> assert condensed_idxs.tolist() == [0, 1, 2]
>>> n = 7
>>> symetric = True
>>> multi_index = np.triu_indices(n=n, k=1)
>>> condensed_idxs = triu_condense_multi_index(multi_index, [n] * 2, symetric=symetric)
>>> assert condensed_idxs.tolist() == list(range(n * (n - 1) // 2))
>>> from scipy.spatial.distance import pdist, squareform
>>> square_mat = np.zeros((n, n))
>>> conden_mat = squareform(square_mat)
>>> conden_mat[condensed_idxs] = np.arange(len(condensed_idxs)) + 1
>>> square_mat = squareform(conden_mat)
>>> print('square_mat =\n{}'.format(ub.urepr(square_mat, nl=1)))
>>> n = 7
>>> symetric = True
>>> multi_index = np.tril_indices(n=n, k=-1)
>>> condensed_idxs = triu_condense_multi_index(multi_index, [n] * 2, symetric=symetric)
>>> assert sorted(condensed_idxs.tolist()) == list(range(n * (n - 1) // 2))
>>> from scipy.spatial.distance import pdist, squareform
>>> square_mat = np.zeros((n, n))
>>> conden_mat = squareform(square_mat, checks=False)
>>> conden_mat[condensed_idxs] = np.arange(len(condensed_idxs)) + 1
>>> square_mat = squareform(conden_mat)
>>> print('square_mat =\n{}'.format(ub.urepr(square_mat, nl=1)))
Ignore:
>>> import xdev
>>> n = 30
>>> symetric = True
>>> multi_index = np.triu_indices(n=n, k=1)
>>> condensed_idxs = xdev.profile_now(triu_condense_multi_index)(multi_index, [n] * 2)
# Numba helps here when ub.allsame is gone
from numba import jit
triu_condense_multi_index2 = jit(nopython=True)(triu_condense_multi_index)
triu_condense_multi_index2 = jit()(triu_condense_multi_index)
triu_condense_multi_index2(multi_index, [n] * 2)
%timeit triu_condense_multi_index(multi_index, [n] * 2)
%timeit triu_condense_multi_index2(multi_index, [n] * 2)
"""
if len(dims) != 2:
raise NotImplementedError('only 2d matrices for now')
if not ub.allsame(dims):
raise NotImplementedError('only square matrices for now')
rxs, cxs = multi_index
triu_flags = rxs < cxs
if not np.all(triu_flags):
if np.any(rxs == cxs):
raise NotImplementedError(
'multi_index contains diagonal elements, which are not '
'allowed in a condensed matrix')
tril_flags = ~triu_flags
if not symetric:
raise ValueError(
'multi_index cannot contain inputs from '
'lower triangle unless symetric=True')
else:
rxs = rxs.copy()
cxs = cxs.copy()
_tmp_rxs = rxs[tril_flags]
rxs[tril_flags] = cxs[tril_flags]
cxs[tril_flags] = _tmp_rxs
n = dims[0]
# Let i = rxs
# Let j = cxs
# with i*n + j you go to the position in the square-formed matrix;
# with - i*(i+1)/2 you remove lower triangle (including diagonal) in all lines before i;
# with - i you remove positions in line i before the diagonal;
# with - 1 you remove positions in line i on the diagonal.
"""
import sympy
rxs, n, cxs = sympy.symbols(['rxs', 'n', 'cxs'])
condensed_indices = (n * rxs + cxs) - (rxs * (rxs + 1) // 2) - rxs - 1
sympy.simplify(condensed_indices)
%timeit cxs + (n - 1) * rxs - rxs*(rxs + 1)//2 - 1
%timeit (n * rxs + cxs) - (rxs * (rxs + 1) // 2) - rxs - 1
"""
condensed_indices = cxs + (n - 1) * rxs - (rxs * (rxs + 1) // 2) - 1
# condensed_indices = (n * rxs + cxs) - (rxs * (rxs + 1) // 2) - rxs - 1
return condensed_indices
[docs]
def _spatial_index_scratch():
"""
Ignore:
!pip install git+git://github.com/carsonfarmer/fastpair.git
from fastpair import FastPair
fp = FastPair()
fp.build(list(map(tuple, self.pos.tolist()))) #
!pip install pyqtree
from pyqtree import Index
spindex = Index(bbox=(0, 0, 1., 1.))
# this example assumes you have a list of items with bbox attribute
for item in items:
spindex.insert(item, item.bbox)
https://stackoverflow.com/questions/41946007/efficient-and-well-explained-implementation-of-a-quadtree-for-2d-collision-det
!pip install grispy
import grispy
index = grispy.GriSPy(data=self.pos)
index.bubble_neighbors(self.pos, distance_upper_bound=0.1)
"""
[docs]
def closest_point_on_line_segment(pts, e1, e2):
"""
Finds the closet point from p on line segment (e1, e2)
Args:
pts (ndarray): xy points [Nx2]
e1 (ndarray): the first xy endpoint of the segment
e2 (ndarray): the second xy endpoint of the segment
Returns:
ndarray: pt_on_seg - the closest xy point on (e1, e2) from ptp
References:
http://en.wikipedia.org/wiki/Distance_from_a_point_to_a_line
http://stackoverflow.com/questions/849211/shortest-distance-between-a-point-and-a-line-segment
Example:
>>> # ENABLE_DOCTEST
>>> from kwcoco.demo.boids import * # NOQA
>>> verts = np.array([[ 21.83012702, 13.16987298],
>>> [ 16.83012702, 21.83012702],
>>> [ 8.16987298, 16.83012702],
>>> [ 13.16987298, 8.16987298],
>>> [ 21.83012702, 13.16987298]])
>>> rng = np.random.RandomState(0)
>>> pts = rng.rand(64, 2) * 20 + 5
>>> e1, e2 = verts[0:2]
>>> closest_point_on_line_segment(pts, e1, e2)
Ignore:
from numba import jit
closest_point_on_line_segment2 = jit(closest_point_on_line_segment)
closest_point_on_line_segment2(pts, e1, e2)
%timeit closest_point_on_line_segment(pts, e1, e2)
%timeit closest_point_on_line_segment2(pts, e1, e2)
"""
# shift e1 to origin
de = (e2 - e1)[None, :]
# make point vector wrt orgin
pv = pts - e1
# Project pv onto de
mag = np.linalg.norm(de, axis=1)
de_norm = (de / mag)
pt_on_line_ = pv.dot(de_norm.T) * de_norm
# Check if normalized dot product is between 0 and 1
# Determines if pt is between 0,0 and de
t = (de.dot(pt_on_line_.T) / (mag ** 2))[0]
# t is an interpolation factor indicating how far past the line segment we
# are. We are on the line segment if it is in the range 0 to 1.
oob_left = t < 0
oob_right = t > 1
# Compute the point on the extended line defined by the line segment.
pt_on_seg = pt_on_line_ + e1
# Clamp to the endpoints if out of bounds
pt_on_seg[oob_left] = e1
pt_on_seg[oob_right] = e2
return pt_on_seg
[docs]
def _pygame_render_boids():
"""
Fast and responsive BOID rendering. This is an easter egg.
Requirements:
pip install pygame
CommandLine:
python -m kwcoco.demo.boids
pip install pygame kwcoco -U && python -m kwcoco.demo.boids
"""
ASK_TO_INSTALL = 1
if ASK_TO_INSTALL:
if ub.modname_to_modpath('pygame') is None:
from rich.prompt import Confirm
ans = Confirm.ask('Pygame was not detected. Do you want to install it?')
if ans:
import subprocess
import sys
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'pygame'])
try:
import pygame
except ImportError:
raise Exception('Please pip install pygame before using this function')
pygame.init()
display_info = pygame.display.Info()
w = display_info.current_w // 2
h = display_info.current_h // 2
screen = pygame.display.set_mode([w, h], flags=pygame.RESIZABLE)
pygame.display.set_caption('YEAH BOID!')
strokeweight = 2
kw = {
'perception_thresh': 0.20,
'max_speed': 0.005,
'max_force': 0.0003,
'damping': 0.99,
}
flock = Boids(256, **kw).initialize()
try:
while True:
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
screen.fill((255, 255, 255))
positions = flock.step()
for pos in positions:
x, y = pos * (w, h)
r = 8
pygame.draw.ellipse(screen, (255, 0, 0), (x, y, r, r))
pygame.draw.ellipse(screen, (0, 0, 0), (x, y, r, r), strokeweight)
pygame.display.flip()
except pygame.error:
print('exiting')
[docs]
def _yeah_boid():
_pygame_render_boids()
if __name__ == '__main__':
_yeah_boid()