import {
  ensureTopicPartitionIsLoadedWithTopicDetails,
  fillPartitionedSubTopics,
  peekNthMessage,
  topicFullNameToDetail,
  fetchTopicStats,
  PARTITION_SEPARATOR,
  type TopicActionParameters,
  type TopicDetails,
  type TopicMap
} from '@/api/topics'
import {
  getAllTopics,
  createTopic as apiCreateTopic,
  updateTopic as apiUpdateTopic,
  deleteTopic as apiDeleteTopic,
  createSubscription as apiCreateSubscription,
  deleteSubscription as apiDeleteSubscription,
  resetCursor,
  skipAllMessages,
  getTopicBundle,
  getTopicBroker as apiGetTopicBroker,
  fetchAllSchemas,
  deleteSchema,
  postSchema,
  getInternalStats,
  unloadTopic as apiUnloadTopic,
  compactionStatus as apiCompactionStatus,
  compact,
  getPermissionsOnTopic,
  grantPermissionsOnTopic,
  revokePermissionsOnTopic
} from '@/api/topics'
import { i18n } from '@/lang'
import {
  LongRunningProcessStatusStatusEnum,
  type GetSchemaResponse,
  type GetSchemaResponseTypeEnum,
  type InlineResponse200,
  type NonPersistentTopicStats,
  type PartitionedTopicStatsImpl,
  type PersistentTopicInternalStats,
  type PartitionedTopicInternalStats,
  type TopicStats,
  type Policies
} from '@streamnative/pulsar-admin-client-typescript'
import type { Cluster } from './useCluster'
import type { PulsarState } from './usePulsarState'
import axios from 'axios'
import { useRbac } from './useRbac'
import { useTenantNamespace } from './useTenantNamespace'

type TopicBroker = InlineResponse200 & { brokerUrlTls?: string }

export type Message = {
  properties: string
  messageId: string
  message: string
  ledgerId: string
  entryId: string
  data: string
}

let lastSeen: string | undefined = undefined
const { t } = i18n.global
const topicMap = ref<TopicMap>({})
const topicStats = ref<PartitionedTopicStatsImpl | TopicStats | NonPersistentTopicStats>({})
const bundleRange = ref('')
const topicBroker = ref<TopicBroker>({
  brokerUrl: '',
  httpUrl: '',
  nativeUrl: '',
  brokerUrlSsl: '',
  brokerUrlTls: ''
})
const schemas = ref<Array<GetSchemaResponse>>([])
const topicInternalStats = ref<PersistentTopicInternalStats>({})
const partitionedTopicnternalStats = ref<PartitionedTopicInternalStats>({})
const compactionStatus = ref<LongRunningProcessStatusStatusEnum>(
  LongRunningProcessStatusStatusEnum.NOTRUN
)
const authorizedRoles = ref<Record<string, string[]>>({})
const messages = ref<Message[]>([])
const errorGettingTopics = ref<{ message: string; e: unknown } | undefined>(undefined)
const isTopicLoading = ref(true)

const resetTopicState = () => {
  topicMap.value = {}
  topicStats.value = {}
  bundleRange.value = ''
  topicBroker.value = {
    brokerUrl: '',
    httpUrl: '',
    nativeUrl: '',
    brokerUrlSsl: '',
    brokerUrlTls: ''
  }
  schemas.value = []
  topicInternalStats.value = {}
  partitionedTopicnternalStats.value = {}
  compactionStatus.value = LongRunningProcessStatusStatusEnum.NOTRUN
  authorizedRoles.value = {}
  messages.value = []
}

interface TenantNamespaceTopics {
  [tenant: string]: { [namespace: string]: { [topic: string]: Policies } }
}

/**
 * Given a list of topic names, fetch partition count if their partition count is undefined.
 * This will allow us lazily fetch partition counts if we don't know.
 * @param topicNames
 */
