Vanson's Eternal Blog

Python类库Numpy

Python numpy basic.png
Published on
/10 mins read/---

Numpy

Basic

runoob的 numpy文档

类别方法列表
创建array, zeros, ones, empty, arange, linspace, logspace, eye, diag, frombuffer, fromfunction
变形reshape, resize, flatten, ravel, transpose, swapaxes, broadcast_to
数学add, subtract, multiply, divide, power, sqrt, exp, log, sin, cos, tan, arcsin
统计sum, mean, std, var, min, max, argmin, argmax, median, percentile, corrcoef
线性代数dot, matmul, inner, outer, linalg.inv, linalg.det, linalg.eig, linalg.svd
逻辑logical_and, logical_or, logical_not, all, any, isnan, isinf
集合unique, in1d, intersect1d, union1d, setdiff1d, setxor1d
文件save, load, savez, savetxt, loadtxt, genfromtxt
随机random.rand, random.randn, random.randint, random.choice, random.shuffle

创建

import numpy as np
 
# ---------------- 创建 ----------------
a  = np.array([1, 2, 3])            # 列表 → 数组
z  = np.zeros((2, 3))               # 全 0
o  = np.ones((2, 3))                # 全 1
e  = np.empty((2, 2))               # 未初始化
ar = np.arange(0, 10, 2)            # 等差
li = np.linspace(0, 1, 5)           # 均分
lo = np.logspace(0, 2, 3)           # 10^0, 10^1, 10^2
I  = np.eye(3)                      # 单位矩阵
d  = np.diag([1, 2, 3])             # 对角矩阵
buf = np.frombuffer(b'\x01\x02\x03\x04', dtype='uint8')  # bytes → 数组
fun = np.fromfunction(lambda i, j: i + j, (3, 3))        # 函数生成
 
print("创建示例汇总:\n", a, z.shape, ar, li, lo, d, buf, sep="\n")

数组操作

拼接分割

a = np.array([[1,2], [3,4]])
b = np.array([[5,6]])
 
# 垂直拼接
np.vstack((a, b))    # [[1,2],[3,4],[5,6]]
 
# 水平拼接
np.hstack((a, b.T))  # [[1,2,5],[3,4,6]]
 
# 分割数组
first, second = np.hsplit(a, 2)

索引切片

arr = np.array([[1,2,3], [4,5,6], [7,8,9]])
 
# 基本索引
arr[1, 2]    # 6 (第1行第2列)
arr[0:2, 1:] # [[2,3],[5,6]]
 
# 布尔索引
mask = arr > 4
arr[mask]     # [5,6,7,8,9]
 
# 花式索引
arr[[0,2], [1,0]]  # [2,7] (0行1列, 2行0列)

变形

x = np.arange(12).reshape(3, 4)
 
# ---------------- 变形 ----------------
r1 = x.reshape(4, 3)          # 改变形状(返回视图/复制)
r2 = np.resize(x, (2, 2))     # 强制裁剪/填充
f1 = x.flatten()              # 展平(拷贝)
f2 = x.ravel()                # 展平(视图)
t  = x.transpose()            # 转置
sw = x.swapaxes(0, 1)         # 轴交换
bt = np.broadcast_to(x, (2, 3, 4))  # 广播到目标形状
 
print("变形后形状:", r1.shape, t.shape, bt.shape)

数学

a = np.array([1, 4, 9])
b = np.array([2, 2, 2])
 
# ---------------- 数学 ----------------
add = np.add(a, b)         # 加
sub = np.subtract(a, b)    # 减
mul = np.multiply(a, b)    # 乘
div = np.divide(a, b)      # 除
pow = np.power(a, 2)       # 幂
sq  = np.sqrt(a)           # 平方根
exp = np.exp(a[:2])        # e^x
log = np.log(a[1:])        # 自然对数
sin = np.sin(np.pi/6 * np.ones(3))
cos = np.cos(np.pi/3 * np.ones(3))
tan = np.tan(np.array([0, np.pi/4]))
arcs = np.arcsin([0, 1])
 
