LoginSignup
7
3

Marketoに連携された癖が強すぎるEventHub視聴ログをBigQueryで整理する

Posted at

はじめに

弊社primeNumberは、2023/11/28にイベントプラットフォームであるEventHubで「01(zeroONE)2023」というオンラインイベントを開催しました。申込者5500名を達成した大規模なイベントとなりましたが、安定的に配信が行えてEventHubには非常に感謝しています。

さて、このEventHubでは、MAツールであるMarketoとAPI連携ができます。このとき、参加者管理を同期できるのはもちろん、視聴者の視聴ログをActivityに連携することもできます。私はこのデータを利用して、イベント終了後に視聴者の視聴状況の分析を行いました。

本記事では、その経験を踏まえて、Marketoに連携されたActivityデータをもとに、EventHubの視聴ログをBigQueryで整理する方法をご紹介します。データの仕様を検証してみたところあまりにも癖が強すぎたので、もしやろうと思ってくじけた方がいれば参考にしてもらえると嬉しいです。

こんな方におすすめ

  • MarketoのActivityに連携したEventHubのシングルトラックのライブ配信/疑似ライブ配信の視聴ログを分析したい方(めっちゃニッチ・・・)

事前準備

下記の準備を事前に行っていることを前提にします。

  • EventHubとMarketoをAPI連携する
  • MarketoのActivity、ActivityTypeをBigQueryにデータ転送する
    • trocco®を利用すると簡単にできるので、よろしければご利用ください!
  • 複数セッションある場合には、BigQueryでセッション別の開始時間と終了時間のマスタを作成する
    • image.png

データ処理方法

それでは、具体的なデータ処理を確認していきましょう。

①視聴ログのActivityを抽出する

まず、大量にあるActivityのデータから、視聴ログのデータのみを抽出します。

create or replace table `project-id.marketo.activity_event_watch_logs`
as
select
  a.marketoguid as marketo_guid,
  a.leadid as lead_id,
  a.activitydate as activity_timestamp,
  a.activitytypeid as activity_type_id,
  atype.name as activitytype,
  a.campaignid as campaign_id,
  a.primaryattributevalueid as primary_attribute_value_id,
  a.primaryattributevalue as primary_attribute_value,
  parse_json(a.attributes) as attributes  -- attributesがstringの場合
from
  `project-id.marketo.activity` a
left join
  `project-id.marketo.activity_type` atype
  on
    a.activitytypeid = atype.id
where
  a.activitydate between timestamp('2023-11-28') and timestamp('2023-11-29')  -- イベント実施期間
  and a.activitytypeid = ******  -- 対応するactivityTypeId(activityType = Video watch times)

視聴ログのデータは下記のように、attributesのなかにTotal watch minutesが記載されているようなデータになっています。これは、Activity_timestampの時点で、視聴者がTotal watch minutes分だけ視聴したと解釈すればよいでしょう。

image.png

②視聴ログをセッション別に切り分ける

次に、視聴ログをセッション別に切り分けていきますが、クエリが複雑なので、はじめに全体のクエリを提示したあとに、個別に処理を解説していきます。

全体のクエリ(折りたたまれています)
create table `project-id.marketo.activity_event_watch_logs_per_session`
as
with start_end as (
  select
    lead_id,
    datetime(timestamp_sub(activity_timestamp, interval cast(json_value(attributes, '$.Total watch minutes') as integer) minute), 'Asia/Tokyo') as start_watching_at_jst,
    datetime(activity_timestamp, 'Asia/Tokyo') as end_watching_at_jst,
  from
    `project-id.marketo.activity_event_watch_logs`
  order by  -- データチェックしやすいようにあえてorder by句を入れたままにしている、利用時には除外してください
    lead_id,
    end_watching_at_jst
),

