忍者ブログ

【pyspark】DataFrameのレコード毎に処理を行う。UDF(User Defined Function)

2020年09月21日 00時10分07秒
DataFrameのレコード毎に特定の処理を行いたい場合、UDF(User Defined Function)の例。


from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct, lit
from pyspark.sql.types import StringType
def f(row):
    str = f"{row['name']}は{row['age']}歳です"
    return str
udf_func = udf(f)
spark = SparkSession.builder.getOrCreate()
d = [
    {'name': 'Tanaka', 'age': 10},
    {'name': 'Suzuki', 'age': 15},
    {'name': 'Satou', 'age': 19}
]
df = spark.createDataFrame(d)
result_df = df.withColumn('udf_result', udf_func(struct(*df.schema.names)))
result_df.show(truncate=False)
+---+------+-------------------+
|age|name  |udf_result          |
+---+------+-------------------+
|10 |Tanaka|Tanakaは10歳です|
|15 |Suzuki|Suzukiは15歳です |
|19 |Satou |Satouは19歳です   |
+---+------+-------------------+



PR

pysparkでCSVを読み込んでDataFrameを作成

2020年09月13日 02時54分21秒
【読み込むCSV】
Name,Age,TestDate
tanaka,10,2020-08-01 01:00:00
suzuki,13,2020-08-02 02:00:00
satou,15,2020-08-03 03:00:00

【pythonモジュール】
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StructField, StructType, StringType, TimestampType
schema = StructType(
    [
        StructField('Name', StringType(), False),
        StructField('Age', IntegerType(), False),
        StructField('TestDate', TimestampType(), False)
    ]
)
spark = SparkSession.builder.getOrCreate()
df = spark.read.format(
    'csv'
).option(
    'timestampFormat', 'yyyy-MM-dd HH:mm:ss'
).load(
    '/tmp/test.csv', schema=schema
)

df.show()
+-------+----+-----------------------+
|  Name| Age|                  TestDate|
+-------+----+-----------------------+
|tanaka |  10|2020-08-01 01:00:00|
|suzuki  |  13|2020-08-02 02:00:00|
| satou  |  15|2020-08-03 03:00:00|
+------+----+------------------------+

df.schema
StructType(List(StructField(Name,StringType,true),StructField(Age,IntegerType,true),StructField(TestDate,TimestampType,true)))

【schemaを指定しない場合】
schemaを指定せずにCSVを読み込むことも可能。
むしろschemaの定義とCSVの値が一致しなければnullになってしまうため、指定しない方が楽。
が、schemaを指定しない場合は全カラムがStringとして扱われてしまう。

df = spark.read.format(
    'csv'
).load(
    '/tmp/test.csv'
)
df.schema
StructType(List(StructField(_c0,StringType,true),StructField(_c1,StringType,true),StructField(_c2,StringType,true)))



手越祐也 母子家庭に弁当無償で配達 本当の目的は・・・

2020年06月21日 16時26分46秒
元NEWS・手越 ボランティアは継続 母子家庭に弁当無償で配達
https://news.yahoo.co.jp/articles/df6bc219fee32d11514d872a5ce32d504f0e1be6

なんで「母子」限定なんだろう。
女好きの手越が母子限定かぁ。
手越祐也のイメージからして、簡単に落とせそうな綺麗な人を狙って・・・としか思えないんだよなぁ。


マックシェイクプッチンプリンが買えない

2020年05月04日 19時51分08秒
マックシェイクのプッチンプリン味が気になるが、買いに行っても売り切れ。
マックデリバリーの注文画面張り付いても売り切れのままで買えない。
外出自粛でマックデリバリー頼んでる人が増えてる&テレビでさんまさんがプッチンプリン味のことを喋ったから供給が追い付いていないのだろうか。
5月中旬までらしいから買えずに販売終了迎えそうだな。


リモートコンピューターと最初にネゴシエートするときに、セイキュリティ層で処理エラーが検出されたため、L2TP接続に失敗しました

2020年04月17日 19時18分24秒
コロナ対策として、うちの会社でもリモートワークをすることに。
急激にリモートワーク化が進んでいるが、みんなセキュリティのこと意識しているのだろうか?
コロナが落ち着いたとしても、情報漏洩多発で騒ぎだす予感しかしない・・・。

まぁよそ様のことはほっといて、うちはVPNを使ってリモートワークすることに。
いざ自宅から接続してみると、

リモートコンピューターと最初にネゴシエートするときに、セイキュリティ層で処理エラーが検出されたため、L2TP接続に失敗しました

と怒られて接続できない。
会社内ではテスト接続成功したんですが・・・。

とりあえず、エラーメッセージでググってみると、サービスに追加しろとか、レジストリに追加しろとか出てきましたが、解決せず。

そもそも会社内ではテスト接続で成功していたので、パソコン自体には問題ないはず。
そうなると自宅の回線に問題が?と思い、無線LANルーターをチェック。

ビンゴでした。

今回、Windows10で「VPN接続を追加」を行ったのですが、
その際、VPNの種類を「事前共有キーを使ったL2TP\IPsec」を選択しています。

で、自宅の無線LANルータはIOデータの「WN-G300R」を使用。
このWN-G300Rですが、デフォルト設定で「IPSecパススルー」が無効になっているようです。
IPSecパススルーを有効に設定したら無事VPN接続することが出来ました。