print("数学示例:\n", add, sub, mul, div, pow, sq, exp, log, sin, cos, tan, arcs)

统计

x = np.array([1, 2, 3, 4, 5])
 
# ---------------- 统计 ----------------
s  = x.sum()
m  = x.mean()
st = x.std()
v  = x.var()
mn = x.min()
mx = x.max()
ami = x.argmin()
amx = x.argmax()
med = np.median(x)
pct = np.percentile(x, 75)
cor = np.corrcoef(x, x**2)[0, 1]
 
print("统计结果:sum", s, "mean", m, "std", st, "corr", cor)

线性代数

A = np.array([[1, 2], [3, 4]])
B = np.array([[2, 0], [1, 2]])
v = np.array([1, 2])
 
# ---------------- 线性代数 ----------------
dot  = np.dot(A, v)          # 矩阵-向量乘
mat  = A @ B                 # 矩阵乘
inn  = np.inner(v, v)        # 内积
out  = np.outer(v, v)        # 外积
inv  = np.linalg.inv(A)      # 逆矩阵
det  = np.linalg.det(A)      # 行列式
eig  = np.linalg.eigvals(A)  # 特征值
svd  = np.linalg.svd(A)      # 奇异值分解
 
print("线性代数示例:dot", dot, "det", det, "eig", eig)

逻辑

a = np.array([1, np.nan, 3])
b = np.array([2, 2, 2])
 
# ---------------- 逻辑 ----------------
land = np.logical_and(a>1, b>1)
lor  = np.logical_or(a>2, b>2)
lnot = np.logical_not(a>1)
alla = np.all(b>0)
anya = np.any(np.isnan(a))
nan  = np.isnan(a)
inf  = np.isinf(np.array([1, np.inf]))
 
print("逻辑结果:and", land, "or", lor, "not", lnot, "all", alla, "any", anya)

集合

x = np.array([1, 2, 3, 3, 4])
y = np.array([3, 4, 5])
 
# ---------------- 集合 ----------------
un  = np.unique(x)              # 唯一值
in1 = np.in1d(x, y)             # x 的元素是否在 y
int1 = np.intersect1d(x, y)     # 交集
uni1 = np.union1d(x, y)         # 并集
diff = np.setdiff1d(x, y)       # 差集
xor  = np.setxor1d(x, y)        # 对称差
 
print("集合示例:\n unique", un, "in1d", in1, "intersect", int1, "union", uni1)

文件

x = np.arange(6).reshape(2, 3)
 
# ---------------- 文件 ----------------
np.save('tmp.npy', x)                   # 二进制 .npy
np.savetxt('tmp.csv', x, fmt='%d')      # csv
np.savez('tmp.npz', x=x)                # 压缩多数组
 
y1 = np.load('tmp.npy')
y2 = np.loadtxt('tmp.csv', dtype=int)
y3 = np.load('tmp.npz')['x']
 
print("文件读回:\n", y1, y2, y3)

随机

# ---------------- 随机 ----------------
r1 = np.random.rand(2, 2)               # 均匀 [0,1)
r2 = np.random.randn(2, 2)              # 标准正态
r3 = np.random.randint(0, 10, 5)        # 随机整数
r4 = np.random.choice(10, 5, replace=False)  # 无放回抽样
arr = np.arange(10)
np.random.shuffle(arr)                   # 原地洗牌
 
print("随机示例:\n", r1, r2, r3, r4, arr)

广播机制

A = np.ones((3,3))
B = np.array([1,2,3])
A + B  # 自动扩展为:[[2,3,4],[2,3,4],[2,3,4]]

结构化数组

# 创建表格数据
dtype = [('name', 'S10'), ('age', 'i4'), ('height', 'f8')]
people = np.array([('Alice', 25, 1.65), ('Bob', 30, 1.8)], dtype=dtype)
 