overlap_check as (
  select
    lead_id,
    start_watching_at_jst,
    end_watching_at_jst,
    lag(start_watching_at_jst) over (partition by lead_id order by end_watching_at_jst) as previous_start_watching_at_jst,
    lag(end_watching_at_jst) over (partition by lead_id order by end_watching_at_jst) as previous_end_watching_at_jst,
    coalesce(
      start_watching_at_jst > lag(end_watching_at_jst) over (partition by lead_id order by end_watching_at_jst)
      or
      end_watching_at_jst < lag(start_watching_at_jst) over (partition by lead_id order by end_watching_at_jst),
      true) as is_not_overlap  -- 前のログの視聴時間と今のログの視聴時間に重なりがあるかを判定、Nullの場合はtrue
  from
    start_end
  order by  -- 同上
    lead_id,
    end_watching_at_jst
),

watch_group as (
  select
    *,
    count(case when is_not_overlap is true then 1 end) over (partition by lead_id order by end_watching_at_jst) as watch_group_no  -- 重複時間があるものをグループ化するための番号を振る
  from
    overlap_check
  order by
    lead_id,
    end_watching_at_jst
),

merge_log as (
  select
    lead_id,
    watch_group_no,
    min(start_watching_at_jst) as start_watching_at_jst,
    max(end_watching_at_jst) as end_watching_at_jst,
  from
    watch_group
  group by
    lead_id,
    watch_group_no
  order by
    lead_id,
    watch_group_no
)

select
  * except(session_id),
  greatest(start_watching_at_jst, session_start_at_jst) as session_start_watching_at_jst,
  least(end_watching_at_jst, session_end_at_jst) as session_end_watching_at_jst,
  datetime_diff(least(end_watching_at_jst, session_end_at_jst), greatest(start_watching_at_jst, session_start_at_jst), second) / 60 as session_watch_minutes,
  datetime_diff(least(end_watching_at_jst, session_end_at_jst), greatest(start_watching_at_jst, session_start_at_jst), second) / 60
   / session_minutes as session_watch_rate
from
  merge_log l
inner join
  `project-id.dataset_id.session_master` s
  on  -- セッション時間内に視聴時間が含まれている
    l.start_watching_at_jst between s.session_start_at_jst and s.session_end_at_jst
    or l.end_watching_at_jst between s.session_start_at_jst and s.session_end_at_jst
    or (l.start_watching_at_jst <= s.session_start_at_jst and s.session_end_at_jst <= l.end_watching_at_jst)
order by
  lead_id,
  watch_group_no,
  session_start_at_jst

視聴開始/終了時間を算出する

まず、どの時点で何分視聴したというデータが来ているので、時間を視聴時間分だけ引くことで視聴開始時間を算出します。

with start_end as (
  select
    lead_id,
    datetime(timestamp_sub(activity_timestamp, interval cast(json_value(attributes, '$.Total watch minutes') as integer) minute), 'Asia/Tokyo') as start_watching_at_jst,
    datetime(activity_timestamp, 'Asia/Tokyo') as end_watching_at_jst,
  from
    `project-id.marketo.activity_event_watch_logs`
  order by  -- データチェックしやすいようにあえてorder by句を入れたままにしている、利用時には除外してください
    lead_id,
    end_watching_at_jst
),

下記のデータは全て同じlead_idなのですが、もうこの時点で癖の強さが出ていますね。このデータを見るに、およそ5分単位で定期的にログが連携されているようです。

image.png

連携されるログ間での重なりを判別する

先ほどのままでは視聴し続けていることの判別ができないので、視聴ログを前後で比較して、重なりがあるかどうかを判別します。

overlap_check as (
  select
    lead_id,
    start_watching_at_jst,
    end_watching_at_jst,
    lag(start_watching_at_jst) over (partition by lead_id order by end_watching_at_jst) as previous_start_watching_at_jst,
    lag(end_watching_at_jst) over (partition by lead_id order by end_watching_at_jst) as previous_end_watching_at_jst,
    coalesce(
      start_watching_at_jst > lag(end_watching_at_jst) over (partition by lead_id order by end_watching_at_jst)
      or
      end_watching_at_jst < lag(start_watching_at_jst) over (partition by lead_id order by end_watching_at_jst),
      true) as is_not_overlap  -- 前のログの視聴時間と今のログの視聴時間に重なりがあるかを判定、Nullの場合はtrue
  from
    start_end
  order by  -- 同上
    lead_id,
    end_watching_at_jst
),