const ensureTopicPartitionIsLoaded = async (topicNames: string[]) => {
  const _topicMap = await ensureTopicPartitionIsLoadedWithTopicDetails(
    topicNames.map(topicName => topicMap.value[topicName])
  )
  Object.assign(topicMap.value, _topicMap)
}

const ensureTopicMetricsIsLoaded = async (topicNames: string[]) => {
  await Promise.all(
    topicNames.map(async tn => {
      const params = apiPerTopicParamsWithTopicMap(tn, topicMap.value)
      try {
        const metrics = (await fetchTopicStats(params)).data
        if (topicMap.value[tn]) {
          topicMap.value[tn].metrics = {
            storageSize: metrics.storageSize || 0,
            backlogSize: metrics.backlogSize || 0
          }
        }
      } catch (error) {
        console.error(error)
        if (topicMap.value[tn]) {
          topicMap.value[tn].metrics = {
            storageSize: undefined,
            backlogSize: undefined
          }
        }
      }
    })
  )
}

const getTopics = async (params?: {
  organization?: string
  clusterUid?: string
  tenant?: string
  namespace?: string
}) => {
  isTopicLoading.value = true
  const { mustOrganization, mustClusterUid, mustTenant, mustNamespace } = usePulsarState()
  let topics = {}
  try {
    topics = await getAllTopics({
      organization: params?.organization || mustOrganization(),
      clusterUid: params?.clusterUid || mustClusterUid(),
      tenant: params?.tenant || mustTenant(),
      namespace: params?.namespace || mustNamespace()
    })
    errorGettingTopics.value = undefined
  } catch (e) {
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore
    errorGettingTopics.value = {
      e,
      message: getErrorMessage(e, 'Unknown error')
    }
    throw e
  } finally {
    isTopicLoading.value = false
  }

  // for noPersPart, we can infer partition details from noPersNoPart if there are data for it.
  // if noPersNoPart is missing partition info, partition info will be laziy fetch via `ensureTopicPartitionIsLoaded`.
  // Object.values(noPersPart).forEach(detail => {
  //   if (detail.partitionId) {
  //     // increment partition count as persNoPart includes individual partitioned topic names
  //     const currentPartitionCount = noPersNoPart[detail.topicRootName].partitionCount
  //     noPersNoPart[detail.topicRootName].partitionCount = (currentPartitionCount ?? 0) + 1
  //   }
  // })
  topicMap.value = topics
}

const apiBaseParams = (
  params:
    | {
        organization?: string
        clusterUid?: string
        tenant?: string
        namespace?: string
      }
    | undefined
) => {
  const { mustOrganization, mustClusterUid, mustTenant, mustNamespace } = usePulsarState()
  return {
    organization: params?.organization || mustOrganization(),
    clusterUid: params?.clusterUid || mustClusterUid(),
    tenant: params?.tenant || mustTenant(),
    namespace: params?.namespace || mustNamespace()
  }
}

/**
 * This function will create params with the global topic map
 */
const apiPerTopicParams = (shortTopicName?: string) => {
  const { mustTopic } = usePulsarState()
  return apiPerTopicParamsWithTopicMap(shortTopicName ?? mustTopic(), topicMap.value)
}

/**
 * This function will create params with a specify topic map
 */
const apiPerTopicParamsWithTopicMap = (
  shortTopicName: string,
  topicMap: TopicMap
): TopicActionParameters => {
  const { mustOrganization, mustClusterUid, mustTenant, mustNamespace } = usePulsarState()
  const topicDetail = topicMap[shortTopicName]
  if (!topicDetail) {
    throw Error(`${shortTopicName} cannot be found`)
  }
  return {
    topicName: topicDetail.topicName,
    partitioned: topicDetail.partitioned,
    organization: topicDetail.organization ?? mustOrganization(),
    clusterUid: topicDetail.clusterUid ?? mustClusterUid(),
    tenant: topicDetail.tenant ?? mustTenant(),
    namespace: topicDetail.namespace ?? mustNamespace(),
    persistency: topicDetail.persistency
  }
}