# 按字段查询
people[people['age'] > 27]  # [('Bob', 30, 1.8)]

向量化函数

# 创建自定义向量函数
def custom_func(x):
    return x**2 + 2*x + 1
 
vectorized_func = np.vectorize(custom_func)
vectorized_func(np.array([1,2,3]))  # [4,9,16]

内存映射

# 处理超大文件
large_data = np.memmap('bigdata.dat', dtype='float32', mode='r', shape=(10000, 10000))

应用

金融量化:分钟线批量因子

数据:随机生成 4 年、3000 只股票、1 分钟 K 线(OHLCV),约 2 GB→ 这里只用 10 只 × 1 天做演示。 因子:日内波动率、10 分钟 VWAP、相对开盘涨跌幅。

import numpy as np, pandas as pd, datetime as dt
 
# 1. 生成 mock 数据 ----------------------------------------------------------
n_stocks, days = 10, 1
mins = 240 * days
rng = np.random.default_rng(42)
 
dates = pd.date_range('2024-07-01', periods=mins, freq='1min')
stock_ids = [f'S{i:04d}' for i in range(n_stocks)]
 
# 用结构化数组一次生成
dtype = np.dtype([('open','f4'),('high','f4'),('low','f4'),('close','f4'),('vol','i4')])
bars = np.zeros((n_stocks, mins), dtype=dtype)
 
for i in range(n_stocks):
    drift = rng.normal(0, 0.0005, mins).cumsum()
    noise = rng.normal(0, 0.001, mins)
    close = 10 * np.exp(drift + noise)
    high  = np.maximum(close, close*rng.uniform(1, 1.02))
    low   = np.minimum(close, close*rng.uniform(0.98, 1))
    open_ = np.roll(close, 1); open_[0] = close[0]
    bars[i]['open']=open_; bars[i]['high']=high
    bars[i]['low']=low; bars[i]['close']=close
    bars[i]['vol']=rng.integers(1000, 10000, mins)
 
# 2. 向量化计算因子 ---------------------------------------------------------
close = bars['close']            # shape (10, 240)
vol   = bars['vol']
 
# 日内波动率
intraday_vol = close.std(axis=1)
 
# 10 分钟 VWAP
window = 10
vwap = (close * vol).cumsum(axis=1) / vol.cumsum(axis=1)
vwap10 = vwap[:, window-1]
 
# 相对开盘涨幅
ret_vs_open = (close[:, -1] / close[:, 0]) - 1
 
result = pd.DataFrame({
    'stock': stock_ids,
    'ivol': intraday_vol,
    'vwap10': vwap10,
    'ret_open': ret_vs_open
})
print(result.head())

电商 RFM

数据:1 亿条订单记录(user_id, amount, ts),用内存映射避免一次性读入。 RFM:R=最近一次距今天数,F=90 天订单数,M=90 天金额。

import numpy as np, time, datetime as dt
 
# 0. 生成 1 亿条 mock 数据 ----------------------------------------------------
N = 100_000_000
rng = np.random.default_rng(42)
user_ids = rng.integers(0, 5_000_000, N)      # 500 万用户
amounts  = rng.lognormal(4, 1.2, N).astype('float32')
# 时间戳:最近 90 天
ts_max = np.datetime64('2024-07-28')
timestamps = ts_max - rng.integers(0, 90*24*3600, N).astype('timedelta64[s]')
 
# 1. 内存映射方式保存 *.npy ---------------------------------------------------
np.save('orders_user.npy', user_ids)
np.save('orders_amt.npy', amounts)
np.save('orders_ts.npy', timestamps.view('int64'))
 
# 2. 使用 NumPy 计算 RFM ------------------------------------------------------
tic = time.time()
uids = np.load('orders_user.npy', mmap_mode='r')
amts = np.load('orders_amt.npy',  mmap_mode='r')
tss  = np.load('orders_ts.npy',   mmap_mode='r').view('datetime64[s]')
 