重複状況に応じてグルーピングする

重複の有無に応じて、グループNoを振ります。Window関数ではデフォルトでrows between unbounded preceding and current rowになるので、下記の処理をするとログデータの取得時間で並べたときのis_not_overlat is trueの最初から現在の順番までの数をカウントすることになります。

watch_group as (
  select
    *,
    count(case when is_not_overlap is true then 1 end) over (partition by lead_id order by end_watching_at_jst) as watch_group_no  -- 重複時間があるものをグループ化するための番号を振る
  from
    overlap_check
  order by
    lead_id,
    end_watching_at_jst
),

下記のデータは全て同じlead_idなのですが、確かに途中の離脱部分でグループを分けることができています。

image.png

個々のグループで視聴時間を算出する

個々のグループで開始時間の最小値と終了時間の最大値を取ることで、グループ単位で視聴ログを集約することができます。

merge_log as (
  select
    lead_id,
    watch_group_no,
    min(start_watching_at_jst) as start_watching_at_jst,
    max(end_watching_at_jst) as end_watching_at_jst,
  from
    watch_group
  group by
    lead_id,
    watch_group_no
  order by
    lead_id,
    watch_group_no
)

セッションのマスタと紐づける

最後に、視聴時間とセッションの時間を比較することで、個別のセッションを視聴していたかどうかを判別します。

select
  * except(session_id),
  greatest(start_watching_at_jst, session_start_at_jst) as session_start_watching_at_jst,
  least(end_watching_at_jst, session_end_at_jst) as session_end_watching_at_jst,
  datetime_diff(least(end_watching_at_jst, session_end_at_jst), greatest(start_watching_at_jst, session_start_at_jst), second) / 60 as session_watch_minutes,
  datetime_diff(least(end_watching_at_jst, session_end_at_jst), greatest(start_watching_at_jst, session_start_at_jst), second) / 60
   / session_minutes as session_watch_rate
from
  merge_log l
inner join
  `project-id.dataset_id.session_master` s
  on  -- セッション時間内に視聴時間が含まれている
    l.start_watching_at_jst between s.session_start_at_jst and s.session_end_at_jst
    or l.end_watching_at_jst between s.session_start_at_jst and s.session_end_at_jst
    or (l.start_watching_at_jst <= s.session_start_at_jst and s.session_end_at_jst <= l.end_watching_at_jst)
order by
  lead_id,
  watch_group_no,
  session_start_at_jst

③視聴ログを1分ごとに切り分ける

さらに、先ほどのセッション別の視聴データをもとにして、セッション内での細かい視聴ログを算出します。セッションの視聴開始時間と視聴終了時間を1分ごとに分割することで、1分ごとの視聴有無を判別します。

create table `project-id.marketo.activity_event_watch_logs_per_minute`
as
select
  *,
  datetime(session_watch_minute) as session_watch_datetime_jst,
  row_number() over (partition by lead_id, watch_group_no order by session_start_at_jst, session_watch_minute) = 1 as is_start_watching_minute,
  row_number() over (partition by lead_id, watch_group_no order by session_start_at_jst desc, session_watch_minute desc) = 1 as is_end_watching_minute,
from
  `project-id.marketo.activity_watch_logs_per_session`
left join
  -- generate_datetime_arrayがないので・・・
  unnest(generate_timestamp_array(timestamp_trunc(timestamp(session_start_watching_at_jst), minute), timestamp(session_end_watching_at_jst), interval 1 minute)) session_watch_minute
order by
  lead_id,
  watch_group_no,
  session_start_at_jst,
  session_watch_datetime_jst

このデータで1分ごとにcount(distinct lead_id)とすることで、1分ごとの視聴者数を算出できるようになります。下記はとあるセッションの1分ごとの視聴者数を算出したものですが・・・あれ、なんか周期性がある・・・??

image.png

癖の強さを検証する

なんとなく怪しい予感がしたところで、改めてデータの仕様を検証してみましょう。

Activityの分数を見てみる

