目录
1、apply用法
apply在pandas里非常好用的,那在pyodps里如何去使用,还是有一些区别的,在pyodps中要对一行数据使用自定义函数,可以使用 apply 方法,axis 参数必须为 1,表示在行上操作。
apply 的自定义函数接收一个参数,为上一步 Collection 的一行数据,用户可以通过属性、或者偏移取得一个字段的数据。
iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types=\'float\').rename(\'sepaladd\').head(3) sepaladd 0 8.6 1 7.9 2 7.9
reduce
为 True 时,表示返回结果为Sequence,否则返回结果为Collection。 names
和 types
参数分别指定返回的Sequence或Collection的字段名和类型。 如果类型不指定,将会默认为string类型。
在 apply 的自定义函数中,reduce 为 False 时,也可以使用 yield
关键字来返回多行结果。
iris.count() 150 def handle(row): yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth yield row.petallength - row.petalwidth, row.petallength + row.petalwidth iris.apply(handle, axis=1, names=[\'iris_add\', \'iris_sub\'], types=[\'float\', \'float\']).count() 300
我们也可以在函数上来注释返回的字段和类型,这样就不需要在函数调用时再指定。
from odps.df import output @output([\'iris_add\', \'iris_sub\'], [\'float\', \'float\']) def handle(row): yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth yield row.petallength - row.petalwidth, row.petallength + row.petalwidth iris.apply(handle, axis=1).count() 300
也可以使用 map-only 的 map_reduce,和 axis=1 的apply操作是等价的。
iris.map_reduce(mapper=handle).count() 300
如果想调用 ODPS 上已经存在的 UDTF,则函数指定为函数名即可。
iris[\'name\', \'sepallength\'].apply(\'your_func\', axis=1, names=[\'name2\', \'sepallength2\'], types=[\'string\', \'float\'])
使用 apply 对行操作,且 reduce
为 False 时,可以使用 并列多行输出 与已有的行结合,用于后续聚合等操作。
并列多行输出:
对于 list 及 map 类型的列,explode 方法会将该列转换为多行输出。使用 apply 方法也可以输出多行。 为了进行聚合等操作,常常需要将这些输出和原表中的列合并。此时可以使用 DataFrame 提供的并列多行输出功能, 写法为将多行输出函数生成的集合与原集合中的列名一起映射。
并列多行输出的例子如下:
>>> df id a b 0 1 [a1, b1] [a2, b2, c2] 1 2 [c1] [d2, e2] >>> df[df.id, df.a.explode(), df.b] id a b 0 1 a1 [a2, b2, c2] 1 1 b1 [a2, b2, c2] 2 2 c1 [d2, e2] >>> df[df.id, df.a.explode(), df.b.explode()] id a b 0 1 a1 a2 1 1 a1 b2 2 1 a1 c2 3 1 b1 a2 4 1 b1 b2 5 1 b1 c2 6 2 c1 d2 7 2 c1 e2
如果多行输出方法对某个输入不产生任何输出,默认输入行将不在最终结果中出现。如果需要在结果中出现该行,可以设置 keep_nulls=True
。
此时,与该行并列的值将输出为空值:
>>> df
id a
0 1 [a1, b1]
1 2 []
>>> df[df.id, df.a.explode()]
id a
0 1 a1
1 1 b1
>>> df[df.id, df.a.explode(keep_nulls=True)]
id a
0 1 a1
1 1 b1
2 2 None
from odps.df import output @output([\'iris_add\', \'iris_sub\'], [\'float\', \'float\']) def handle(row): yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth yield row.petallength - row.petalwidth, row.petallength + row.petalwidth iris[iris.category, iris.apply(handle, axis=1)]
pyodps中有很多本来在pandas中一个API解决的东西却要想半天才能搞定。
pandas中在groupby后只要用first就可以去出分组后的第一行。
例如:
# 以student_id为分组列,然后取出分组后每组的第一条数据 df_stu_frist_course = df_stu_course.groupby(\'student_id\').first()
2、取分组排序后的第一条数据
然而pyodps中却很坑爹,没有什么first,只能自己想办法。这里我又添加了一个排序
例如:
首先用student_id进行分组,然后用student_id和gmt_create进行排序,最后用窗口函数nth_value取分组中的第一个值并改名first_course_id, 并将其他字段输出
df_group = df.groupby(\'student_id\') df_inst_stu_cou = df_inst_stu_cou[\'student_id\', df_group.sort([\'student_id\', \'gmt_create\'], ascending=[True, False]).course_id.nth_value(0).rename(\'first_course_id\')]
但是这是并不是取出第一行,而是将所有以student_id分为一组的数据的其他列数据都改为排序后的第一个值, 也就是说原df_inst_stu_cou还没有分组,只是添加了分组的后取出第一个值的一列,所以我们要以student_id分组去重。
所以我们只要再以student_id分组,然后用聚合函数cat将其他所有的列按照行进行连接(这里我的连接符选择了逗号),然后在map函数中用split分割成列表取第一个即可
df = df.groupby(\'student_id\').agg(df.first_course_id.cat(sep=\',\').rename(\'first_course_ids\')) df[\'first_course_name\'] = df.first_course_names.map(lambda x: x.split(\',\')[0], \'string\')
暂无评论内容