mask = (ts_max - tss).astype('int64') < 90*24*3600  # 最近 90 天
f = np.bincount(uids[mask], minlength=5_000_000)
m = np.bincount(uids[mask], weights=amts[mask], minlength=5_000_000)
r_days = (ts_max - np.minimum.reduceat(tss, np.unique(uids, return_index=True)[1])) / np.timedelta64(1,'D')
 
# 打包结果
rfm = np.vstack([r_days, f, m]).T
print('耗时:', time.time()-tic, '秒')  # ~60 s on 8-core

医疗影像

CT 三维体素 Sobel 边缘

数据:使用 scikit-image 自带 3D 体数据(512×512×300)→ 演示用 128×128×64 子块。

from skimage import data
import numpy as np
from scipy.ndimage import sobel
 
# 1. 加载 3D 体数据 ----------------------------------------------------------
# 这里用 Shepp-Logan head phantom
vol = data.shepp_logan_phantom()
vol = vol[::4, ::4, ::4]          # 下采样 128x128x64
 
# 2. Sobel 边缘检测 ----------------------------------------------------------
gx = sobel(vol, axis=0)
gy = sobel(vol, axis=1)
gz = sobel(vol, axis=2)
g = np.sqrt(gx**2 + gy**2 + gz**2)
 
print('边缘体素占比:', (g>0.1).mean()*100, '%')
 

工业振动

实时 FFT 故障频率 数据:模拟 10 kHz 采样,1 s 窗口,100 条通道。

import numpy as np, time
from numba import njit
 
fs = 10_000
N = 100_000
channels = 100
rng = np.random.default_rng(42)
 
# 1. 生成带故障频率的振动信号 -----------------------------------------------
t = np.arange(N)/fs
signal = rng.normal(0, 1, (channels, N))
# 加 250 Hz 故障谐波
signal += 0.3*np.sin(2*np.pi*250*t)
 
# 2. 实时 FFT ---------------------------------------------------------------
@njit(cache=True, fastmath=True)
def batch_fft(x):
    return np.abs(np.fft.rfft(x, axis=1))
 
tic = time.time()
spec = batch_fft(signal)     # 100×5001
print('100 通道 1 s FFT 耗时:', time.time()-tic, '秒')
 
# 3. 找出故障
freqs = np.fft.rfftfreq(N, 1/fs)
peak = freqs[np.argmax(spec, axis=1)]
print('故障频率检测均值:', peak.mean(), 'Hz')

光伏预测

气象格点到电站插值

数据:ERA5 0.25° 网格 2 m 温度、10 m 风速,插值到 3000 个电站坐标。

import numpy as np
from scipy.interpolate import RegularGridInterpolator
 
# 1. mock 气象格点 -----------------------------------------------------------
lats = np.linspace(15, 55, 161)   # 0.25°
lons = np.linspace(70, 140, 281)
temp = 20 + 10*np.sin(np.pi*lats/90)[:,None] + 5*np.cos(2*np.pi*lons/180)[None,:]
wind = 3 + 1.5*np.random.rand(*temp.shape)
 
# 2. mock 电站坐标 -----------------------------------------------------------
n_plants = 3000
rng = np.random.default_rng(42)
plant_lat = rng.uniform(15, 55, n_plants)
plant_lon = rng.uniform(70, 140, n_plants)
 
# 3. 双线性插值 --------------------------------------------------------------
interp_T = RegularGridInterpolator((lats, lons), temp, bounds_error=False, fill_value=None)
interp_W = RegularGridInterpolator((lats, lons), wind, bounds_error=False, fill_value=None)
 
T_site = interp_T(np.c_[plant_lat, plant_lon])
W_site = interp_W(np.c_[plant_lat, plant_lon])
 
# 4. 简单线性回归预测功率(MW) ----------------------------------------------
power = 0.15*W_site + 0.4*T_site - 2  # 伪模型
print('预测功率均值:', power.mean(), 'MW')