下記はとある時間帯のActivityの分数に着目したものです。分数を5で割った余りが、大半が0、一部が4、ほんの少しだけ1になっています。そんなことあるかい。

image.png

Activityの秒数を見てみる

下記は全体の時間でのActivityに1分足した分数を5で割ったときに、0を基準としたときの秒数を算出したものです。なんだか正規分布みたいですね(笑)

image.png

Total watch minutesの分数を見てみる

下記はActivityのattributesにあったTotal watch minutesの分数を3で割った余りです。全て0ということは、3の倍数しか入っていないですね!?!?

image.png

視聴ログの技術仕様とその影響を推測する

ここまでの内容を踏まえると、視聴ログの技術仕様とその影響が推測されてきます。以下は動画配信に全く詳しくない私の推測でしかないので、怪しそうなところがあればご指摘くださいませ。

技術仕様を推測する

image.png

動画配信では、配信サーバーに対してリクエストが発生してから配信が始まり、視聴中は少し先の分までバッファリングしながら視聴を進めていくものと考えられます。そのため、例えば11:00まで視聴している段階で、11:00+αまでの視聴データがクライアント端末に存在すると考えられます。

配信ログの連携に関しては、EventHubの配信サーバーを基準にして、5分ごとにログがMarketoのActivityに連携されているようです。EventHub→Marketoの連携にあたっては、ネットワーク経由で最低限必要な時間(不明)に加えて、平均して1分程度の遅延が生じていると想定されます。

総視聴時間については、視聴開始のリクエストからEventHubサーバーでの配信ログ連携時刻におけるバッファリングのデータの送付有無を基準にして、3分単位で切り捨てられた数値が算出されているのではと推測します。なぜそうなのかというと全く謎なのですが。

影響を推測する

視聴終了時間に対する影響としては、EventHub→Marketoでのネットワークの影響により、平均して1分+αだけ遅延して連携されています。その分だけ後ろ倒しになっている可能性があります。

加えて、「EventHubサーバーでの配信ログ連携時刻におけるバッファリングのデータの送付有無を基準にして」が正しいとすると、視聴終了というイベントベースでのデータ連携がないことには注意が必要です。配信ログの連携時刻の直前に離脱したとして、バッファリングの時間が仮に10秒だとすると、(5分-10秒)-(1分+α)だけ視聴時間を計測できていない時間がある可能性があります。

視聴開始時間に対する影響としては、算出の基準となっている視聴終了時刻に対する影響に加えて、「3分単位で切り捨てられた数値が算出されている」影響があります。3×N分+2分59秒であった場合、3×N分で算出されており、計算した視聴時間がかなり短く算出されている可能性があります。

両者を総合すると、終了時刻に由来する(5分-10秒)-(1分+α)の影響に加えて、最終連携時刻の際の2分59秒を加えると、最大で7分程度の計測漏れがある可能性があります。

もちろん、これらは全体の視聴開始と視聴終了時間で生じることなので、継続視聴時間に含まれているセッションについては影響しないものにはなりますが。

やってもらえると嬉しいこと

視聴ログの計測という観点では、視聴開始と視聴終了のログが取れるのが一番データ量の少ない計測方法になります。この形であればデータ処理の楽なので非常に助かりますが、視聴終了のログが適切に取れるかという点では難しいことが容易に想像できるので、それは致し方ないでしょうか。

今回のような仕様を考えると、ログが算出された時間と、視聴時間が正しく取れれば視聴時間をより適切に取得できるようになります。そういう意味では、連携されるデータのなかにログを生成した時間と、秒単位の視聴時間を入れてもらえると、計算したときの視聴時間の誤差が非常に小さくなるので、この形にしてもらえると嬉しく思います。

さいごに

ここまで、癖が強すぎるMarketoに連携されたEventHub視聴ログについて解説してきました。厳密に扱うには少し問題がありますが、とはいえ傾向を把握する上では許容できるレベルの問題ともいえるでしょう。技術仕様や影響については推測なのでどこまで正しいかわかりませんが、データ処理の方法については確かだと思われるので、同じような処理をされる方はぜひご活用ください。

7
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
3