const createTopic = async (topicDetail: TopicDetails) => {
  if (!topicDetail.topicName) {
    throw Error(t('topic.topicPlaceholder'))
  }
  if (topicDetail.partitionCount === undefined || topicDetail.partitionCount < 0) {
    throw Error(t('topic.partitionPlaceholder'))
  }
  const params = apiBaseParams({ tenant: topicDetail.tenant, namespace: topicDetail.namespace })
  try {
    await apiCreateTopic({
      ...params,
      persistency: topicDetail.persistency,
      partitioned: (topicDetail.partitionCount as number) > 0,
      topicName: topicDetail.topicName,
      partitionCount: topicDetail.partitionCount as number
    })
    // add newly created topic to the topic map
    topicMap.value[topicDetail.topicName] = topicDetail
    fillPartitionedSubTopics(topicDetail, topicMap.value)
  } catch (e) {
    throw Error(getErrorMessage(e, t('topic.notification.createTopicFailed')))
  }
}

const updateTopic = async (topicDetail: TopicDetails) => {
  if (!topicDetail.topicName) {
    throw Error(t('topic.topicPlaceholder'))
  }
  if (topicDetail.partitionCount === undefined || topicDetail.partitionCount < 0) {
    throw Error(t('topic.partitionPlaceholder'))
  }
  const params = apiBaseParams({ tenant: topicDetail.tenant, namespace: topicDetail.namespace })
  try {
    await apiUpdateTopic({
      ...params,
      persistency: topicDetail.persistency,
      partitioned: (topicDetail.partitionCount as number) > 0,
      topicName: topicDetail.topicName,
      partitionCount: topicDetail.partitionCount as number
    })
    // add newly created topic to the topic map
    topicMap.value[topicDetail.topicName] = topicDetail
    fillPartitionedSubTopics(topicDetail, topicMap.value)
  } catch (e) {
    throw Error(getErrorMessage(e, t('topic.notification.updateTopicFailed')))
  }
}

const deleteTopic = async (shortTopicName: string) => {
  await ensureTopicPartitionIsLoaded([shortTopicName])
  const params = apiPerTopicParams(shortTopicName)
  await apiDeleteTopic(params)
  delete topicMap.value[params.topicName]
}

/**
 * This function will not impact the global topic stats, it just return the topic stats data from the response
 */
const getTopicStats = async (shortTopicName: string) => {
  Object.assign(
    topicMap,
    await ensureTopicPartitionIsLoadedWithTopicDetails([topicMap.value[shortTopicName]])
  )
  const params = apiPerTopicParamsWithTopicMap(shortTopicName, topicMap.value)
  try {
    const stats = (await fetchTopicStats(params)).data
    topicStats.value = stats
    if (topicMap.value[shortTopicName]) {
      topicMap.value[shortTopicName].metrics = stats
    }
  } catch (e) {
    throw Error(
      getErrorMessage(
        e,
        t(
          `topic.notification.${
            params.partitioned ? 'getPartitionedTopicStats' : 'getTopicsStatsFailed'
          }`
        )
      )
    )
  }
}

const createSubscription = async (shortTopicName: string, subscriptionName: string) => {
  if (!subscriptionName) {
    throw Error(t('topic.subscription.subNotification'))
  }
  Object.assign(
    topicMap,
    await ensureTopicPartitionIsLoadedWithTopicDetails([topicMap.value[shortTopicName]])
  )
  const params = apiPerTopicParamsWithTopicMap(shortTopicName, topicMap.value)

  try {
    await apiCreateSubscription({
      ...params,
      subscriptionName
    })
  } catch (e) {
    throw Error(getErrorMessage(e, t('topic.subscription.failedToCreate')))
  }

  await getTopicStats(shortTopicName)
}

const deleteSubscription = async (shortTopicName: string, subscriptionName: string) => {
  await apiDeleteSubscription({
    ...apiPerTopicParams(shortTopicName),
    subscriptionName
  })
  const topicSubs = topicStats.value.subscriptions ?? {}
  delete topicSubs[subscriptionName]
  topicStats.value = { ...topicStats.value, subscriptions: topicSubs }
}

