Numpy
Basic
类别 | 方法列表 |
---|---|
创建 | array , zeros , ones , empty , arange , linspace , logspace , eye , diag , frombuffer , fromfunction |
变形 | reshape , resize , flatten , ravel , transpose , swapaxes , broadcast_to |
数学 | add , subtract , multiply , divide , power , sqrt , exp , log , sin , cos , tan , arcsin |
统计 | sum , mean , std , var , min , max , argmin , argmax , median , percentile , corrcoef |
线性代数 | dot , matmul , inner , outer , linalg.inv , linalg.det , linalg.eig , linalg.svd |
逻辑 | logical_and , logical_or , logical_not , all , any , isnan , isinf |
集合 | unique , in1d , intersect1d , union1d , setdiff1d , setxor1d |
文件 | save , load , savez , savetxt , loadtxt , genfromtxt |
随机 | random.rand , random.randn , random.randint , random.choice , random.shuffle |
创建
import numpy as np
# ---------------- 创建 ----------------
a = np.array([1, 2, 3]) # 列表 → 数组
z = np.zeros((2, 3)) # 全 0
o = np.ones((2, 3)) # 全 1
e = np.empty((2, 2)) # 未初始化
ar = np.arange(0, 10, 2) # 等差
li = np.linspace(0, 1, 5) # 均分
lo = np.logspace(0, 2, 3) # 10^0, 10^1, 10^2
I = np.eye(3) # 单位矩阵
d = np.diag([1, 2, 3]) # 对角矩阵
buf = np.frombuffer(b'\x01\x02\x03\x04', dtype='uint8') # bytes → 数组
fun = np.fromfunction(lambda i, j: i + j, (3, 3)) # 函数生成
print("创建示例汇总:\n", a, z.shape, ar, li, lo, d, buf, sep="\n")
数组操作
拼接分割
a = np.array([[1,2], [3,4]])
b = np.array([[5,6]])
# 垂直拼接
np.vstack((a, b)) # [[1,2],[3,4],[5,6]]
# 水平拼接
np.hstack((a, b.T)) # [[1,2,5],[3,4,6]]
# 分割数组
first, second = np.hsplit(a, 2)
索引切片
arr = np.array([[1,2,3], [4,5,6], [7,8,9]])
# 基本索引
arr[1, 2] # 6 (第1行第2列)
arr[0:2, 1:] # [[2,3],[5,6]]
# 布尔索引
mask = arr > 4
arr[mask] # [5,6,7,8,9]
# 花式索引
arr[[0,2], [1,0]] # [2,7] (0行1列, 2行0列)
变形
x = np.arange(12).reshape(3, 4)
# ---------------- 变形 ----------------
r1 = x.reshape(4, 3) # 改变形状(返回视图/复制)
r2 = np.resize(x, (2, 2)) # 强制裁剪/填充
f1 = x.flatten() # 展平(拷贝)
f2 = x.ravel() # 展平(视图)
t = x.transpose() # 转置
sw = x.swapaxes(0, 1) # 轴交换
bt = np.broadcast_to(x, (2, 3, 4)) # 广播到目标形状
print("变形后形状:", r1.shape, t.shape, bt.shape)
数学
a = np.array([1, 4, 9])
b = np.array([2, 2, 2])
# ---------------- 数学 ----------------
add = np.add(a, b) # 加
sub = np.subtract(a, b) # 减
mul = np.multiply(a, b) # 乘
div = np.divide(a, b) # 除
pow = np.power(a, 2) # 幂
sq = np.sqrt(a) # 平方根
exp = np.exp(a[:2]) # e^x
log = np.log(a[1:]) # 自然对数
sin = np.sin(np.pi/6 * np.ones(3))
cos = np.cos(np.pi/3 * np.ones(3))
tan = np.tan(np.array([0, np.pi/4]))
arcs = np.arcsin([0, 1])
print("数学示例:\n", add, sub, mul, div, pow, sq, exp, log, sin, cos, tan, arcs)
统计
x = np.array([1, 2, 3, 4, 5])
# ---------------- 统计 ----------------
s = x.sum()
m = x.mean()
st = x.std()
v = x.var()
mn = x.min()
mx = x.max()
ami = x.argmin()
amx = x.argmax()
med = np.median(x)
pct = np.percentile(x, 75)
cor = np.corrcoef(x, x**2)[0, 1]
print("统计结果:sum", s, "mean", m, "std", st, "corr", cor)
线性代数
A = np.array([[1, 2], [3, 4]])
B = np.array([[2, 0], [1, 2]])
v = np.array([1, 2])
# ---------------- 线性代数 ----------------
dot = np.dot(A, v) # 矩阵-向量乘
mat = A @ B # 矩阵乘
inn = np.inner(v, v) # 内积
out = np.outer(v, v) # 外积
inv = np.linalg.inv(A) # 逆矩阵
det = np.linalg.det(A) # 行列式
eig = np.linalg.eigvals(A) # 特征值
svd = np.linalg.svd(A) # 奇异值分解
print("线性代数示例:dot", dot, "det", det, "eig", eig)
逻辑
a = np.array([1, np.nan, 3])
b = np.array([2, 2, 2])
# ---------------- 逻辑 ----------------
land = np.logical_and(a>1, b>1)
lor = np.logical_or(a>2, b>2)
lnot = np.logical_not(a>1)
alla = np.all(b>0)
anya = np.any(np.isnan(a))
nan = np.isnan(a)
inf = np.isinf(np.array([1, np.inf]))
print("逻辑结果:and", land, "or", lor, "not", lnot, "all", alla, "any", anya)
集合
x = np.array([1, 2, 3, 3, 4])
y = np.array([3, 4, 5])
# ---------------- 集合 ----------------
un = np.unique(x) # 唯一值
in1 = np.in1d(x, y) # x 的元素是否在 y
int1 = np.intersect1d(x, y) # 交集
uni1 = np.union1d(x, y) # 并集
diff = np.setdiff1d(x, y) # 差集
xor = np.setxor1d(x, y) # 对称差
print("集合示例:\n unique", un, "in1d", in1, "intersect", int1, "union", uni1)
文件
x = np.arange(6).reshape(2, 3)
# ---------------- 文件 ----------------
np.save('tmp.npy', x) # 二进制 .npy
np.savetxt('tmp.csv', x, fmt='%d') # csv
np.savez('tmp.npz', x=x) # 压缩多数组
y1 = np.load('tmp.npy')
y2 = np.loadtxt('tmp.csv', dtype=int)
y3 = np.load('tmp.npz')['x']
print("文件读回:\n", y1, y2, y3)
随机
# ---------------- 随机 ----------------
r1 = np.random.rand(2, 2) # 均匀 [0,1)
r2 = np.random.randn(2, 2) # 标准正态
r3 = np.random.randint(0, 10, 5) # 随机整数
r4 = np.random.choice(10, 5, replace=False) # 无放回抽样
arr = np.arange(10)
np.random.shuffle(arr) # 原地洗牌
print("随机示例:\n", r1, r2, r3, r4, arr)
广播机制
A = np.ones((3,3))
B = np.array([1,2,3])
A + B # 自动扩展为:[[2,3,4],[2,3,4],[2,3,4]]
结构化数组
# 创建表格数据
dtype = [('name', 'S10'), ('age', 'i4'), ('height', 'f8')]
people = np.array([('Alice', 25, 1.65), ('Bob', 30, 1.8)], dtype=dtype)
# 按字段查询
people[people['age'] > 27] # [('Bob', 30, 1.8)]
向量化函数
# 创建自定义向量函数
def custom_func(x):
return x**2 + 2*x + 1
vectorized_func = np.vectorize(custom_func)
vectorized_func(np.array([1,2,3])) # [4,9,16]
内存映射
# 处理超大文件
large_data = np.memmap('bigdata.dat', dtype='float32', mode='r', shape=(10000, 10000))
应用
金融量化:分钟线批量因子
数据:随机生成 4 年、3000 只股票、1 分钟 K 线(OHLCV),约 2 GB→ 这里只用 10 只 × 1 天做演示。 因子:日内波动率、10 分钟 VWAP、相对开盘涨跌幅。
import numpy as np, pandas as pd, datetime as dt
# 1. 生成 mock 数据 ----------------------------------------------------------
n_stocks, days = 10, 1
mins = 240 * days
rng = np.random.default_rng(42)
dates = pd.date_range('2024-07-01', periods=mins, freq='1min')
stock_ids = [f'S{i:04d}' for i in range(n_stocks)]
# 用结构化数组一次生成
dtype = np.dtype([('open','f4'),('high','f4'),('low','f4'),('close','f4'),('vol','i4')])
bars = np.zeros((n_stocks, mins), dtype=dtype)
for i in range(n_stocks):
drift = rng.normal(0, 0.0005, mins).cumsum()
noise = rng.normal(0, 0.001, mins)
close = 10 * np.exp(drift + noise)
high = np.maximum(close, close*rng.uniform(1, 1.02))
low = np.minimum(close, close*rng.uniform(0.98, 1))
open_ = np.roll(close, 1); open_[0] = close[0]
bars[i]['open']=open_; bars[i]['high']=high
bars[i]['low']=low; bars[i]['close']=close
bars[i]['vol']=rng.integers(1000, 10000, mins)
# 2. 向量化计算因子 ---------------------------------------------------------
close = bars['close'] # shape (10, 240)
vol = bars['vol']
# 日内波动率
intraday_vol = close.std(axis=1)
# 10 分钟 VWAP
window = 10
vwap = (close * vol).cumsum(axis=1) / vol.cumsum(axis=1)
vwap10 = vwap[:, window-1]
# 相对开盘涨幅
ret_vs_open = (close[:, -1] / close[:, 0]) - 1
result = pd.DataFrame({
'stock': stock_ids,
'ivol': intraday_vol,
'vwap10': vwap10,
'ret_open': ret_vs_open
})
print(result.head())
电商 RFM
数据:1 亿条订单记录(user_id, amount, ts),用内存映射避免一次性读入。 RFM:R=最近一次距今天数,F=90 天订单数,M=90 天金额。
import numpy as np, time, datetime as dt
# 0. 生成 1 亿条 mock 数据 ----------------------------------------------------
N = 100_000_000
rng = np.random.default_rng(42)
user_ids = rng.integers(0, 5_000_000, N) # 500 万用户
amounts = rng.lognormal(4, 1.2, N).astype('float32')
# 时间戳:最近 90 天
ts_max = np.datetime64('2024-07-28')
timestamps = ts_max - rng.integers(0, 90*24*3600, N).astype('timedelta64[s]')
# 1. 内存映射方式保存 *.npy ---------------------------------------------------
np.save('orders_user.npy', user_ids)
np.save('orders_amt.npy', amounts)
np.save('orders_ts.npy', timestamps.view('int64'))
# 2. 使用 NumPy 计算 RFM ------------------------------------------------------
tic = time.time()
uids = np.load('orders_user.npy', mmap_mode='r')
amts = np.load('orders_amt.npy', mmap_mode='r')
tss = np.load('orders_ts.npy', mmap_mode='r').view('datetime64[s]')
mask = (ts_max - tss).astype('int64') < 90*24*3600 # 最近 90 天
f = np.bincount(uids[mask], minlength=5_000_000)
m = np.bincount(uids[mask], weights=amts[mask], minlength=5_000_000)
r_days = (ts_max - np.minimum.reduceat(tss, np.unique(uids, return_index=True)[1])) / np.timedelta64(1,'D')
# 打包结果
rfm = np.vstack([r_days, f, m]).T
print('耗时:', time.time()-tic, '秒') # ~60 s on 8-core
医疗影像
CT 三维体素 Sobel 边缘
数据:使用 scikit-image 自带 3D 体数据(512×512×300)→ 演示用 128×128×64 子块。
from skimage import data
import numpy as np
from scipy.ndimage import sobel
# 1. 加载 3D 体数据 ----------------------------------------------------------
# 这里用 Shepp-Logan head phantom
vol = data.shepp_logan_phantom()
vol = vol[::4, ::4, ::4] # 下采样 128x128x64
# 2. Sobel 边缘检测 ----------------------------------------------------------
gx = sobel(vol, axis=0)
gy = sobel(vol, axis=1)
gz = sobel(vol, axis=2)
g = np.sqrt(gx**2 + gy**2 + gz**2)
print('边缘体素占比:', (g>0.1).mean()*100, '%')
工业振动
实时 FFT 故障频率 数据:模拟 10 kHz 采样,1 s 窗口,100 条通道。
import numpy as np, time
from numba import njit
fs = 10_000
N = 100_000
channels = 100
rng = np.random.default_rng(42)
# 1. 生成带故障频率的振动信号 -----------------------------------------------
t = np.arange(N)/fs
signal = rng.normal(0, 1, (channels, N))
# 加 250 Hz 故障谐波
signal += 0.3*np.sin(2*np.pi*250*t)
# 2. 实时 FFT ---------------------------------------------------------------
@njit(cache=True, fastmath=True)
def batch_fft(x):
return np.abs(np.fft.rfft(x, axis=1))
tic = time.time()
spec = batch_fft(signal) # 100×5001
print('100 通道 1 s FFT 耗时:', time.time()-tic, '秒')
# 3. 找出故障
freqs = np.fft.rfftfreq(N, 1/fs)
peak = freqs[np.argmax(spec, axis=1)]
print('故障频率检测均值:', peak.mean(), 'Hz')
光伏预测
气象格点到电站插值
数据:ERA5 0.25° 网格 2 m 温度、10 m 风速,插值到 3000 个电站坐标。
import numpy as np
from scipy.interpolate import RegularGridInterpolator
# 1. mock 气象格点 -----------------------------------------------------------
lats = np.linspace(15, 55, 161) # 0.25°
lons = np.linspace(70, 140, 281)
temp = 20 + 10*np.sin(np.pi*lats/90)[:,None] + 5*np.cos(2*np.pi*lons/180)[None,:]
wind = 3 + 1.5*np.random.rand(*temp.shape)
# 2. mock 电站坐标 -----------------------------------------------------------
n_plants = 3000
rng = np.random.default_rng(42)
plant_lat = rng.uniform(15, 55, n_plants)
plant_lon = rng.uniform(70, 140, n_plants)
# 3. 双线性插值 --------------------------------------------------------------
interp_T = RegularGridInterpolator((lats, lons), temp, bounds_error=False, fill_value=None)
interp_W = RegularGridInterpolator((lats, lons), wind, bounds_error=False, fill_value=None)
T_site = interp_T(np.c_[plant_lat, plant_lon])
W_site = interp_W(np.c_[plant_lat, plant_lon])
# 4. 简单线性回归预测功率(MW) ----------------------------------------------
power = 0.15*W_site + 0.4*T_site - 2 # 伪模型
print('预测功率均值:', power.mean(), 'MW')
← Previous postPython类库Pandas
Next post →Python爬虫系统架构设计