const resetAllSubMessage = async (
  shortTopicName: string,
  subscriptionName: string,
  minutes: number
) => {
  const timestamp = Math.floor(new Date().getTime()) - minutes * 60000
  try {
    await resetCursor({ ...apiPerTopicParams(shortTopicName), subscriptionName, timestamp })
    await getTopicStats(shortTopicName)
  } catch (e) {
    throw Error(getErrorMessage(e, t('topic.subscription.failedToReset')))
  }
}

const clearAllSubMessage = async (shortTopicName: string, subscriptionName: string) => {
  try {
    await skipAllMessages({ ...apiPerTopicParams(shortTopicName), subscriptionName })
  } catch (e) {
    throw Error(getErrorMessage(e))
  }
}

const getBundleRange = async (shortTopicName: string) => {
  try {
    const _bundleRange = await getTopicBundle(apiPerTopicParams(shortTopicName))
    bundleRange.value = _bundleRange.data
  } catch (e) {
    throw Error(getErrorMessage(e, 'getBundleRange Error'))
  }
}

const getTopicBroker = async (shortTopicName: string) => {
  try {
    const _topicBroker = await apiGetTopicBroker(apiPerTopicParams(shortTopicName))
    topicBroker.value = _topicBroker.data
  } catch (e) {
    throw Error(getErrorMessage(e, 'getAllSchemas Error'))
  }
}

const getAllSchemas = async (shortTopicName: string) => {
  try {
    const _schemas = await fetchAllSchemas(apiPerTopicParams(shortTopicName))
    schemas.value = _schemas.data.getSchemaResponses ?? []
  } catch (e) {
    throw Error(getErrorMessage(e, 'getAllSchemas Error'))
  }
}

const deleteAllSchemas = async (shortTopicName: string) => {
  try {
    await deleteSchema(apiPerTopicParams(shortTopicName))
    schemas.value = []
  } catch (e) {
    throw Error(getErrorMessage(e, t('schema.deleteSchemaFailed')))
  }
}

const createSchema = async (
  shortTopicName: string,
  type: GetSchemaResponseTypeEnum,
  data: string,
  propertyEntries: [string, string][]
) => {
  const keys = new Set()
  propertyEntries.forEach(([key, value]) => {
    if (!key || !value) {
      throw Error('A key and a value must be defined for all properties')
    }
    keys.add(key)
  })

  if (keys.size !== propertyEntries.length) {
    throw Error(t('schema.keyIsExistNotification'))
  }

  const properties = Object.fromEntries(propertyEntries)
  const res = await postSchema({
    ...apiPerTopicParams(shortTopicName),
    body: {
      type,
      schema: data,
      properties
    }
  })

  const newSchema: GetSchemaResponse = {
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore
    version: res.data?.version?.version,
    type,
    data,
    properties
  }
  const newSchemas = schemas.value
  newSchemas.push(newSchema)
  schemas.value = newSchemas
}

const getTopicStatsInternal = async (shortTopicName: string) => {
  const params = apiPerTopicParams(shortTopicName)
  const { data } = await getInternalStats(params)
  if (params.partitioned) {
    partitionedTopicnternalStats.value = data as PartitionedTopicInternalStats
    topicInternalStats.value = {}
  } else {
    partitionedTopicnternalStats.value = {}
    topicInternalStats.value = data as PersistentTopicInternalStats
  }
}

const unloadTopic = async (shortTopicName: string) => {
  await apiUnloadTopic(apiPerTopicParams(shortTopicName))
}

const getCompactionStatus = async (shortTopicName: string) => {
  try {
    const { data } = await apiCompactionStatus(apiPerTopicParams(shortTopicName))
    compactionStatus.value = data.status ?? LongRunningProcessStatusStatusEnum.NOTRUN
  } catch (e) {
    throw Error(getErrorMessage(e, t('topic.notification.compactionFailed')))
  }
}

const compactTopic = async (shortTopicName: string) => {
  await compact(apiPerTopicParams(shortTopicName))
  await getCompactionStatus(shortTopicName)
}

const getPermissions = async (shortTopicName: string) => {
  try {
    const permissions = await getPermissionsOnTopic(apiPerTopicParams(shortTopicName))
    authorizedRoles.value = permissions.data
  } catch (e) {
    throw Error(getErrorMessage(e, 'getPermissions error'))
  }
}

const grantPermissions = async ({ role, permissions }: { role: string; permissions: string[] }) => {
  await grantPermissionsOnTopic({ ...apiPerTopicParams(), role, permissions })
  authorizedRoles.value = { ...authorizedRoles.value, [role]: permissions }
}

const revokePermissions = async ({ role }: { role: string }) => {
  await revokePermissionsOnTopic({ ...apiPerTopicParams(), role })

  const newRoles = { ...authorizedRoles.value }
  delete newRoles[role]
  authorizedRoles.value = newRoles
}
const viewMessages = async (shortTopicName: string, subscriptionName: string, count: number) => {
  if (count <= 0) throw t('topic.subscription.messageGreaterThanZero')
  if (count > 100) throw t('schema.maxMessageNotification')
  if (!subscriptionName) throw t('topic.selectSubMessage')

  // Below code is extraction of logics at pulsar admin client
  // https://github.com/streamnative/pulsar/blob/master/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java#L880-L913
  messages.value = []
  try {
    const params = apiPerTopicParams()
    params.topicName = shortTopicName || params.topicName
    for (let i = 1; i <= count; i++) {
      const res = await peekNthMessage({
        ...params,
        subscriptionName,
        n: i
      })

      const properties: Record<string, string | string[]> = {}
      Object.entries(res.headers).forEach(([key, value]) => {
        if (key.startsWith('x-pulsar-property-')) {
          properties[key.slice('x-pulsar-property-'.length)] = value
        } else if (key === 'x-pulsar-batch-size' || key === 'x-pulsar-num-batch-message') {
          properties[key] = value
        }
      })
      const messageId = res.headers['x-pulsar-message-id']
      const tokenizedMessageId = messageId.split(':')
      const payload = new Uint8Array(res.data as unknown as ArrayBuffer)
      // returned binary is java serialized object.
      // there is no good library at npm to deserialize java object without defining java object schema.
      // so simply find the the separator for the payload and extract value is the simplest way.
      const data = payload.slice(payload.indexOf(2) + 1)
      const dataString = new TextDecoder().decode(data)

      messages.value = [
        ...messages.value,
        {
          properties: JSON.stringify(properties, null, 2),
          messageId: res.headers['x-pulsar-message-id'],
          message: dataString,
          ledgerId: tokenizedMessageId[0],
          entryId: tokenizedMessageId[1],
          data: dataString
        }
      ]
    }
  } catch (e) {
    // peekNthMessage() will respond with 404 when n is bigger than message it has.
    // in such case, simply catch and ignore as no more messages to read.
    if (!axios.isAxiosError(e) || e.response?.status !== 404) {
      throw e
    }
  }
}

export const init = (initialState: PulsarState) => {
  const { organization, tenant, namespace } = usePulsarState()
  const { activeCluster, isActiveClusterAvailable } = useCluster()
  const { abilityUpdating } = useRbac()
  const valueChanged = async ([org, clus, ten, ns, ab]: [
    string | undefined,
    Cluster | undefined,
    string | undefined,
    string | undefined,
    boolean | undefined
  ]) => {
    if (ab) {
      return
    }

    const isClusterAvailable = isActiveClusterAvailable.value
    const clusterUid = clus?.metadata?.uid ?? undefined
    if (!org || !clusterUid || !isClusterAvailable || !ten || !ns) {
      resetTopicState()
      lastSeen = undefined
      return
    }
    const seen = `${org}/${clusterUid}/${ten}/${ns}`
    if (lastSeen !== seen || !ab) {
      await getTopics({
        organization: org,
        clusterUid: clusterUid,
        tenant: ten,
        namespace: ns
      })
    }

    lastSeen = seen
  }

  watch([organization, activeCluster, tenant, namespace, abilityUpdating], valueChanged)
  return valueChanged([
    initialState.organization,
    activeCluster.value,
    initialState.tenant,
    initialState.namespace,
    abilityUpdating.value
  ])
}

const topicCreatedSuccess = (topicDetail: TopicDetails) => {
  topicMap.value[topicDetail.topicName] = topicDetail
  fillPartitionedSubTopics(topicDetail, topicMap.value)
}

export const systemTopicNames = [
  '__change_events',
  '__transaction_buffer_snapshot',
  '__transaction_buffer_snapshot_segments',
  '__transaction_buffer_snapshot_indexes',
  '__transaction_pending_ack',
  '__pending_ack_state',
  '__transaction_log_',
  '_confluent-ksql-',
  '__kafka_connect_offset_storage'
]

const topicRootDetails = computed(() => {
  // only contains root topics.  i.e. [c1, c2]
  return Object.values(topicMap.value)
    .filter(detail => detail.partitionId === undefined)
    .sort((a, b) => a.topicName.localeCompare(b.topicName))
})

const allTopicsWithoutSystemTopics = computed(() => {
  return topicRootDetails.value.filter(details => !systemTopicNames.includes(details.topicName))
})

const topicStatsPartitions = computed(() => {
  const partitions = ((topicStats.value as PartitionedTopicStatsImpl)?.partitions ?? {}) as Record<
    string,
    PartitionedTopicStatsImpl
  >

  return Object.keys(partitions)
    .map(partitionKey => {
      const partition = partitions[partitionKey]
      const name = partitionKey.split('/')[4]
      const partitionId = name.split(PARTITION_SEPARATOR)[1]

      return {
        ...partition,
        name,
        partitionId,
        producerCounts: partition.publishers?.length ?? 0,
        subscriptionsCounts: Object.keys(partition?.subscriptions ?? {}).length
      }
    })
    .sort((a, b) => a.name.localeCompare(b.name))
})

export const useTopic = () => {
  return {
    topicMap,
    topicDetails: computed(() => {
      // contains all topics: i.e. [c1, c2, c2-partition-0, c2-partition-1]
      return Object.values(topicMap.value).sort((a, b) => a.topicName.localeCompare(b.topicName))
    }),
    topicRootDetails,
    allTopicsWithoutSystemTopics,
    // selectedTopicDetail,
    systemTopicNames,
    topicStats,
    bundleRange,
    topicBroker,
    schemas,
    topicInternalStats,
    partitionedTopicnternalStats,
    compactionStatus,
    authorizedRoles,
    messages,
    isTopicLoading,
    getTopics,
    createTopic,
    updateTopic,
    deleteTopic,
    getTopicStats,
    createSubscription,
    deleteSubscription,
    resetAllSubMessage,
    clearAllSubMessage,
    getBundleRange,
    getTopicBroker,
    getAllSchemas,
    deleteAllSchemas,
    createSchema,
    getTopicStatsInternal,
    unloadTopic,
    compactTopic,
    getPermissions,
    grantPermissions,
    revokePermissions,
    viewMessages,
    resetTopicState,
    getCompactionStatus,
    ensureTopicPartitionIsLoaded,
    topicFullNameToDetail,
    ensureTopicMetricsIsLoaded,
    init,
    errorGettingTopics,
    topicCreatedSuccess,
    isSystemTopic: (tenant?: string, namespace?: string, topicName?: string) => {
      const { isSystemTenantNamespace } = useTenantNamespace()
      return (
        (tenant && namespace && isSystemTenantNamespace(tenant, namespace)) ||
        (topicName && systemTopicNames.includes(topicName))
      )
    },
    topicStatsPartitions
  }
}
